- 印刷する
- PDF
Kafka Connectでデータパイプラインを構築
- 印刷する
- PDF
VPC環境で利用できます。
本ガイドでは、サーバにKafka Connectをインストールし、Cloud Data Streaming Serviceを活用してMySQLの変更事項をElasticsearchに適用する方法を説明します。
本ガイドは、ユーザーの理解を助けるために作成された例です。 本ガイドで使用されたconnectorやconfluent-hubのライセンスはconfluent社にあり、confluent社のライセンスポリシーに従います。 NAVERクラウドは、当該connectorやconfluent-hubを直接提供する事業者ではありません。ユーザーはconnectorやconfluent-hubを使用するかどうかを直接決定できます。
事前作業
本ガイドを実行する前に利用申込を完了する必要がある作業は、以下のとおりです。
- サーバとVPC作成
- Cloud DB for MySQLサーバ作成
- Cloud Data Streaming Serviceクラスタ作成
- Search Engine Serviceクラスタ作成
ネットワークの設定
Step 1. Cloud DB for MySQLの設定
Cloud DB for MySQLのDBアカウントを追加する方法は以下のとおりです。
- NAVERクラウドプラットフォームコンソールで、Services > Database > Cloud DB for MySQL > DB Serverメニューを順にクリックします。
- DB Serverを選択し、DB管理 > DB User管理をクリックします。
- 必要な情報を入力し、[DB User追加] ボタンをクリックします。
- DBアカウントが追加されたら、[保存] ボタンをクリックします。
Cloud DB for MySQLに関する詳細については、Cloud DB for MySQLご利用ガイドをご参照ください。
Step 2. ACGの設定
Cloud Data Streaming Serviceブローカーノードの9092番ポートでアクセスできるようにACGを設定する方法は、以下のとおりです。
- NAVERクラウドプラットフォームコンソールで、Services > Compute > Server > ACGメニューを順にクリックします。
- ACGリストで「cdss-b-xxxxx」を選択し、[ACG設定] ボタンをクリックします。
- ACGルールを入力し、[追加] ボタンをクリックします。
- プロトコル:TCP
- アクセスソース:Kafka Connectが実行されるサーバのIP
- 許可ポート:9092
- [適用] ボタンをクリックします。
Search Engine Serviceマネージャーノードの9200番ポートでアクセスできるようにACGを設定する方法は、以下のとおりです。
- NAVERクラウドプラットフォームコンソールで、Services > Compute > Server > ACGメニューを順にクリックします。
- ACGリストで「searchengine-m-xxxxx」を選択し、[ACG設定] ボタンをクリックします。
- ACGルールを入力し、[追加] ボタンをクリックします。
- プロトコル:TCP
- アクセスソース:Kafka Connectが実行されるサーバのIP
- 許可ポート:9200
サーバにKafka Connectをインストール
Step 1. Javaのインストール
Javaをインストールする方法は以下のとおりです。
以下のコマンドを入力してyumのアップデートを行います。
yum update
以下のコマンドを入力してjava-1.8.0-openjdk-devel.x86_64をインストールします。
yum install java-1.8.0-openjdk-devel.x86_64
以下のコマンドを入力して正常にインストールされたかどうか確認します。
java -version javac -version
Step 2. Kafka Connectのインストール
Kafka Connectをインストールする方法は以下のとおりです。
以下のコマンドを入力してサーバの
/root
パスにKafka Connectをダウンロードします。curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
以下のコマンドを入力してダウンロードしたファイルの圧縮を展開します。
tar -zxvf confluent-community-7.0.1.tar.gz
以下のコマンドを入力して起動前に
properties
ファイルを修正します。- Cloud Data Streaming Serviceのブローカーノード情報を参照し、属性ファイルの「bootstrap.servers」にIPリストを追加します。
vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
Step 3. Confluent Hubのインストール
Confluent Hubは、Kafka Connectで使用される様々なプラグインを手軽にダウンロードできる保存場所です。 サポートされるプラグインの全体リストは、Confluentで提供するConfluent Connector Portfolioを確認してください。
以下のコマンドを入力して
/root
パスに新しいフォルダを作成し、そのフォルダに移動します。- 例では、「confluent-hub」という名前のフォルダを作成します。
mkdir confluent-hub cd confluent-hub
以下のコマンドを入力して現在位置のパス(
/root/confluent-hub
)にConfluent Hubをダウンロードします。curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
以下のコマンドを入力してダウンロードしたファイルの圧縮を展開します。
tar -zxvf confluent-hub-client-latest.tar.gz
以下のコマンドを入力して現在位置のパス(
/root/confluent-hub
)に今後プラグインが保存されるフォルダを作成します。- 例では、「plugins」という名前のフォルダを作成します。
mkdir plugins
以下のコマンドを順に入力し、PATH環境変数に圧縮を展開したbinフォルダのパスを追加します。
vi ~/.bashrc
export CONFLUENT_HOME='~/confluent-hub' export PATH=$PATH:$CONFLUENT_HOME/bin
source ~/.bashrc
Step 4. MySQLコネクタのインストール
以下のコマンドを入力してdebezium-connector-mysqlをインストールします。
--component-dir
は、プラグインが実際にインストールされるフォルダのパスです。 STEP 3で作成した/root/confluent-hub/plugins
に設定します。--worker-configs
は、プラグインのインストール後に適用された属性ファイルのパスです。 Step 2で修正した/root/confluent-7.0.1/etc/kafka/connect-distributed.properties
に設定します。
confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
Step 5. Elasticsearchコネクタのインストール
以下のコマンドを入力してkafka-connect-elasticsearchをインストールします。
--component-dir
と--worker-configs
は、STEP 4と同様に適用します。
confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
Step 6. Kafka Connectプロセスの実行
Kafka Connectプロセスを実行する方法は以下のとおりです。
以下のコマンドを入力してKafka Connectプロセスをバックグラウンドで実行します。
/root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
以下のコマンドを入力してプロセスが正常に動作するかどうか確認します。
curl localhost:8083
{"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
以下のコマンドを入力して前の段階でインストールしたコネクタがすべて正常に表示されているかどうか確認します。
curl localhost:8083/connector-plugins
[ {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"}, {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"}, {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"}, {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"} ]
Kafka Connectを使用する
Step 1. MySQLテーブルの作成とデータの追加
MySQLテーブルを作成してデータを追加する方法は以下のとおりです。
- 使用中のCloud DB for MySQLサーバにアクセスします。
- 以下のコマンドを入力してテーブルを作成し、データを追加します。
- 例では、「member」という名前のテーブルを作成します。
CREATE TABLE IF NOT EXISTS member ( id int NOT NULL PRIMARY KEY, name varchar(100), email varchar(200), department varchar(200) ); INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A'); INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B'); INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B'); INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
Step 2. MySQLコネクタの登録
MySQLコネクタを登録する方法は以下のとおりです。
- Kafka Connectがインストールされたサーバにアクセスします。
- リクエスト時に送るJSON bodyを個別の環境に合わせて以下の形式で入力します。
{ "name": "mysql-connector", // コネクタの名前 "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", // 使用するプラグインの種類 "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQLサーバのエンドポイント "database.port": "3306", // MySQLサーバのポート "database.user": "kimdong", // MySQLサーバのユーザー "database.password": "1234", // MySQLサーバユーザーのパスワード "database.server.id": "184054", // Kafka Connectで使用されるMySQLサーバのUUID "database.server.name": "NCP_MYSQL", // Kafka Connectで使用するMySQLサーバに付与する名前、今後Kafkaトピックの接頭辞として使用される "database.whitelist": "test", // Kafka ConnectでアクセスするMySQLサーバのデータベースを指定 "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // ブローカーノード情報 "database.history.kafka.topic": "this_is_topic", // MySQLヒストリーの変更履歴を保存するトピック名 "snapshot.locking.mode": "none", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.json.JsonConverter" } }
- 以下のコマンドを入力してMySQLコネクタを登録します。
curl -X POST localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}'
Step 3. トピックの確認
トピックが作成されたか確認する方法は以下のとおりです。
- Cloud Data Streaming Serviceで提供するCMAKにアクセスします。
- STEP 2で設定したbodyの情報通りにトピックが作成されたことを確認します。
- STEP 1で作成したMySQLのmemberテーブルの変更情報が盛り込まれるトピックは、「NCP_MYSQL.test.member」です。
- STEP 1で作成したMySQLのmemberテーブルの変更情報が盛り込まれるトピックは、「NCP_MYSQL.test.member」です。
Step 4. Elasticsearchコネクタの登録
Elasticsearchコネクタを登録する方法は以下のとおりです。
- Kafka Connectがインストールされたサーバにアクセスします。
- リクエスト時に送るJSON bodyを個別の環境に合わせて以下の形式で入力します。
{ "name": "es-connector", // コネクタの名前 "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // 使用するプラグインの種類 "connection.url": "http://10.0.100.9:9200", // トピックを持ってくるKafkaブローカーノード "tasks.max": "1", "topics": "NCP_MYSQL.test.member", // コンシュームするトピックの名前 "type.name": "_doc", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "true", "value.converter.schemas.enable": "true", "transforms": "extractKey", "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.extractKey.field": "id", // MySQLテーブルで使用する主キー名 "behavior.on.null.values": "IGNORE" } }
- 以下のコマンドを入力してElasticsearchコネクタを登録します。
curl -X POST localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
Step 5. Elasticsearchデータの確認
Elasticsearchデータを確認する方法は以下のとおりです。
- Search Engine Serviceで提供するKibanaのエンドポイントでアクセスします。
- インデックスリストを照会し、「ncp_mysql.test.member」という名前のインデックスが作成されたことを確認します。
- ncp_mysql.test.memberインデックスの内容を照会します。
- ncp_mysql.test.memberインデックスの特定のドキュメントを照会します。
- MySQLで追加したデータは
_source.after
で確認できます。
- MySQLで追加したデータは