Kafka Connectでデータパイプラインを構築
    • PDF

    Kafka Connectでデータパイプラインを構築

    • PDF

    Article Summary

    VPC環境で利用できます。

    本ガイドでは、サーバにKafka Connectをインストールし、Cloud Data Streaming Serviceを活用してMySQLの変更事項をElasticsearchに適用する方法を説明します。

    参考

    本ガイドは、ユーザーの理解を助けるために作成された例です。 本ガイドで使用されたconnectorやconfluent-hubのライセンスはconfluent社にあり、confluent社のライセンスポリシーに従います。 NAVERクラウドは、当該connectorやconfluent-hubを直接提供する事業者ではありません。ユーザーはconnectorやconfluent-hubを使用するかどうかを直接決定できます。

    事前作業

    本ガイドを実行する前に利用申込を完了する必要がある作業は、以下のとおりです。

    • サーバとVPC作成
    • Cloud DB for MySQLサーバ作成
    • Cloud Data Streaming Serviceクラスタ作成
    • Search Engine Serviceクラスタ作成

    ネットワークの設定

    Step 1. Cloud DB for MySQLの設定

    Cloud DB for MySQLのDBアカウントを追加する方法は以下のとおりです。

    1. NAVERクラウドプラットフォームコンソールで、Services > Database > Cloud DB for MySQL > DB Serverメニューを順にクリックします。
    2. DB Serverを選択し、DB管理 > DB User管理をクリックします。
    3. 必要な情報を入力し、[DB User追加] ボタンをクリックします。
    4. DBアカウントが追加されたら、[保存] ボタンをクリックします。
      cdss-5-2_ja
    参考

    Cloud DB for MySQLに関する詳細については、Cloud DB for MySQLご利用ガイドをご参照ください。

    Step 2. ACGの設定

    Cloud Data Streaming Serviceブローカーノードの9092番ポートでアクセスできるようにACGを設定する方法は、以下のとおりです。

    1. NAVERクラウドプラットフォームコンソールで、Services > Compute > Server > ACGメニューを順にクリックします。
    2. ACGリストで「cdss-b-xxxxx」を選択し、[ACG設定] ボタンをクリックします。
    3. ACGルールを入力し、[追加] ボタンをクリックします。
      cdss-5-4_ja
      • プロトコル:TCP
      • アクセスソース:Kafka Connectが実行されるサーバのIP
      • 許可ポート:9092
    4. [適用] ボタンをクリックします。

    Search Engine Serviceマネージャーノードの9200番ポートでアクセスできるようにACGを設定する方法は、以下のとおりです。

    1. NAVERクラウドプラットフォームコンソールで、Services > Compute > Server > ACGメニューを順にクリックします。
    2. ACGリストで「searchengine-m-xxxxx」を選択し、[ACG設定] ボタンをクリックします。
    3. ACGルールを入力し、[追加] ボタンをクリックします。
      cdss-5-6_ja
      • プロトコル:TCP
      • アクセスソース:Kafka Connectが実行されるサーバのIP
      • 許可ポート:9200

    サーバにKafka Connectをインストール

    Step 1. Javaのインストール

    Javaをインストールする方法は以下のとおりです。

    1. 以下のコマンドを入力してyumのアップデートを行います。

      yum update
      
    2. 以下のコマンドを入力してjava-1.8.0-openjdk-devel.x86_64をインストールします。

      yum install java-1.8.0-openjdk-devel.x86_64
      
    3. 以下のコマンドを入力して正常にインストールされたかどうか確認します。

      java -version
      javac -version
      

    Step 2. Kafka Connectのインストール

    Kafka Connectをインストールする方法は以下のとおりです。

    1. 以下のコマンドを入力してサーバの/rootパスにKafka Connectをダウンロードします。

      curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
      
    2. 以下のコマンドを入力してダウンロードしたファイルの圧縮を展開します。

      tar -zxvf confluent-community-7.0.1.tar.gz
      
    3. 以下のコマンドを入力して起動前にpropertiesファイルを修正します。

      • Cloud Data Streaming Serviceのブローカーノード情報を参照し、属性ファイルの「bootstrap.servers」にIPリストを追加します。
      vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      
      bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
      

    Step 3. Confluent Hubのインストール

    Confluent Hubは、Kafka Connectで使用される様々なプラグインを手軽にダウンロードできる保存場所です。 サポートされるプラグインの全体リストは、Confluentで提供するConfluent Connector Portfolioを確認してください。

    1. 以下のコマンドを入力して/rootパスに新しいフォルダを作成し、そのフォルダに移動します。

      • 例では、「confluent-hub」という名前のフォルダを作成します。
      mkdir confluent-hub
      cd confluent-hub
      
    2. 以下のコマンドを入力して現在位置のパス(/root/confluent-hub)にConfluent Hubをダウンロードします。

      curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
      
    3. 以下のコマンドを入力してダウンロードしたファイルの圧縮を展開します。

      tar -zxvf confluent-hub-client-latest.tar.gz
      
    4. 以下のコマンドを入力して現在位置のパス(/root/confluent-hub)に今後プラグインが保存されるフォルダを作成します。

      • 例では、「plugins」という名前のフォルダを作成します。
      mkdir plugins
      
    5. 以下のコマンドを順に入力し、PATH環境変数に圧縮を展開したbinフォルダのパスを追加します。

      vi ~/.bashrc
      
      export CONFLUENT_HOME='~/confluent-hub'
      export PATH=$PATH:$CONFLUENT_HOME/bin
      
      source ~/.bashrc
      

    Step 4. MySQLコネクタのインストール

    以下のコマンドを入力してdebezium-connector-mysqlをインストールします。

    • --component-dirは、プラグインが実際にインストールされるフォルダのパスです。 STEP 3で作成した/root/confluent-hub/pluginsに設定します。
    • --worker-configsは、プラグインのインストール後に適用された属性ファイルのパスです。 Step 2で修正した/root/confluent-7.0.1/etc/kafka/connect-distributed.propertiesに設定します。
    confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    

    Step 5. Elasticsearchコネクタのインストール

    以下のコマンドを入力してkafka-connect-elasticsearchをインストールします。

    • --component-dir--worker-configsは、STEP 4と同様に適用します。
    confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    

    Step 6. Kafka Connectプロセスの実行

    Kafka Connectプロセスを実行する方法は以下のとおりです。

    1. 以下のコマンドを入力してKafka Connectプロセスをバックグラウンドで実行します。

      /root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      
    2. 以下のコマンドを入力してプロセスが正常に動作するかどうか確認します。

      curl localhost:8083
      
      {"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
      
    3. 以下のコマンドを入力して前の段階でインストールしたコネクタがすべて正常に表示されているかどうか確認します。

      curl localhost:8083/connector-plugins
      
      [
         {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, 
         {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"},
         {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
         {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
         {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}
      ]
      

    Kafka Connectを使用する

    Step 1. MySQLテーブルの作成とデータの追加

    MySQLテーブルを作成してデータを追加する方法は以下のとおりです。

    1. 使用中のCloud DB for MySQLサーバにアクセスします。
    2. 以下のコマンドを入力してテーブルを作成し、データを追加します。
      • 例では、「member」という名前のテーブルを作成します。
      CREATE TABLE IF NOT EXISTS member (
       id int NOT NULL PRIMARY KEY,
       name varchar(100),
       email varchar(200),
       department varchar(200)
      );
      
      INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A');
      INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B');
      INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B');
      INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
      

    Step 2. MySQLコネクタの登録

    MySQLコネクタを登録する方法は以下のとおりです。

    1. Kafka Connectがインストールされたサーバにアクセスします。
    2. リクエスト時に送るJSON bodyを個別の環境に合わせて以下の形式で入力します。
      {
         "name": "mysql-connector", // コネクタの名前
         "config": {
           "connector.class": "io.debezium.connector.mysql.MySqlConnector", // 使用するプラグインの種類
           "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQLサーバのエンドポイント
           "database.port": "3306", // MySQLサーバのポート
           "database.user": "kimdong", // MySQLサーバのユーザー
           "database.password": "1234", // MySQLサーバユーザーのパスワード
           "database.server.id": "184054", // Kafka Connectで使用されるMySQLサーバのUUID
           "database.server.name": "NCP_MYSQL", // Kafka Connectで使用するMySQLサーバに付与する名前、今後Kafkaトピックの接頭辞として使用される 
           "database.whitelist": "test", // Kafka ConnectでアクセスするMySQLサーバのデータベースを指定
           "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // ブローカーノード情報
           "database.history.kafka.topic": "this_is_topic", // MySQLヒストリーの変更履歴を保存するトピック名 
           "snapshot.locking.mode": "none",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter"
         }
      }
      
    3. 以下のコマンドを入力してMySQLコネクタを登録します。
      curl -X POST localhost:8083/connectors \
         -H "Content-Type: application/json" \
         -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}' 
      
      
      

    Step 3. トピックの確認

    トピックが作成されたか確認する方法は以下のとおりです。

    1. Cloud Data Streaming Serviceで提供するCMAKにアクセスします。
    2. STEP 2で設定したbodyの情報通りにトピックが作成されたことを確認します。
      • STEP 1で作成したMySQLのmemberテーブルの変更情報が盛り込まれるトピックは、「NCP_MYSQL.test.member」です。
        cdss-5-7_ja

    Step 4. Elasticsearchコネクタの登録

    Elasticsearchコネクタを登録する方法は以下のとおりです。

    1. Kafka Connectがインストールされたサーバにアクセスします。
    2. リクエスト時に送るJSON bodyを個別の環境に合わせて以下の形式で入力します。
      {
         "name": "es-connector", // コネクタの名前
         "config": {
           "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // 使用するプラグインの種類
           "connection.url": "http://10.0.100.9:9200", // トピックを持ってくるKafkaブローカーノード
           "tasks.max": "1",
           "topics": "NCP_MYSQL.test.member", // コンシュームするトピックの名前
           "type.name": "_doc",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter.schemas.enable": "true",
           "value.converter.schemas.enable": "true",
           "transforms": "extractKey",
           "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
           "transforms.extractKey.field": "id", // MySQLテーブルで使用する主キー名
           "behavior.on.null.values": "IGNORE"
         }
      }
      
    3. 以下のコマンドを入力してElasticsearchコネクタを登録します。
      curl -X POST localhost:8083/connectors \
         -H "Content-Type: application/json" \
         -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
      

    Step 5. Elasticsearchデータの確認

    Elasticsearchデータを確認する方法は以下のとおりです。

    1. Search Engine Serviceで提供するKibanaのエンドポイントでアクセスします。
    2. インデックスリストを照会し、「ncp_mysql.test.member」という名前のインデックスが作成されたことを確認します。
      cdss-5-8_ja
    3. ncp_mysql.test.memberインデックスの内容を照会します。
      cdss-5-9_ja
    4. ncp_mysql.test.memberインデックスの特定のドキュメントを照会します。
      • MySQLで追加したデータは_source.afterで確認できます。
        cdss-5-10_ja

    この記事は役に立ちましたか?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.