Building a data pipeline with Kafka Connect

This guide describes how to install Kafka Connect to utilize Cloud Data Streaming Service.

Note

This is a sample guide produced for your convenience. The connectors and Confluent Hub used in this guide are licensed by Confluent and subject to Confluent's license policy. NAVER Cloud does not provide the connectors or Confluent Hub directly, and users can decide at their own discretion whether to use them.

What is Kafka Connect?

Kafka Connect is a component that connects external systems such as databases, key-value stores, and file systems to Kafka. It is open-source software provided as part of Apache Kafka. Kafka Connect lets you connect different products to each other and build your data pipeline with ease.

Before use

This guide describes how to install Kafka Connect on a server and then propagate changes made in MySQL to Elasticsearch.
The prerequisite services need to be activated before you start.

Prerequisite services

  • Server: a pay-as-you-go service that lets you quickly set up servers in the cloud environment without separately purchasing physical server resources
  • Cloud DB for MySQL: a fully managed cloud database service that automatically restores data in case of a failure
  • Cloud Data Streaming Service: a service that helps you easily deploy, operate, and expand open-source Apache Kafka clusters
  • Search Engine Service: a service that helps you easily deploy, operate, and expand Elasticsearch clusters

Set network

Step 1. Set Cloud DB for MySQL

  1. From Products & Services > Database > Cloud DB for MySQL, click the [Manage DB] button.

  2. Click Manage DB user.

  3. Enter the necessary information, and then click the [Add DB user] button.


Step 2. Set ACG

  1. From the Products & Services > Compute > Server > ACG menu, select the "cdss-b-xxxxx" ACG that you have.

    • Set ACG so that access to Port 9092 of the Cloud Data Streaming Service broker node is allowed.


  2. Click the [Set ACG] button, and then add the following ACG rule.

    • Protocol: TCP
    • Access source: IP of the server where Kafka Connect will run
    • Allowed port: 9092


  3. Select the "searchengine-m-xxxxx" you have.

    • Set ACG so that access to Port 9200 of the Search Engine Service manager node is allowed.


  4. Click the [Set ACG] button, and then add the following ACG rule.

    • Protocol: TCP
    • Access source: IP of the server where Kafka Connect will run
    • Allowed port: 9200

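With both ACG rules in place, you can verify from the server where Kafka Connect will run that the two ports are reachable. This is a minimal check, assuming nc is installed on the server; the IP addresses below are the example broker node and manager node addresses used later in this guide, so replace them with your own.

    nc -zv 10.0.200.14 9092   # Cloud Data Streaming Service broker node
    nc -zv 10.0.100.9 9200    # Search Engine Service manager node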

Install Kafka Connect on server

Step 1. Install Java

  1. Enter the following command to update yum.

    yum update
    
  2. Enter the following command to install java-1.8.0-openjdk-devel.x86_64.

    yum install java-1.8.0-openjdk-devel.x86_64
    
  3. Enter the following command to check if the installation has been completed successfully.

    java -version
    javac -version
    

Step 2. Install Kafka Connect

  1. Enter the following command to download Kafka Connect to the /root path of the server.

    curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
    
  2. Enter the following command to decompress the downloaded file.

    tar -zxvf confluent-community-7.0.1.tar.gz
    
  3. Enter the following command to edit the properties file before starting.

    • Add the IP list to "bootstrap.servers" of the properties file by referring to the broker node information of Cloud Data Streaming Service.
    vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    
    bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
    

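If you want to confirm the change to bootstrap.servers without reopening the editor, you can print the value directly. This simple check assumes the default layout of the decompressed Confluent 7.0.1 archive.

    grep '^bootstrap.servers' /root/confluent-7.0.1/etc/kafka/connect-distributed.properties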
Step 3. Install Confluent Hub

Confluent Hub is a repository from which you can easily download the various plugins used in Kafka Connect. See the Confluent Connector Portfolio provided by Confluent for the complete list of supported plugins.

  1. Enter the following command to create a new folder in the /root path, and then go to that folder.

    • For this example, we will create a folder called "confluent-hub."
    mkdir confluent-hub
    cd confluent-hub
    
  2. Enter the following command to download the Confluent Hub client to the current path (/root/confluent-hub).

    curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
    
  3. Enter the following command to decompress the downloaded file.

    tar -zxvf confluent-hub-client-latest.tar.gz
    
  4. Enter the following command to create a folder where the plugins will be saved later under the current path (/root/confluent-hub).

    • For this example, we will create a folder called "plugins."
    mkdir plugins
    
  5. Enter the following commands in sequence to add the path of the decompressed bin folder to environment variable PATH.

    vi ~/.bashrc
    
    export CONFLUENT_HOME="$HOME/confluent-hub"
    export PATH=$PATH:$CONFLUENT_HOME/bin
    
    source ~/.bashrc
    
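To confirm that the PATH change took effect, check that the shell can now find the CLI. The path printed below assumes the archive was decompressed under /root/confluent-hub as in this example.

    command -v confluent-hub
    # prints a path such as /root/confluent-hub/bin/confluent-hub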

Step 4. Install MySQL connector

  1. Enter the following command to install debezium-connector-mysql.
    • --component-dir is the folder path where the actual plugin will be installed. Set it to /root/confluent-hub/plugins, as created in Step 3.
    • --worker-configs is the path of the properties file after the plugin is installed and applied. Set it to /root/confluent-7.0.1/etc/kafka/connect-distributed.properties, as edited in Step 2.
    confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    

Step 5. Install Elasticsearch connector

  1. Enter the following command to install kafka-connect-elasticsearch.
    • Apply the same instructions to --component-dir and --worker-configs as in Step 4.
    confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    
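After both installations, the connector plugins are located under the plugin folder created in Step 3. Listing the folder is a quick way to confirm that both connectors were installed; the exact folder names may vary by connector version.

    ls /root/confluent-hub/plugins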

Step 6. Run Kafka Connect process

  1. Enter the following command to run the Kafka Connect process in the background.

    /root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    
  2. Enter the following command to check if the process is working normally.

    curl localhost:8083
    
    {"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
    
  3. Enter the following command to check if previously installed connectors all show up normally.

    curl localhost:8083/connector-plugins
    
    [
       {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, 
       {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"},
       {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
       {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
       {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}
    ]
    

Use Kafka Connect

Step 1. Create MySQL table and add data

  1. Access a Cloud DB for MySQL server in use.

  2. Enter the following command to create a table, and then add data.

    • For this example, we will create a table called member.
    CREATE TABLE IF NOT EXISTS member (
     id int NOT NULL PRIMARY KEY,
     name varchar(100),
     email varchar(200),
     department varchar(200)
    );
    
    INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A');
    INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B');
    INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B');
    INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
    
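You can confirm that the rows were inserted with a quick query. The sketch below uses the example endpoint, account, and database name that appear in the connector configuration in Step 2; replace them with your own values, and make sure the mysql client is installed and the DB ACG allows the connection.

    mysql -h db-9c242.vpc-cdb.ntruss.com -u kimdong -p test -e "SELECT * FROM member;"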

Step 2. Register MySQL connector

  1. Access the server where Kafka Connect is installed.

  2. Compose the JSON body to be sent with the registration request in the format below. Adjust it to your environment.

    {
       "name": "mysql-connector", // Connector's name
       "config": {
         "connector.class": "io.debezium.connector.mysql.MySqlConnector", // Type of the plugin to use
         "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQL server's endpoint
         "database.port": "3306", // MySQL server's port
         "database.user": "kimdong", // MySQL server's user
         "database.password": "1234", // MySQL server user's password
         "database.server.id": "184054", // MySQL server's UUID used by Kafka Connect
         "database.server.name": "NCP_MYSQL", // Name given to the MySQL server to be used by Kafka Connect, which will be used as a prefix for Kafka topics 
         "database.whitelist": "test", // Specifies the MySQL server's database to be accessed from Kafka Connect
         "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Broker node information
         "database.history.kafka.topic": "this_is_topic", // Name of the topic to save the changes of MySQL history 
         "snapshot.locking.mode": "none",
         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
         "key.converter": "org.apache.kafka.connect.json.JsonConverter"
       }
    }
    
  3. Enter the following command to register the MySQL connector.

    curl -X POST localhost:8083/connectors \
       -H "Content-Type: application/json" \
       -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}' 
    
    
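You can check whether the connector was registered and its task is running through the Kafka Connect REST API.

    curl localhost:8083/connectors/mysql-connector/status
    # the "state" fields should read RUNNING when the connector is healthy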

Step 3. View topic

  1. Access the CMAK endpoint provided by Cloud Data Streaming Service.

  2. Check if the topic has been created according to the body information configured in Step 2.

    • The topic where the changes in the MySQL member table created in Step 1 will be saved is "NCP_MYSQL.test.member."

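If you prefer the command line to CMAK, the Kafka CLI bundled with the Confluent archive can also list topics. The broker address below is the example address used earlier in this guide.

    /root/confluent-7.0.1/bin/kafka-topics --bootstrap-server 10.0.200.14:9092 --list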

Step 4. Register Elasticsearch connector

  1. Access the server where Kafka Connect is installed.

  2. Compose the JSON body to be sent with the registration request in the format below. Adjust it to your environment.

    {
       "name": "es-connector", // Connector's name
       "config": {
         "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // Type of the plugin to use
         "connection.url": "http://10.0.100.9:9200", // Kafka broker node to import the topic
         "tasks.max": "1",
         "topics": "NCP_MYSQL.test.member", // Name of the topic to consume
         "type.name": "_doc",
         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
         "key.converter": "org.apache.kafka.connect.json.JsonConverter",
         "key.converter.schemas.enable": "true",
         "value.converter.schemas.enable": "true",
         "transforms": "extractKey",
         "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
         "transforms.extractKey.field": "id", // Name of the primary key used in the MySQL table
         "behavior.on.null.values": "IGNORE"
       }
    }
    
  3. Enter the following command to register the Elasticsearch connector.

    curl -X POST localhost:8083/connectors \
       -H "Content-Type: application/json" \
       -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
    
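Both connectors should now be registered. You can list them through the Kafka Connect REST API.

    curl localhost:8083/connectors
    # expected to return a list containing "mysql-connector" and "es-connector"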

Step 5. View Elasticsearch data

  1. Access the Kibana endpoint provided by Search Engine Service.

  2. Search the index list, and check if an index called ncp_mysql.test.member has been created.

  3. View the content of the ncp_mysql.test.member index.

  4. View a specific document in the ncp_mysql.test.member index.

    • You can check the data added by MySQL under _source.after.

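You can also query the index directly with the Elasticsearch search API from the server where Kafka Connect is installed, since port 9200 access was opened in the ACG step. The address below is the example manager node address used in the connector configuration.

    curl 'http://10.0.100.9:9200/ncp_mysql.test.member/_search?pretty'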

