Strimzi Kafka Operator を使って、KubernetesライクにKafka Connectを試していきます。
ゴール
- Kafka Connect で外部システム(今回はファイル)からメッセージを Kafka Broker に書き込む
構成はこちら。
環境
- Kubernetes 1.21(OCP 4.8)
- Strimzi Operator 0.26
前置き
Kafka Connect とは
Kafka Connectは、Apache Kafkaと外部システム、例えばデータベース、ストレージ、メッセージングシステムとを連携することができるKafkaのエコシステムに含まれるコンポーネントです。
Kafka のクラスタ上で動作することとから、Kafka Brokerと同じくスケーラビリティと信頼性を維持しながら、大量のデータをKafkaのBrokerへ出し入れするためのフレームワークです。
どういう時に使うの?
Kafka Connectでは、さまざまな外部システムと接続するためのプラグインがOSSやベンダーから数多く提供されていて、これらを利用することで開発をスキップして、Kafka に対してメッセージの送受信ができるようになります。
最近よく当てはまる利用例としては、RDBMSのようなステートを管理するシステムをイベントとして扱いたい場合にはもってこいだと思います。 例として、ステートだけを保存しているRDMBSでは、過去の状態を見ることができなかったり(過去の状態はスナップショットで細かく管理しないといけない)、DBがインデックス作成による容量増やパフォーマンス要件によって高コスト化したりと課題が見えてきています。
そこで、ステートではなくイベントとして管理するアプローチをよく見ます。こうしたユーザのアクションやデータの変更をイベントとして管理し、イベントストアに保存する考えをイベントソーシングと呼びます。Kafka Connectは、RDBMSの例とすると、レコードの更新や変更をモニタリングして、イベントとして Kafka Broker に送ることができるので、イベントソーシングのアーキテクチャを構成するのに有力なコンポーネントとなります。
ステートをイベントとして扱うことで、過去に発生した事象はイベントとして保存しておくことができるので閲覧可能(機械学習の予測にも利用できる)、Kafka Connect/Kafka Brokerはスケールアウト型なのでパフォーマンスの要件にも対応しやすく、イベントのデータの保存場所には安価なオブジェクトストレージを利用することでコストを抑えることも手段として取ることができます。また有事の際は記録されたイベントからその”ステート”を重ねて、現在のステートを復元[1]することにも利用できるので、またRDBMSの共有データ利用に対して違ったアプローチをとることができます。
Kafka Connect と Kafka Connector とは
- Kafka Connect と Kafka Connector との関係
Kafka Connectはあくまでプラグインを起動させるためのフレームワークに過ぎない。Kafka Connector は、プラグインによって提供されるものであり、このインスタンスが実際に外部システムとKafka Brokerとやりとりを行いデータを送受信する。
- Kafka Connector の起動方法の違い
Kafka Connectでは、Kafka Connect APIに対して、curlコマンドでPUT/POSTを使って、Kafka Connector の設定を行い、起動をしてメッセージの送受信を行います。 Strimziでも以前は同じようにしていましたが、ある時点からよりKubernetesライクに管理すると言うことで, kafkaconnectorのカスタムリソースが登場しました。これによって、起動方法もかなり楽になりました。
今回のシナリオ
- Kafka Connect の Source パターン(File)
1-1. ターゲットとするファイルの準備
1-2. Topic(my-topic-source)を作成
1-3. Kafka Connect を作成
1-4. Consumer でメッセージ受信を確認
準備
- 便利ツールのインストール
これがあると、コンテキストと名前空間の切り替えが簡単に行えます。
git clone https://github.com/ahmetb/kubectx.gitsudo install ./kubectx/kubectx /usr/local/bin/
sudo install ./kubectx/kubens /usr/local/bin/kubectx -c
kubens -c
- Kafka クラスタの作成
git clone https://github.com/suzukiry/kafka-strimzi.git
cd kafka-strimzi/03-kafka-connect
kubectl apply -f 01_kafka-zk-jbod.yaml
- 作成されたクラスタのPodsの一覧
# kubectl get pods
NAME READY STATUS RESTARTS AGE
my-cluster-entity-operator-579c7bd9cd-rmmns 3/3 Running 0 3m3s
my-cluster-kafka-0 1/1 Running 0 3m53s
my-cluster-kafka-1 1/1 Running 0 3m53s
my-cluster-kafka-2 1/1 Running 0 3m53s
my-cluster-zookeeper-0 1/1 Running 0 4m53s
my-cluster-zookeeper-1 1/1 Running 0 4m53s
my-cluster-zookeeper-2 1/1 Running 0 4m53s
1. Kafka Connect の Source パターン(ファイル)
1–1. ターゲットとするファイルの準備
よくある例として、Kafka のライセンスのテキストファイル(/opt/kafka/LICENSE)をメッセージとしてTopicに送る例をやります。実際のファイルは、Kafka ConnectのPod/コンテナの中なのでデプロイした後に見ることにする。
1–2. Topic(my-topic-source)を作成
- KafkaTopic[my-file-topic]を作成
kubectl apply -f 02_my-topic-file.yaml
- KafkaTopicの一覧を確認
# kubectl get kafkatopic
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster3 50 3 True
my-file-topic my-cluster3 3 2 True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 my-cluster3 1 3 True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b my-cluster3 1 1 True
1–3. Kafka Connect を作成
- KafkaConnect[my-connect-cluster]を作成
kubectl apply -f 03_kafka-connect.yaml
- 結果の確認
# kubectl get kafkaconnect
NAME DESIRED REPLICAS READY
my-connect-cluster 1 True
- 送信する対象ファイルを見ておく
$ kubectl exec -ti my-connect-cluster-connect-6f5484ddff-vsx84 -- sh
sh-4.4$ cat /opt/kafka/LICENSE Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
:
:
:
NOTE:
Kafka ConnectはもともとKafka Brokerのクラスタをベースに動くものでしたので、Kafka ConnectのPodにもKafkaと同じライブラリやファイルなどが存在している。
- Kafka Connector で実行
一応、yamlファイルを見ておくと、Kafka Connectに設定する内容の肝はすべてここに書いてある。specはConnectorとなる.jarとそれに紐づくパラメータ、今回であればファイル名やメッセージとしての送り先のTopic名を追加されている。
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-file-connector
labels:
strimzi.io/cluster: my-connect-cluster
namespace: my-kafka-connector
spec:
class: org.apache.kafka.connect.file.FileStreamSourceConnector
config:
file: /opt/kafka/LICENSE
topic: my-file-topic
tasksMax: 2
ということで実行します。
kubectl apply -f 04_kafka-connector.yaml
結果を見ます。
# kubectl get kafkaconnector
NAME CLUSTER CONNECTOR CLASS MAX TASKS READY
my-file-connector my-connect-cluster org.apache.kafka.connect.file.FileStreamSourceConnector 2 True
1–4. Consumer でメッセージ受信を確認
- Consumerを実行
kubectl apply -f 05_connector-consumer.yaml
結果を見ます。
# kubectl get pods
NAME READY STATUS RESTARTS AGE
connector-consumer-6d4f5bffdd-xxdgb 1/1 Running 0 73s
my-cluster3-entity-operator-7dd8b87568-x8pw5 3/3 Running 0 43m
my-cluster3-kafka-0 1/1 Running 0 44m
my-cluster3-kafka-1 1/1 Running 0 44m
my-cluster3-kafka-2 1/1 Running 0 44m
my-cluster3-zookeeper-0 1/1 Running 0 45m
my-cluster3-zookeeper-1 1/1 Running 0 45m
my-cluster3-zookeeper-2 1/1 Running 0 45m
my-connect-cluster-connect-6f5484ddff-vsx84 1/1 Running 0 23m
- Pod名を指定してログを見ます
少し見づらいですが、メッセージのvalueにLicenseファイルの文言が含まれているのを確認できます。
# kubectl logs pod/connector-consumer-6d4f5bffdd-xxdgb
2022-01-30 07:48:20 INFO KafkaConsumerExample:47 - Received message:
2022-01-30 07:48:20 INFO KafkaConsumerExample:48 - partition: 1
2022-01-30 07:48:20 INFO KafkaConsumerExample:49 - offset: 44
2022-01-30 07:48:20 INFO KafkaConsumerExample:50 - value: the Derivative Works; and
2022-01-30 07:48:20 INFO KafkaConsumerExample:52 - headers:
2022-01-30 07:48:20 INFO KafkaConsumerExample:47 - Received message:
2022-01-30 07:48:20 INFO KafkaConsumerExample:48 - partition: 1
2022-01-30 07:48:20 INFO KafkaConsumerExample:49 - offset: 45
2022-01-30 07:48:20 INFO KafkaConsumerExample:50 - value:
2022-01-30 07:48:20 INFO KafkaConsumerExample:52 - headers:
2022-01-30 07:48:20 INFO KafkaConsumerExample:47 - Received message:
2022-01-30 07:48:20 INFO KafkaConsumerExample:48 - partition: 1
2022-01-30 07:48:20 INFO KafkaConsumerExample:49 - offset: 46
2022-01-30 07:48:20 INFO KafkaConsumerExample:50 - value: (d) If the Work includes a "NOTICE" text file as part of its
2022-01-30 07:48:20 INFO KafkaConsumerExample:52 - headers:
2022-01-30 07:48:20 INFO KafkaConsumerExample:47 - Received message:
2022-01-30 07:48:20 INFO KafkaConsumerExample:48 - partition: 1
2022-01-30 07:48:20 INFO KafkaConsumerExample:49 - offset: 47
2022-01-30 07:48:20 INFO KafkaConsumerExample:50 - value: distribution, then any Derivative Works that You distribute must
2022-01-30 07:48:20 INFO KafkaConsumerExample:52 - headers:
2022-01-30 07:48:20 INFO KafkaConsumerExample:47 - Received message:
2022-01-30 07:48:20 INFO KafkaConsumerExample:48 - partition: 1
2022-01-30 07:48:20 INFO KafkaConsumerExample:49 - offset: 48
2022-01-30 07:48:20 INFO KafkaConsumerExample:50 - value: include a readable copy of the attribution notices contained
2022-01-30 07:48:20 INFO KafkaConsumerExample:52 - headers:
2022-01-30 07:48:20 INFO KafkaConsumerExample:47 - Received message:
2022-01-30 07:48:20 INFO KafkaConsumerExample:48 - partition: 1
2022-01-30 07:48:20 INFO KafkaConsumerExample:49 - offset: 49
2022-01-30 07:48:20 INFO KafkaConsumerExample:50 - value: within such NOTICE file, excluding those notices that do not
2022-01-30 07:48:20 INFO KafkaConsumerExample:52 - headers:
2022-01-30 07:48:20 INFO KafkaConsumerExample:47 - Received message:
2022-01-30 07:48:20 INFO KafkaConsumerExample:48 - partition: 1
2022-01-30 07:48:20 INFO KafkaConsumerExample:49 - offset: 50
2022-01-30 07:48:20 INFO KafkaConsumerExample:50 - value: pertain to any part of the Derivative Works, in at least one
2022-01-30 07:48:20 INFO KafkaConsumerExample:52 - headers:
まとめ
- Kafka Connect/Kafka Connector のカスタムリソースを使って、外部システムからデータを取り、Kafka Brokerへメッセージを追加した。
Kafka Connect を動かすというよりも、DebeziumのようなCDCや外部システムになりやすいS3やCouchbaseを使った例のほうがよかったかなと。。。DockerでDebeziumをつかっている例があります[2]ので、こちらをStrimziを利用した場合として、今度はまとめておこうと思います。
参考
[1] マイクロサービスとメッセージングのなぜ [概要編]
[2] DBへの変更をイベントストリームに変換するOSS: Debeziumの紹介とチュートリアル