Establish data pipeline with Kafka Connect


Available in VPC

This guide explains how to install Kafka Connect on your server and then use the Cloud Data Streaming Service to propagate MySQL changes to Elasticsearch.

Note

This guide is an example written to aid users' understanding. The connector and confluent-hub used in this guide are licensed by Confluent and follow Confluent's licensing policy. NAVER Cloud is not a direct provider of the connector and confluent-hub, and users can decide whether or not to use the connector and confluent-hub.

Preliminary tasks

Before following this guide, complete these prerequisites:

  • Create server and VPC.
  • Create a Cloud DB for MySQL server.
  • Create a Cloud Data Streaming Service cluster.
  • Create a Search Engine Service cluster.

Configure network

Step 1. Configure Cloud DB for MySQL

To add a DB user account in Cloud DB for MySQL:

  1. From the NAVER Cloud Platform console's VPC environment, navigate to Services > Database > Cloud DB for MySQL > DB Server.
  2. Select a DB Server, then click Manage DB > Manage DB users.
  3. Enter the required information and click [Add DB user].
  4. Once the DB account has been added, click [Save].
Note

For more information on Cloud DB for MySQL, see the Cloud DB for MySQL user guide.

Step 2. Configure ACG

To configure the ACG to allow access to port 9092 of the Cloud Data Streaming Service broker node:

  1. In the VPC environment on the NAVER Cloud Platform console, navigate to Services > Compute > Server > ACG.
  2. Select "cdss-b-xxxxx" from the ACG list and click [Set ACG].
  3. Enter ACG rules, and then click [Add].
    • Protocol: TCP
    • Access source: IP of the server where Kafka Connect will run
    • Allowed port: 9092
  4. Click [Apply].

To configure ACG to allow access to port 9200 of the Search Engine Service manager node:

  1. In the VPC environment on the NAVER Cloud Platform console, navigate to Services > Compute > Server > ACG.
  2. Select "searchengine-m-xxxxx" from the ACG list and click [Set ACG].
  3. Enter ACG rules, and then click [Add].
    • Protocol: TCP
    • Access source: IP of the server where Kafka Connect will run
    • Allowed port: 9200
  4. Click [Apply].
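With both rules in place, you can optionally check reachability from the server that will run Kafka Connect before installing anything (a sketch; the IPs are this guide's example broker and manager node addresses, so substitute your own):

```shell
# Confirm the ACGs now allow traffic from this server.
# 10.0.200.14 / 10.0.100.9 are the example node IPs used later in this guide;
# replace them with the addresses of your own broker and manager nodes.
nc -zv 10.0.200.14 9092   # Cloud Data Streaming Service broker
nc -zv 10.0.100.9 9200    # Search Engine Service manager node
```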

Install Kafka Connect on a server

Step 1. Install Java

To install Java:

  1. Perform a yum update by entering the following command:

    yum update
    
  2. Install java-1.8.0-openjdk-devel.x86_64 by entering the following command:

    yum install java-1.8.0-openjdk-devel.x86_64
    
  3. Enter the following command to check if the installation was successful:

    java -version
    javac -version
    

Step 2. Install Kafka Connect

To install Kafka Connect:

  1. Download Kafka Connect to the /root path of your server by entering the following command:

    curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
    
  2. Extract the downloaded file by entering the following command:

    tar -zxvf confluent-community-7.0.1.tar.gz
    
  3. Edit the properties file before running Kafka Connect by entering the following command.

    • Set "bootstrap.servers" to the IP list of your Cloud Data Streaming Service broker nodes; you can check this in the broker node information in the console.
    vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    
    bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
    

Step 3. Install Confluent Hub

Confluent Hub is a repository where you can easily download various plugins used in Kafka Connect. For a complete list of supported plugins, see the Confluent Connector Portfolio provided by Confluent.

  1. Create a new folder in the /root path by entering the following command, and then navigate to that folder.

    • In this example, we create a folder named "confluent-hub".
    mkdir confluent-hub
    cd confluent-hub
    
  2. Download Confluent Hub to the current path (/root/confluent-hub) by entering the following command:

    curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
    
  3. Extract the downloaded file by entering the following command:

    tar -zxvf confluent-hub-client-latest.tar.gz
    
  4. Enter the following command to create a folder in the current path (/root/confluent-hub) to which future plugins will be saved:

    • In this example, we create a folder named "plugins".
    mkdir plugins
    
  5. Add the path of the extracted bin folder to the PATH environment variable by entering the following commands in order:

    vi ~/.bashrc
    
    export CONFLUENT_HOME="$HOME/confluent-hub"
    export PATH=$PATH:$CONFLUENT_HOME/bin
    
    source ~/.bashrc
    
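You can sanity-check the result with a short snippet (a sketch assuming the ~/confluent-hub layout used in this guide):

```shell
# Reproduce the two exports from ~/.bashrc and confirm the bin folder is on PATH.
# Use $HOME (or an unquoted ~): a single-quoted '~' is not expanded by the shell.
CONFLUENT_HOME="$HOME/confluent-hub"
PATH="$PATH:$CONFLUENT_HOME/bin"
case ":$PATH:" in
  *":$CONFLUENT_HOME/bin:"*) echo "confluent-hub bin is on PATH" ;;
  *)                         echo "confluent-hub bin is NOT on PATH" ;;
esac
```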

Step 4. Install MySQL Connector

Install debezium-connector-mysql by entering the following command:

  • --component-dir is the folder path where the actual plugin is installed. Set it to the /root/confluent-hub/plugins created in Step 3.
  • --worker-configs is the path to the properties file applied after plugin installation. Set it to the /root/confluent-7.0.1/etc/kafka/connect-distributed.properties you edited in Step 2.
confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties

Step 5. Install Elasticsearch Connector

Install kafka-connect-elasticsearch by entering the following command:

  • Apply --component-dir and --worker-configs in the same way as in Step 4.
confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties

Step 6. Run Kafka Connect process

To run the Kafka Connect process:

  1. Run the Kafka Connect process in the background by entering the following command:

    /root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    
  2. Enter the following command to check that the process is working properly:

    curl localhost:8083
    
    {"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
    
  3. Enter the following command to check that all previously installed connectors are being properly displayed:

    curl localhost:8083/connector-plugins
    
    [
       {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, 
       {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"},
       {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
       {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
       {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}
    ]
    

Use Kafka Connect

Step 1. Create MySQL table and add data

To create a MySQL table and add data:

  1. Access the Cloud DB for MySQL server you are using.
  2. Create a table by entering the following command, and then add data to it.
    • In this example, we'll create a table named member.
    CREATE TABLE IF NOT EXISTS member (
     id int NOT NULL PRIMARY KEY,
     name varchar(100),
     email varchar(200),
     department varchar(200)
    );
    
    INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A');
    INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B');
    INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B');
    INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
    

Step 2. Register MySQL Connector

To register a MySQL Connector:

  1. Access the server where Kafka Connect is installed.
  2. Compose the JSON request body to be sent in the format below, adjusting the values to your own environment.
    {
       "name": "mysql-connector", // Connector name
       "config": {
         "connector.class": "io.debezium.connector.mysql.MySqlConnector", // Type of plugin to use
         "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQL server endpoint
         "database.port": "3306", // MySQL server port
         "database.user": "kimdong", // MySQL server user
         "database.password": "1234", // MySQL server user's password
         "database.server.id": "184054", // Unique numeric ID identifying this MySQL server to Kafka Connect
         "database.server.name": "NCP_MYSQL", // The name given to the MySQL server to be used in Kafka Connect, which will later be used as the prefix for Kafka topics 
         "database.whitelist": "test", // Specifies the database of the MySQL server to be accessed in Kafka Connect
         "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Broker node information
         "database.history.kafka.topic": "this_is_topic", // Topic name to store MySQL history changes 
         "snapshot.locking.mode": "none",
         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
         "key.converter": "org.apache.kafka.connect.json.JsonConverter"
       }
    }
    
  3. Register the MySQL Connector by entering the following command:
    curl -X POST localhost:8083/connectors \
       -H "Content-Type: application/json" \
       -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}' 
    
    
    
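After registration, the same Kafka Connect REST API used earlier reports the connector's health; the name in the URL must match the `name` field you registered:

```shell
# Check the status of the registered connector (run on the Kafka Connect server)
curl localhost:8083/connectors/mysql-connector/status
```

A healthy connector reports "state":"RUNNING" for the connector and each of its tasks; a FAILED task includes the underlying stack trace in its trace field.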

Step 3. Check topic

To check that a topic has been created:

  1. Access CMAK, provided by Cloud Data Streaming Service.
  2. Check that the topic has been created according to the body information set in Step 2.
    • The topic that will contain the change information for the MySQL member table created in Step 1 is "NCP_MYSQL.test.member".

Step 4. Register Elasticsearch Connector

To register the Elasticsearch Connector:

  1. Access the server where Kafka Connect is installed.
  2. Compose the JSON request body to be sent in the format below, adjusting the values to your own environment.
    {
       "name": "es-connector", // Connector name
       "config": {
         "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // Type of plugin to use
         "connection.url": "http://10.0.100.9:9200", // Search Engine Service (Elasticsearch) endpoint
         "tasks.max": "1",
         "topics": "NCP_MYSQL.test.member", // Topic name to consume
         "type.name": "_doc",
         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
         "key.converter": "org.apache.kafka.connect.json.JsonConverter",
         "key.converter.schemas.enable": "true",
         "value.converter.schemas.enable": "true",
         "transforms": "extractKey",
         "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
         "transforms.extractKey.field": "id", // PK name used in MySQL table
         "behavior.on.null.values": "IGNORE"
       }
    }
    
  3. Register the Elasticsearch Connector by entering the following command:
    curl -X POST localhost:8083/connectors \
       -H "Content-Type: application/json" \
       -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
    

Step 5. Check Elasticsearch data

To check the Elasticsearch data:

  1. Access the Kibana endpoint provided by Search Engine Service.
  2. Check the index list to confirm that an index named ncp_mysql.test.member has been created.
  3. View the contents of the ncp_mysql.test.member index.
  4. View a specific document in the ncp_mysql.test.member index.
    • Data added from MySQL can be checked in _source.after.
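The same data can be checked without Kibana by querying the search engine's REST API directly (a sketch; 10.0.100.9 is the example manager node address used earlier in this guide, so substitute your own):

```shell
# Search the index created by the sink connector
# (run from a server allowed by the Search Engine Service ACG)
curl "http://10.0.100.9:9200/ncp_mysql.test.member/_search?pretty"
```

As in Kibana, each hit carries the row values captured from MySQL under _source.after.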