Kafka Connect によるデータパイプライン構築

Prev Next

VPC環境で利用できます。

本ガイドでは、サーバに Kafka Connectをインストールした後、Cloud Data Streaming Serviceを活用して MySQLの変更内容を Elasticsearchに適用する方法について説明します。

参考

本ガイドは、ユーザーの理解を助けるために作成された例です。本ガイドで使用された connectorや confluent-hubのライセンスは confluent社にあり、confluent社のライセンスポリシーに従います。NAVERクラウドは、当該 connectorや confluent-hubを直接提供する事業者ではありません。ユーザーは connectorや confluent-hubの使用有無を直接決定できます。

事前タスク

本ガイドを実行する前にご利用の申し込みを完了する必要があるタスクは、次の通りです。

  • サーバと VPC作成
  • Cloud DB for MySQLサーバ作成
  • Cloud Data Streaming Serviceクラスタ作成
  • Search Engine Serviceクラスタ作成

ネットワーク設定

Step 1.Cloud DB for MySQL設定

Cloud DB for MySQLの DBアカウントを追加する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Database > Cloud DB for MySQL > DB Serverメニューを順にクリックします。
  2. DB Serverを選択し、DB管理 > DB User管理をクリックします。
  3. 必要な情報を入力し、 [DB User追加] ボタンをクリックします。
  4. DBアカウントが追加されたら、 [保存] ボタンをクリックします。
    cdss-5-2_ko
参考

Cloud DB for MySQLに関する詳細は、Cloud DB for MySQL ご利用ガイドをご参照ください。

Step 2.ACG設定

Cloud Data Streaming Serviceブローカーノードの9092番ポートにアクセスできるように ACGを設定する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Compute > Server > ACGメニューを順にクリックします。
  2. ACGリストから「cdss-b-xxxxx」を選択し、 [ACG設定] ボタンをクリックします。
  3. ACGルールを入力し、 [追加] ボタンをクリックします。
    cdss-5-4_ko
    • プロトコル: TCP
    • アクセスソース: Kafka Connectが実行されるサーバの IPアドレス
    • 許可ポート: 9092
  4. [適用] ボタンをクリックします。

Search Engine Serviceマネージャノードの9200番ポートにアクセスできるように ACGを設定する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Compute > Server > ACGメニューを順にクリックします。
  2. ACGリストから「searchengine-m-xxxxx」を選択し、 [ACG設定] ボタンをクリックします。
  3. ACGルールを入力し、 [追加] ボタンをクリックします。
    cdss-5-6_ko
    • プロトコル: TCP
    • アクセスソース: Kafka Connectが実行されるサーバの IPアドレス
    • 許可ポート: 9200

サーバに Kafka Connectをインストール

Step 1.Javaのインストール

Javaをインストールする方法は次の通りです。

  1. 以下のコマンドを入力して yumのアップデートを行います。

    yum update
    
  2. 以下のコマンドを入力して java-1.8.0-openjdk-devel.x86_64をインストールします。

    yum install java-1.8.0-openjdk-devel.x86_64
    
  3. 以下のコマンドを入力して正常にインストールされたかどうか確認します。

    java -version
    javac -version
    

Step 2.Kafka Connectのインストール

Kafka Connectをインストールする方法は、次の通りです。

  1. 以下のコマンドを入力してサーバの /rootパスに Kafka Connectをダウンロードします。

    curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
    
  2. 以下のコマンドを入力してダウンロードしたファイルの圧縮を展開します。

    tar -zxvf confluent-community-7.0.1.tar.gz
    
  3. 以下のコマンドを入力して起動前に propertiesファイルを変更します。

    • Cloud Data Streaming Serviceのブローカーノード情報を参照し、propertiesファイルの「bootstrap.servers」に ipアドレスリストを追加します。
    vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    
    bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
    

Step 3.Confluent Hubのインストール

Confluent Hubは Kafka Connectで使用される様々なプラグインを手軽にダウンロードできるストレージです。サポートされているプラグインの全リストについては、Confluentが提供する Confluent Connector Portfolioをご確認ください。

  1. 以下のコマンドを入力して /root パスに新しいフォルダを作成し、そのフォルダに移動します。

    • ユースケースでは、「confluent-hub」という名前のフォルダを作成します。
    mkdir confluent-hub
    cd confluent-hub
    
  2. 以下のコマンドを入力して現在位置のパス(/root/confluent-hub)に Confluent Hubをダウンロードします。

    curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
    
  3. 以下のコマンドを入力してダウンロードしたファイルの圧縮を展開します。

    tar -zxvf confluent-hub-client-latest.tar.gz
    
  4. 以下のコマンドを入力して現在位置のパス(/root/confluent-hub)に今後プラグインが保存されるフォルダを作成します。

    • 例では、「plugins」という名前のフォルダを作成します。
    mkdir plugins
    
  5. 以下のコマンドを順に入力し、PATH環境変数に圧縮を展開した binフォルダのパスを追加します。

    vi ~/.bashrc
    
    export CONFLUENT_HOME='~/confluent-hub'
    export PATH=$PATH:$CONFLUENT_HOME/bin
    
    source ~/.bashrc
    

Step 4.MySQL Connectorのインストール

以下のコマンドを入力して debezium-connector-mysqlをインストールします。

  • --component-dirは、プラグインが実際にインストールされるフォルダのパスです。STEP 3で作成した/root/confluent-hub/pluginsに設定します。
  • --worker-configsは、プラグインのインストール後に適用された属性ファイルのパスです。Step 2で変更した /root/confluent-7.0.1/etc/kafka/connect-distributed.propertiesに設定します。
confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties

Step 5.Elasticsearch Connectorのインストール

以下のコマンドを入力して kafka-connect-elasticsearchをインストールします。

  • --component-dir--worker-configsは、STEP 4と同様に適用します。
confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties

Step 6.Kafka Connectプロセス実行

Kafka Connectプロセスを実行する方法は、次の通りです。

  1. 以下のコマンドを入力して Kafka Connectプロセスをバックグラウンドで実行します。

    /root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    
  2. 以下のコマンドを入力してプロセスが正常に動作するかどうか確認します。

    curl localhost:8083
    
    {"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
    
  3. 以下のコマンドを入力して前の段階でインストールしたコネクタがすべて正常に表示されているかどうか確認します。

    curl localhost:8083/connector-plugins
    
    [
       {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, 
       {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"},
       {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
       {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
       {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}
    ]
    

Kafka Connectを使用する

Step 1.MySQLテーブルの作成とデータの追加

MySQLテーブルを作成してデータを追加する方法は、次の通りです。

  1. 現在使用中の Cloud DB for MySQLサーバにアクセスします。
  2. 以下のコマンドを入力してテーブルを作成し、データを追加します。
    • 例では、「member」という名前のテーブルを作成します。
    CREATE TABLE IF NOT EXISTS member (
     id int NOT NULL PRIMARY KEY,
     name varchar(100),
     email varchar(200),
     department varchar(200)
    );
    
    INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A');
    INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B');
    INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B');
    INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
    

Step 2.MySQL Connectorの登録

MySQL Connectorを登録する方法は、次の通りです。

  1. Kafka Connectがインストールされたサーバにアクセスします。
  2. リクエスト時に送る JSON bodyを個人用設定に合わせて以下の形式で入力します。
    {
       "name": "mysql-connector", // connectorの名前
       "config": {
         "connector.class": "io.debezium.connector.mysql.MySqlConnector",// 使用するプラグインの種類
         "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQLサーバのエンドポイント
         "database.port": "3306", // MySQLサーバのポート
         "database.user": "kimdong", // MySQLサーバの user
         "database.password": "1234", // MySQLサーバ userの password
         "database.server.id": "184054", // kafka connectで使用する MySQLサーバの uuid
         "database.server.name": "NCP_MYSQL", // Kafka Connectで使用する MySQLサーバに付与する名前、今後 Kafka topicのプレフィックスとして使用 
         "database.whitelist": "test", // kafka connectでアクセスする MySQLサーバのデータベースを指定
         "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Brokerのノード情報
         "database.history.kafka.topic": "this_is_topic", // MySQLヒストリーの変更履歴を保存する topic名 
         "snapshot.locking.mode": "none",
         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
         "key.converter": "org.apache.kafka.connect.json.JsonConverter"
       }
    }
    
  3. 以下のコマンドを入力して MySQL Connectorを登録します。
    curl -X POST localhost:8083/connectors \
       -H "Content-Type: application/json" \
       -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}' 
    
    
    

Step 3.Topic確認

Topicが作成されたか確認する方法は、次の通りです。

  1. Cloud Data Streaming Serviceが提供する CMAKにアクセスします。
  2. STEP 2で設定した bodyの情報通りに Topicが作成されたことを確認します。
    • STEP 1で作成した MySQLの memberテーブルの変更情報が盛り込まれる Topicは、「NCP_MYSQL.test.member」です。
      cdss-5-7_ko

Step 4.Elasticsearch Connectorの登録

Elasticsearch Connectorを登録する方法は、次の通りです。

  1. Kafka Connectがインストールされたサーバにアクセスします。
  2. リクエスト時に送る JSON bodyを個人用設定に合わせて以下の形式で入力します。
    {
       "name": "es-connector", // connectorの名前
       "config": {
         "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // 使用するプラグインの種類
         "connection.url": "http://10.0.100.9:9200", // topicを持ち込む Kafkaブローカーノード
         "tasks.max": "1",
         "topics": "NCP_MYSQL.test.member", // コンシュームする topicの名前
         "type.name": "_doc",
         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
         "key.converter": "org.apache.kafka.connect.json.JsonConverter",
         "key.converter.schemas.enable": "true",
         "value.converter.schemas.enable": "true",
         "transforms": "extractKey",
         "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
         "transforms.extractKey.field": "id", // MySQLテーブルで使用する pk名
         "behavior.on.null.values": "IGNORE"
       }
    }
    
  3. 以下のコマンドを入力して Elasticsearch Connectorを登録します。
    curl -X POST localhost:8083/connectors \
       -H "Content-Type: application/json" \
       -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
    

Step 5.Elasticsearchデータ確認

Elasticsearchデータを確認する方法は、次の通りです。

  1. Search Engine Serviceが提供する Kibanaのエンドポイントにアクセスします。
  2. インデックスリストを照会し、ncp_mysql.test.memberという名前のインデックスが作成されたことを確認します。
    cdss-5-8_ko
  3. ncp_mysql.test.memberインデックスの内容を照会します。
    cdss-5-9_ko
  4. ncp_mysql.test.memberインデックスの特定のドキュメントを照会します。
    • MySQLで追加したデータは _source.afterで確認できます。
      cdss-5-10_ko