Build data pipeline with Kafka Connect

    Available in VPC

    This guide describes how to install Kafka Connect on a server and then use Cloud Data Streaming Service to apply changes made in MySQL to Elasticsearch.

    Note

    This guide is an example intended to help improve the user's understanding. Confluent owns the license for the connector and Confluent-Hub used in this guide, and it follows Confluent's license policy. NAVER Cloud does not directly provide the connector and Confluent-Hub, and users can choose whether to use them at their own discretion.

    Preparations

    Before following this guide, you must first complete the following tasks:

    • Create server and VPC
    • Create Cloud DB for MySQL server
    • Create Cloud Data Streaming Service cluster
    • Create Search Engine Service cluster

    Set network

    Step 1. Set Cloud DB for MySQL

    The following describes how to create a DB account for Cloud DB for MySQL.

    1. From the NAVER Cloud Platform console, click the Services > Database > Cloud DB for MySQL > DB Server menus, in that order.
    2. Select a DB server, and then click Manage DB > Manage DB user.
    3. Enter the necessary information, and then click the [Add DB user] button.
    4. Click the [Save] button once the DB account is added.
    Note

    Please refer to the Cloud DB for MySQL Guide for more information about Cloud DB for MySQL.

    Step 2. Set ACG

    The following describes how to set ACG so that access to Port 9092 of the Cloud Data Streaming Service broker node is allowed.

    1. From the NAVER Cloud Platform console, click the Services > Compute > Server > ACG menus, in that order.
    2. Select "cdss-b-xxxxx" from the ACG list, and then click the [Set ACG] button.
    3. Enter the ACG rule, and then click the [Add] button.
      • Protocol: TCP
      • Access source: IP of the server where Kafka Connect will run
      • Allowed port: 9092
    4. Click the [Apply] button.

    The following describes how to set ACG so that access to Port 9200 of the Search Engine Service manager node is allowed.

    1. From the NAVER Cloud Platform console, click the Services > Compute > Server > ACG menus, in that order.
    2. Select "searchengine-m-xxxxx" from the ACG list, and then click the [Set ACG] button.
    3. Enter the ACG rule, and then click the [Add] button.
      • Protocol: TCP
      • Access source: IP of the server where Kafka Connect will run
      • Allowed port: 9200
    4. Click the [Apply] button.
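
    To confirm that the rules took effect, you can check reachability of both ports from the server where Kafka Connect will run, using nc (install it with yum if it is not present). The IP addresses below are the example broker node and manager node addresses used later in this guide; replace them with your own.

      nc -zv 10.0.200.14 9092
      nc -zv 10.0.100.9 9200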

    Install Kafka Connect on server

    Step 1. Install Java

    The following describes how to install Java.

    1. Enter the following command to update yum.

      yum update
      
    2. Enter the following command to install java-1.8.0-openjdk-devel.x86_64.

      yum install java-1.8.0-openjdk-devel.x86_64
      
    3. Enter the following command to check if the installation has been completed successfully.

      java -version
      javac -version
      

    Step 2. Install Kafka Connect

    The following describes how to install Kafka Connect.

    1. Enter the following command to download Kafka Connect to the /root path of the server.

      curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
      
    2. Enter the following command to decompress the downloaded file.

      tar -zxvf confluent-community-7.0.1.tar.gz
      
    3. Enter the following command to modify the properties file before starting.

      • Add the IP list to "bootstrap.servers" of the properties file by referring to the broker node information of Cloud Data Streaming Service.
      vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      
      bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
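      
      • For reference, the same properties file ships with the defaults shown below, which this guide leaves unchanged; this listing is taken from the stock connect-distributed.properties and may differ slightly by version. plugin.path is updated automatically by confluent-hub when the connectors are installed in Steps 4 and 5.
      group.id=connect-cluster
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      offset.storage.topic=connect-offsets
      config.storage.topic=connect-configs
      status.storage.topic=connect-status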
      

    Step 3. Install Confluent Hub

    Confluent Hub is a repository from which you can easily download various plugins used with Kafka Connect. See the Confluent Connector Portfolio provided by Confluent for the complete list of supported plugins.

    1. Enter the following command to create a new folder in the /root path, and then go to that folder.

      • For this example, we will create a folder named "confluent-hub."
      mkdir confluent-hub
      cd confluent-hub
      
    2. Enter the following command to download Confluent Hub to the current path (/root/confluent-hub).

      curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
      
    3. Enter the following command to decompress the downloaded file.

      tar -zxvf confluent-hub-client-latest.tar.gz
      
    4. Enter the following command to create a folder where the plugins will be saved later under the current path (/root/confluent-hub).

      • For this example, we will create a folder named "plugins."
      mkdir plugins
      
    5. Enter the following commands in sequence to add the path of the decompressed bin folder to the environment variable PATH.

      vi ~/.bashrc
      
      export CONFLUENT_HOME=/root/confluent-hub
      export PATH=$PATH:$CONFLUENT_HOME/bin
      
      source ~/.bashrc
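      
      • You can confirm that the CLI is now on PATH; the expected location assumes the folder layout created above.
      which confluent-hub
      
      /root/confluent-hub/bin/confluent-hub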
      

    Step 4. Install MySQL connector

    Enter the following command to install debezium-connector-mysql.

    • --component-dir is the folder path where the actual plugin will be installed. Set it to /root/confluent-hub/plugins, as created in Step 3.
    • --worker-configs is the path of the properties file after the plugin is installed and applied. Set it to /root/confluent-7.0.1/etc/kafka/connect-distributed.properties, as modified in Step 2.
    confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    

    Step 5. Install Elasticsearch connector

    Enter the following command to install kafka-connect-elasticsearch.

    • Apply the same instructions to --component-dir and --worker-configs as in Step 4.
    confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
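    
    • After Steps 4 and 5, both plugin folders should be present under /root/confluent-hub/plugins. Confluent Hub names each folder <owner>-<connector>, so the output should look similar to the following (exact names may vary by version).
    ls /root/confluent-hub/plugins
    
    confluentinc-kafka-connect-elasticsearch  debezium-debezium-connector-mysql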
    

    Step 6. Run Kafka Connect process

    The following describes how to run the Kafka Connect process.

    1. Enter the following command to run the Kafka Connect process in the background.

      /root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      
    2. Enter the following command to check if the process is working normally.

      curl localhost:8083
      
      {"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
      
    3. Enter the following command to check if the connectors installed earlier all show up normally.

      curl localhost:8083/connector-plugins
      
      [
         {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, 
         {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"},
         {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
         {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
         {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}
      ]
      

    Use Kafka Connect

    Step 1. Create MySQL table and add data

    The following describes how to create a MySQL table and add data.

    1. Access the Cloud DB for MySQL server in use.
    2. Enter the following command to create a table, and then add data.
      • For this example, we will create a table named member.
      CREATE TABLE IF NOT EXISTS member (
       id int NOT NULL PRIMARY KEY,
       name varchar(100),
       email varchar(200),
       department varchar(200)
      );
      
      INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A');
      INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B');
      INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B');
      INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
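      
      • You can verify that the rows were inserted:
      SELECT * FROM member;
      
      +----+---------+-------------------+------------+
      | id | name    | email             | department |
      +----+---------+-------------------+------------+
      |  1 | messi   | messi@gmail.com   | A          |
      |  2 | ronaldo | ronaldo@naver.com | B          |
      |  3 | son     | son@ncloud.com    | B          |
      |  4 | park    | park@yahoo.com    | B          |
      +----+---------+-------------------+------------+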
      

    Step 2. Register MySQL connector

    The following describes how to register the MySQL connector.

    1. Access the server where Kafka Connect is installed.
    2. Compose the JSON body to be sent with the request in the following format, adjusting the values to your environment.
      {
         "name": "mysql-connector", // Connector's name
         "config": {
           "connector.class": "io.debezium.connector.mysql.MySqlConnector", // Type of the plugin to use
           "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQL server's endpoint
           "database.port": "3306", // MySQL server's port
           "database.user": "kimdong", // MySQL server's user
           "database.password": "1234", // MySQL server user's password
           "database.server.id": "184054", // MySQL server's UUID used by Kafka Connect
           "database.server.name": "NCP_MYSQL", // Name given to the MySQL server to be used by Kafka Connect, which will be used as a prefix for Kafka topics 
           "database.whitelist": "test", // Specify the MySQL server's database to be accessed from Kafka Connect
           "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Broker node information
           "database.history.kafka.topic": "this_is_topic", // Name of the topic to save the MySQL change history 
           "snapshot.locking.mode": "none",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter"
         }
      }
      
    3. Enter the following command to register the MySQL connector.
      curl -X POST localhost:8083/connectors \
         -H "Content-Type: application/json" \
         -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}' 
      
      
      

    Step 3. View topic

    The following describes how to check if the topic has been created.

    1. Access CMAK provided by Cloud Data Streaming Service.
    2. Check if the topic has been created according to the body information configured in Step 2.
      • The topic where the changes in the MySQL member table created in Step 1 will be saved is "NCP_MYSQL.test.member."
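
    You can also list the topics directly from the server where Kafka Connect is installed, using the CLI bundled in the Confluent archive; the broker IP below is the example value used earlier, so replace it with one of your own broker nodes.

      /root/confluent-7.0.1/bin/kafka-topics --bootstrap-server 10.0.200.14:9092 --list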

    Step 4. Register Elasticsearch connector

    The following describes how to register the Elasticsearch connector.

    1. Access the server where Kafka Connect is installed.
    2. Compose the JSON body to be sent with the request in the following format, adjusting the values to your environment.
      {
         "name": "es-connector", // Connector's name
         "config": {
           "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // Type of the plugin to use
           "connection.url": "http://10.0.100.9:9200", // Kafka broker node to import the topic
           "tasks.max": "1",
           "topics": "NCP_MYSQL.test.member", // Name of the topic to consume
           "type.name": "_doc",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter.schemas.enable": "true",
           "value.converter.schemas.enable": "true",
           "transforms": "extractKey",
           "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
           "transforms.extractKey.field": "id", // Name of the primary key used by MySQL table
           "behavior.on.null.values": "IGNORE"
         }
      }
      
    3. Enter the following command to register the Elasticsearch connector.
      curl -X POST localhost:8083/connectors \
         -H "Content-Type: application/json" \
         -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
      

    Step 5. View Elasticsearch data

    The following describes how to check the Elasticsearch data.

    1. Access the Kibana endpoint provided by Search Engine Service.
    2. Search the index list, and check if an index called ncp_mysql.test.member has been created.
    3. View the content of the ncp_mysql.test.member index.
    4. View a specific document in the ncp_mysql.test.member index.
      • You can check the data added by MySQL under _source.after.
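
    If you want to check from the command line instead of Kibana, you can also query the index directly; the address below is the example Search Engine Service manager node used in the Elasticsearch connector configuration, so replace it with your own.

      curl "http://10.0.100.9:9200/ncp_mysql.test.member/_search?pretty"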
