VPC環境で利用できます。
本ガイドでは、サーバに Kafka Connectをインストールした後、Cloud Data Streaming Serviceを活用して MySQLの変更内容を Elasticsearchに適用する方法について説明します。
本ガイドは、ユーザーの理解を助けるために作成された例です。本ガイドで使用された connectorや confluent-hubのライセンスは confluent社にあり、confluent社のライセンスポリシーに従います。NAVERクラウドは、当該 connectorや confluent-hubを直接提供する事業者ではありません。ユーザーは connectorや confluent-hubの使用有無を直接決定できます。
事前タスク
本ガイドを実行する前にご利用の申し込みを完了する必要があるタスクは、次の通りです。
- サーバと VPC作成
- Cloud DB for MySQLサーバ作成
- Cloud Data Streaming Serviceクラスタ作成
- Search Engine Serviceクラスタ作成
ネットワーク設定
Step 1.Cloud DB for MySQL設定
Cloud DB for MySQLの DBアカウントを追加する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、
> Services > Database > Cloud DB for MySQL > DB Serverメニューを順にクリックします。 - DB Serverを選択し、DB管理 > DB User管理をクリックします。
- 必要な情報を入力し、 [DB User追加] ボタンをクリックします。
- DBアカウントが追加されたら、 [保存] ボタンをクリックします。

Cloud DB for MySQLに関する詳細は、Cloud DB for MySQL ご利用ガイドをご参照ください。
Step 2.ACG設定
Cloud Data Streaming Serviceブローカーノードの9092番ポートにアクセスできるように ACGを設定する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、
> Services > Compute > Server > ACGメニューを順にクリックします。 - ACGリストから「cdss-b-xxxxx」を選択し、 [ACG設定] ボタンをクリックします。
- ACGルールを入力し、 [追加] ボタンをクリックします。
- プロトコル: TCP
- アクセスソース: Kafka Connectが実行されるサーバの IPアドレス
- 許可ポート: 9092
- [適用] ボタンをクリックします。
Search Engine Serviceマネージャノードの9200番ポートにアクセスできるように ACGを設定する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、
> Services > Compute > Server > ACGメニューを順にクリックします。 - ACGリストから「searchengine-m-xxxxx」を選択し、 [ACG設定] ボタンをクリックします。
- ACGルールを入力し、 [追加] ボタンをクリックします。
- プロトコル: TCP
- アクセスソース: Kafka Connectが実行されるサーバの IPアドレス
- 許可ポート: 9200
サーバに Kafka Connectをインストール
Step 1.Javaのインストール
Javaをインストールする方法は次の通りです。
-
以下のコマンドを入力して yumのアップデートを行います。
yum update -
以下のコマンドを入力して java-1.8.0-openjdk-devel.x86_64をインストールします。
yum install java-1.8.0-openjdk-devel.x86_64 -
以下のコマンドを入力して正常にインストールされたかどうか確認します。
java -version javac -version
Step 2.Kafka Connectのインストール
Kafka Connectをインストールする方法は、次の通りです。
-
以下のコマンドを入力してサーバの
/rootパスに Kafka Connectをダウンロードします。curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz -
以下のコマンドを入力してダウンロードしたファイルの圧縮を展開します。
tar -zxvf confluent-community-7.0.1.tar.gz -
以下のコマンドを入力して起動前に
propertiesファイルを変更します。- Cloud Data Streaming Serviceのブローカーノード情報を参照し、propertiesファイルの「bootstrap.servers」に ipアドレスリストを追加します。
vi /root/confluent-7.0.1/etc/kafka/connect-distributed.propertiesbootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
Step 3.Confluent Hubのインストール
Confluent Hubは Kafka Connectで使用される様々なプラグインを手軽にダウンロードできるストレージです。サポートされているプラグインの全リストについては、Confluentが提供する Confluent Connector Portfolioをご確認ください。
-
以下のコマンドを入力して
/rootパスに新しいフォルダを作成し、そのフォルダに移動します。- ユースケースでは、「confluent-hub」という名前のフォルダを作成します。
mkdir confluent-hub cd confluent-hub -
以下のコマンドを入力して現在位置のパス(
/root/confluent-hub)に Confluent Hubをダウンロードします。curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz -
以下のコマンドを入力してダウンロードしたファイルの圧縮を展開します。
tar -zxvf confluent-hub-client-latest.tar.gz -
以下のコマンドを入力して現在位置のパス(
/root/confluent-hub)に今後プラグインが保存されるフォルダを作成します。- 例では、「plugins」という名前のフォルダを作成します。
mkdir plugins -
以下のコマンドを順に入力し、PATH環境変数に圧縮を展開した binフォルダのパスを追加します。
vi ~/.bashrcexport CONFLUENT_HOME='~/confluent-hub' export PATH=$PATH:$CONFLUENT_HOME/binsource ~/.bashrc
Step 4.MySQL Connectorのインストール
以下のコマンドを入力して debezium-connector-mysqlをインストールします。
--component-dirは、プラグインが実際にインストールされるフォルダのパスです。STEP 3で作成した/root/confluent-hub/pluginsに設定します。--worker-configsは、プラグインのインストール後に適用された属性ファイルのパスです。Step 2で変更した/root/confluent-7.0.1/etc/kafka/connect-distributed.propertiesに設定します。
confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
Step 5.Elasticsearch Connectorのインストール
以下のコマンドを入力して kafka-connect-elasticsearchをインストールします。
--component-dirと--worker-configsは、STEP 4と同様に適用します。
confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
Step 6.Kafka Connectプロセス実行
Kafka Connectプロセスを実行する方法は、次の通りです。
-
以下のコマンドを入力して Kafka Connectプロセスをバックグラウンドで実行します。
/root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties -
以下のコマンドを入力してプロセスが正常に動作するかどうか確認します。
curl localhost:8083{"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"} -
以下のコマンドを入力して前の段階でインストールしたコネクタがすべて正常に表示されているかどうか確認します。
curl localhost:8083/connector-plugins[ {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"}, {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"}, {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"}, {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"} ]
Kafka Connectを使用する
Step 1.MySQLテーブルの作成とデータの追加
MySQLテーブルを作成してデータを追加する方法は、次の通りです。
- 現在使用中の Cloud DB for MySQLサーバにアクセスします。
- 以下のコマンドを入力してテーブルを作成し、データを追加します。
- 例では、「member」という名前のテーブルを作成します。
CREATE TABLE IF NOT EXISTS member ( id int NOT NULL PRIMARY KEY, name varchar(100), email varchar(200), department varchar(200) ); INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A'); INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B'); INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B'); INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
Step 2.MySQL Connectorの登録
MySQL Connectorを登録する方法は、次の通りです。
- Kafka Connectがインストールされたサーバにアクセスします。
- リクエスト時に送る JSON bodyを個人用設定に合わせて以下の形式で入力します。
{ "name": "mysql-connector", // connectorの名前 "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector",// 使用するプラグインの種類 "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQLサーバのエンドポイント "database.port": "3306", // MySQLサーバのポート "database.user": "kimdong", // MySQLサーバの user "database.password": "1234", // MySQLサーバ userの password "database.server.id": "184054", // kafka connectで使用する MySQLサーバの uuid "database.server.name": "NCP_MYSQL", // Kafka Connectで使用する MySQLサーバに付与する名前、今後 Kafka topicのプレフィックスとして使用 "database.whitelist": "test", // kafka connectでアクセスする MySQLサーバのデータベースを指定 "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Brokerのノード情報 "database.history.kafka.topic": "this_is_topic", // MySQLヒストリーの変更履歴を保存する topic名 "snapshot.locking.mode": "none", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.json.JsonConverter" } } - 以下のコマンドを入力して MySQL Connectorを登録します。
curl -X POST localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}'
Step 3.Topic確認
Topicが作成されたか確認する方法は、次の通りです。
- Cloud Data Streaming Serviceが提供する CMAKにアクセスします。
- STEP 2で設定した bodyの情報通りに Topicが作成されたことを確認します。
- STEP 1で作成した MySQLの memberテーブルの変更情報が盛り込まれる Topicは、「NCP_MYSQL.test.member」です。

- STEP 1で作成した MySQLの memberテーブルの変更情報が盛り込まれる Topicは、「NCP_MYSQL.test.member」です。
Step 4.Elasticsearch Connectorの登録
Elasticsearch Connectorを登録する方法は、次の通りです。
- Kafka Connectがインストールされたサーバにアクセスします。
- リクエスト時に送る JSON bodyを個人用設定に合わせて以下の形式で入力します。
{ "name": "es-connector", // connectorの名前 "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // 使用するプラグインの種類 "connection.url": "http://10.0.100.9:9200", // topicを持ち込む Kafkaブローカーノード "tasks.max": "1", "topics": "NCP_MYSQL.test.member", // コンシュームする topicの名前 "type.name": "_doc", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "true", "value.converter.schemas.enable": "true", "transforms": "extractKey", "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.extractKey.field": "id", // MySQLテーブルで使用する pk名 "behavior.on.null.values": "IGNORE" } } - 以下のコマンドを入力して Elasticsearch Connectorを登録します。
curl -X POST localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
Step 5.Elasticsearchデータ確認
Elasticsearchデータを確認する方法は、次の通りです。
- Search Engine Serviceが提供する Kibanaのエンドポイントにアクセスします。
- インデックスリストを照会し、ncp_mysql.test.memberという名前のインデックスが作成されたことを確認します。

- ncp_mysql.test.memberインデックスの内容を照会します。

- ncp_mysql.test.memberインデックスの特定のドキュメントを照会します。
- MySQLで追加したデータは
_source.afterで確認できます。

- MySQLで追加したデータは