Available in VPC
This guide explains how to install Kafka Connect on your server and then use the Cloud Data Streaming Service to apply MySQL changes to Elasticsearch.
This guide is an example written to aid users' understanding. The connector and confluent-hub used in this guide are licensed by Confluent and follow Confluent's licensing policy. NAVER Cloud is not a direct provider of the connector and confluent-hub, and users can decide whether or not to use the connector and confluent-hub.
Preliminary tasks
Before following this guide, complete these prerequisites:
- Create a VPC and a server.
- Create a Cloud DB for MySQL server.
- Create a Cloud Data Streaming Service cluster.
- Create a Search Engine Service cluster.
Configure network
Step 1. Configure Cloud DB for MySQL
To add a DB account for Cloud DB for MySQL:
- From the NAVER Cloud Platform console's VPC environment, navigate to Services > Database > Cloud DB for MySQL > DB Server.
- Select a DB server, then click Manage DB > Manage DB users.
- Enter the required information and click [Add DB user].
- Once the DB account has been added, click [Save].

For more information on Cloud DB for MySQL, see the Cloud DB for MySQL user guide.
Step 2. Configure ACG
To configure the ACG to allow access to port 9092 of the Cloud Data Streaming Service broker node:
- In the VPC environment on the NAVER Cloud Platform console, navigate to Services > Compute > Server > ACG.
- Select "cdss-b-xxxxx" from the ACG list and click [Set ACG].
- Enter ACG rules, and then click [Add].
- Protocol: TCP
- Access source: IP of the server where Kafka Connect will run
- Allowed port: 9092
- Click [Apply].
To configure ACG to allow access to port 9200 of the Search Engine Service manager node:
- In the VPC environment on the NAVER Cloud Platform console, navigate to Services > Compute > Server > ACG.
- Select "searchengine-m-xxxxx" from the ACG list and click [Set ACG].
- Enter ACG rules, and then click [Add].
- Protocol: TCP
- Access source: IP of the server where Kafka Connect will run
- Allowed port: 9200
- Click [Apply].
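Once the ACG rules are applied, you can sanity-check connectivity from the server where Kafka Connect will run. The sketch below uses nc to probe each port; the IPs are hypothetical examples from this guide, so substitute your own broker and manager node addresses.

```shell
# Probe a TCP port and report the result.
check_port() {
  if nc -z -w 3 "$1" "$2" 2>/dev/null; then
    echo "$1:$2 open"
  else
    echo "$1:$2 closed or unreachable"
  fi
}

# Hypothetical private IPs; replace with your own node addresses.
check_port 10.0.200.14 9092   # Cloud Data Streaming Service broker node
check_port 10.0.100.9 9200    # Search Engine Service manager node
```

If a port reports closed, re-check the ACG rule's access source against the Kafka Connect server's IP.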
Install Kafka Connect on a server
Step 1. Install Java
To install Java:

- Perform a yum update by entering the following command:

      yum update

- Install java-1.8.0-openjdk-devel.x86_64 by entering the following command:

      yum install java-1.8.0-openjdk-devel.x86_64

- Enter the following commands to check that the installation was successful:

      java -version
      javac -version
Step 2. Install Kafka Connect
To install Kafka Connect:

- Download Kafka Connect to the /root path of your server by entering the following command:

      curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz

- Unzip the downloaded file by entering the following command:

      tar -zxvf confluent-community-7.0.1.tar.gz

- Edit the properties file before running. Add the IP list to "bootstrap.servers" in the properties file by referring to the broker node information of Cloud Data Streaming Service.

      vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties

      bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
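If you prefer to script the edit instead of using vi, the value can be set with sed. A minimal sketch, run here against a local stand-in file with placeholder contents; on your server, point PROPS at the real /root/confluent-7.0.1/etc/kafka/connect-distributed.properties and use your broker IPs.

```shell
# Stand-in copy for illustration; use the real properties path on your server.
PROPS=connect-distributed.properties
printf 'group.id=connect-cluster\nbootstrap.servers=localhost:9092\n' > "$PROPS"

# Replace the default bootstrap.servers with the broker node IP list.
sed -i 's/^bootstrap\.servers=.*/bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092/' "$PROPS"

SERVERS=$(grep '^bootstrap.servers=' "$PROPS")
echo "$SERVERS"
```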
Step 3. Install Confluent Hub
Confluent Hub is a repository where you can easily download various plugins used in Kafka Connect. For a complete list of supported plugins, see the Confluent Connector Portfolio provided by Confluent.
- Create a new folder in the /root path by entering the following commands, and then navigate to that folder. In this example, we create a folder named "confluent-hub".

      mkdir confluent-hub
      cd confluent-hub

- Download Confluent Hub to the current path (/root/confluent-hub) by entering the following command:

      curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz

- Unzip the downloaded file by entering the following command:

      tar -zxvf confluent-hub-client-latest.tar.gz

- Enter the following command to create a folder in the current path (/root/confluent-hub) where plugins will be installed later. In this example, we create a folder named "plugins".

      mkdir plugins

- Add the unzipped bin folder to the PATH environment variable by entering the following commands in order:

      vi ~/.bashrc

      export CONFLUENT_HOME=~/confluent-hub
      export PATH=$PATH:$CONFLUENT_HOME/bin

      source ~/.bashrc
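When exporting CONFLUENT_HOME, leave the tilde unquoted so that the shell expands it to your home directory; a quoted '~' is stored literally and is not expanded when the shell searches PATH. A quick demonstration:

```shell
QUOTED='~/confluent-hub'    # quotes suppress tilde expansion; stored literally
UNQUOTED=~/confluent-hub    # unquoted tilde expands to the home directory
echo "$QUOTED"
echo "$UNQUOTED"
```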
Step 4. Install MySQL Connector
Install debezium-connector-mysql by entering the following command:

- --component-dir is the folder path where the actual plugin is installed. Set it to the /root/confluent-hub/plugins folder created in Step 3.
- --worker-configs is the path to the properties file applied after plugin installation. Set it to the /root/confluent-7.0.1/etc/kafka/connect-distributed.properties file you edited in Step 2.

      confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
Step 5. Install Elasticsearch Connector
Install kafka-connect-elasticsearch by entering the following command:
- Apply --component-dir and --worker-configs in the same way as in Step 4.

      confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
Step 6. Run Kafka Connect process
To run the Kafka Connect process:
- Run the Kafka Connect process in the background by entering the following command:

      /root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties

- Enter the following command to check that the process is working properly:

      curl localhost:8083
      {"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}

- Enter the following command to check that all previously installed connectors are displayed properly:

      curl localhost:8083/connector-plugins
      [
        {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"},
        {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"},
        {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
        {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
        {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}
      ]
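To check for a specific plugin programmatically rather than by eye, you can grep the response. The sketch below runs against an abbreviated sample of the response; in practice, capture the live response with RESP=$(curl -s localhost:8083/connector-plugins).

```shell
# Abbreviated sample of the /connector-plugins response; in practice:
# RESP=$(curl -s localhost:8083/connector-plugins)
RESP='[{"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"},{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"}]'

for plugin in MySqlConnector ElasticsearchSinkConnector; do
  if echo "$RESP" | grep -q "$plugin"; then
    echo "$plugin: installed"
  else
    echo "$plugin: missing"
  fi
done
```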
Use Kafka Connect
Step 1. Create MySQL table and add data
To create a MySQL table and add data:
- Access the Cloud DB for MySQL server you are using.
- Create a table by entering the following commands, and then add data to it. In this example, we'll create a table named member.

      CREATE TABLE IF NOT EXISTS member (
          id int NOT NULL PRIMARY KEY,
          name varchar(100),
          email varchar(200),
          department varchar(200)
      );
      INSERT INTO member(id, name, email, department) VALUES (1, 'messi', 'messi@gmail.com', 'A');
      INSERT INTO member(id, name, email, department) VALUES (2, 'ronaldo', 'ronaldo@naver.com', 'B');
      INSERT INTO member(id, name, email, department) VALUES (3, 'son', 'son@ncloud.com', 'B');
      INSERT INTO member(id, name, email, department) VALUES (4, 'park', 'park@yahoo.com', 'B');
Step 2. Register MySQL Connector
To register a MySQL Connector:
- Access the server where Kafka Connect is installed.
- When making the request, compose the JSON body to be sent in the format below according to your environment.

      {
        "name": "mysql-connector", // Connector name
        "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector", // Type of plugin to use
          "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQL server endpoint
          "database.port": "3306", // MySQL server port
          "database.user": "kimdong", // MySQL server user
          "database.password": "1234", // MySQL server user's password
          "database.server.id": "184054", // Numeric ID that uniquely identifies this connector among the MySQL server's clients
          "database.server.name": "NCP_MYSQL", // Logical name given to the MySQL server, used later as the prefix of Kafka topic names
          "database.whitelist": "test", // Database on the MySQL server that Kafka Connect will capture
          "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Broker node information
          "database.history.kafka.topic": "this_is_topic", // Topic name to store the MySQL schema change history
          "snapshot.locking.mode": "none",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "key.converter": "org.apache.kafka.connect.json.JsonConverter"
        }
      }

- Register the MySQL Connector by entering the following command:

      curl -X POST localhost:8083/connectors \
        -H "Content-Type: application/json" \
        -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}'
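The topics this connector produces follow Debezium's naming convention, &lt;database.server.name&gt;.&lt;database&gt;.&lt;table&gt;, which determines the topic name to look for in the next step. Using the example values registered above:

```shell
# Debezium topic naming: <database.server.name>.<database>.<table>
SERVER_NAME=NCP_MYSQL   # database.server.name from the connector config
DATABASE=test           # database specified in database.whitelist
TABLE=member            # table created in Step 1
TOPIC="$SERVER_NAME.$DATABASE.$TABLE"
echo "$TOPIC"   # → NCP_MYSQL.test.member
```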
Step 3. Check topic
To check that a topic has been created:
- Access CMAK, provided by Cloud Data Streaming Service.
- Check that the topic has been created according to the body information set in Step 2.
- The topic that will contain the change information for the MySQL member table created in Step 1 is "NCP_MYSQL.test.member".

Step 4. Register Elasticsearch Connector
To register the Elasticsearch Connector:
- Access the server where Kafka Connect is installed.
- When making the request, compose the JSON body to be sent in the format below according to your environment.

      {
        "name": "es-connector", // Connector name
        "config": {
          "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // Type of plugin to use
          "connection.url": "http://10.0.100.9:9200", // Endpoint of the Search Engine Service (Elasticsearch) cluster
          "tasks.max": "1",
          "topics": "NCP_MYSQL.test.member", // Topic name to consume
          "type.name": "_doc",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "key.converter.schemas.enable": "true",
          "value.converter.schemas.enable": "true",
          "transforms": "extractKey",
          "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
          "transforms.extractKey.field": "id", // PK name used in the MySQL table
          "behavior.on.null.values": "IGNORE"
        }
      }

- Register the Elasticsearch Connector by entering the following command:

      curl -X POST localhost:8083/connectors \
        -H "Content-Type: application/json" \
        -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
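The Elasticsearch sink derives each index name from the consumed topic name, lowercased, since Elasticsearch requires lowercase index names. That mapping can be sketched as:

```shell
TOPIC=NCP_MYSQL.test.member
# Elasticsearch index names must be lowercase, so the sink lowercases the topic name.
INDEX=$(echo "$TOPIC" | tr '[:upper:]' '[:lower:]')
echo "$INDEX"   # → ncp_mysql.test.member
```

This is why the index checked in the next step is named ncp_mysql.test.member.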
Step 5. Check Elasticsearch data
To check the Elasticsearch data:
- Access the Kibana endpoint provided by Search Engine Service.
- Check the index list to confirm that an index named ncp_mysql.test.member has been created.

- View the contents of the ncp_mysql.test.member index.

- View a specific document in the ncp_mysql.test.member index.
- Data added from MySQL can be checked in _source.after.
