Build data pipeline with Kafka Connect
Available in VPC
This guide describes how to install Kafka Connect on a server and then propagate changes made in MySQL to Elasticsearch using Cloud Data Streaming Service.
This guide is an example intended to help improve the user's understanding. Confluent owns the license for the connector and Confluent-Hub used in this guide, and it follows Confluent's license policy. NAVER Cloud does not directly provide the connector and Confluent-Hub, and users can choose whether to use them at their own discretion.
Preparations
Before following this guide, you must first complete the following tasks:
- Create server and VPC
- Create Cloud DB for MySQL server
- Create Cloud Data Streaming Service cluster
- Create Search Engine Service cluster
Set network
Step 1. Set Cloud DB for MySQL
The following describes how to create a DB account for Cloud DB for MySQL.
- From the NAVER Cloud Platform console, click the Services > Database > Cloud DB for MySQL > DB Server menus, in that order.
- Select a DB server, and then click Manage DB > Manage DB user.
- Enter the necessary information, and then click the [Add DB user] button.
- Click the [Save] button once the DB account is added.
Please refer to the Cloud DB for MySQL Guide for more information about Cloud DB for MySQL.
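If the MySQL client is installed on the server where Kafka Connect will run, you can verify that the new account can connect. This is an optional check; the endpoint, user, and database name below are the example values used later in this guide, so substitute your own.
mysql -h db-9c242.vpc-cdb.ntruss.com -u kimdong -p test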
Step 2. Set ACG
The following describes how to set ACG so that access to Port 9092 of the Cloud Data Streaming Service broker node is allowed.
- From the NAVER Cloud Platform console, click the Services > Compute > Server > ACG menus, in that order.
- Select "cdss-b-xxxxx" from the ACG list, and then click the [Set ACG] button.
- Enter the ACG rule, and then click the [Add] button.
- Protocol: TCP
- Access source: IP of the server where Kafka Connect will run
- Allowed port: 9092
- Click the [Apply] button.
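Once the rule is applied, you can verify from the Kafka Connect server that a broker is reachable on port 9092. This is an optional check using netcat; the IP below is one of the example broker IPs used later in this guide.
nc -zv 10.0.200.14 9092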
The following describes how to set ACG so that access to Port 9200 of the Search Engine Service manager node is allowed.
- From the NAVER Cloud Platform console, click the Services > Compute > Server > ACG menus, in that order.
- Select "searchengine-m-xxxxx" from the ACG list, and then click the [Set ACG] button.
- Enter the ACG rule, and then click the [Add] button.
- Protocol: TCP
- Access source: IP of the server where Kafka Connect will run
- Allowed port: 9200
- Click the [Apply] button.
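Likewise, once the rule is applied, you can verify from the Kafka Connect server that the manager node responds on port 9200. The IP below is the example Elasticsearch endpoint used later in this guide.
curl http://10.0.100.9:9200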
Install Kafka Connect on server
Step 1. Install Java
The following describes how to install Java.
Enter the following command to update yum.
yum update
Enter the following command to install java-1.8.0-openjdk-devel.x86_64.
yum install java-1.8.0-openjdk-devel.x86_64
Enter the following command to check if the installation has been completed successfully.
java -version
javac -version
Step 2. Install Kafka Connect
The following describes how to install Kafka Connect.
Enter the following command to download Kafka Connect to the /root path of the server.
curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
Enter the following command to decompress the downloaded file.
tar -zxvf confluent-community-7.0.1.tar.gz
Enter the following command to modify the properties file before starting.
- Add the IP list to "bootstrap.servers" in the properties file by referring to the broker node information of Cloud Data Streaming Service.
vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
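Before starting Connect, you can optionally confirm that the brokers are reachable using the CLI bundled with the Confluent package. One broker IP from the example list above is used here.
/root/confluent-7.0.1/bin/kafka-broker-api-versions --bootstrap-server 10.0.200.14:9092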
Step 3. Install Confluent Hub
Confluent Hub is a repository from which you can easily download various plugins used with Kafka Connect. See the Confluent Connector Portfolio provided by Confluent for the complete list of supported plugins.
Enter the following commands to create a new folder in the /root path, and then go to that folder.
- For this example, we will create a folder named "confluent-hub."
mkdir confluent-hub
cd confluent-hub
Enter the following command to download Confluent Hub to the current path (/root/confluent-hub).
curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
Enter the following command to decompress the downloaded file.
tar -zxvf confluent-hub-client-latest.tar.gz
Enter the following command to create a folder under the current path (/root/confluent-hub) where the plugins will be saved later.
- For this example, we will create a folder named "plugins."
mkdir plugins
Enter the following commands in sequence to add the path of the decompressed bin folder to the environment variable PATH.
vi ~/.bashrc
export CONFLUENT_HOME=$HOME/confluent-hub
export PATH=$PATH:$CONFLUENT_HOME/bin
source ~/.bashrc
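To verify that the CLI is now on your PATH, you can print its usage information.
confluent-hub help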
Step 4. Install MySQL connector
Enter the following command to install debezium-connector-mysql.
- --component-dir is the folder path where the actual plugin will be installed. Set it to /root/confluent-hub/plugins, as created in Step 3.
- --worker-configs is the path of the properties file to which the installed plugin will be applied. Set it to /root/confluent-7.0.1/etc/kafka/connect-distributed.properties, as modified in Step 2.
confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
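After the installation finishes, you can confirm the result by listing the plugin folder and checking that the installer added a plugin.path entry to the properties file (assuming the paths above).
ls /root/confluent-hub/plugins
grep plugin.path /root/confluent-7.0.1/etc/kafka/connect-distributed.properties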
Step 5. Install Elasticsearch connector
Enter the following command to install kafka-connect-elasticsearch.
- Set --component-dir and --worker-configs to the same values as in Step 4.
confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
Step 6. Run Kafka Connect process
The following describes how to run the Kafka Connect process.
Enter the following command to run the Kafka Connect process in the background.
/root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
Enter the following command to check if the process is working normally.
curl localhost:8083
{"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
Enter the following command to check if the connectors installed earlier all show up normally.
curl localhost:8083/connector-plugins
[ {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"}, {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"}, {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"}, {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"} ]
Use Kafka Connect
Step 1. Create MySQL table and add data
The following describes how to create a MySQL table and add data.
- Access the Cloud DB for MySQL server in use.
- Enter the following command to create a table, and then add data.
- For this example, we will create a table named member.
CREATE TABLE IF NOT EXISTS member (
    id int NOT NULL PRIMARY KEY,
    name varchar(100),
    email varchar(200),
    department varchar(200)
);
INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A');
INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B');
INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B');
INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
Step 2. Register MySQL connector
The following describes how to register the MySQL connector.
- Access the server where Kafka Connect is installed.
- Compose the JSON body of the request in the following format, adjusting the values to your environment.
{ "name": "mysql-connector", // Connector's name "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", // Type of the plugin to use "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQL server's endpoint "database.port": "3306", // MySQL server's port "database.user": "kimdong", // MySQL server's user "database.password": "1234", // MySQL server user's password "database.server.id": "184054", // MySQL server's UUID used by Kafka Connect "database.server.name": "NCP_MYSQL", // Name given to the MySQL server to be used by Kafka Connect, which will be used as a prefix for Kafka topics "database.whitelist": "test", // Specify the MySQL server's database to be accessed from Kafka Connect "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Broker node information "database.history.kafka.topic": "this_is_topic", // Name of the topic to save the MySQL change history "snapshot.locking.mode": "none", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.json.JsonConverter" } }
- Enter the following command to register the MySQL connector.
curl -X POST localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}'
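You can then check whether the connector and its task are running through the Kafka Connect REST API.
curl localhost:8083/connectors/mysql-connector/status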
Step 3. View topic
The following describes how to check if the topic has been created.
- Access CMAK provided by Cloud Data Streaming Service.
- Check if the topic has been created according to the body information configured in Step 2.
- The topic where the changes in the MySQL member table created in Step 1 will be saved is "NCP_MYSQL.test.member."
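If you prefer the command line over CMAK, you can also read the topic directly with the console consumer bundled in the Confluent package. The broker IP below is an example from earlier in this guide.
/root/confluent-7.0.1/bin/kafka-console-consumer --bootstrap-server 10.0.200.14:9092 --topic NCP_MYSQL.test.member --from-beginning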
Step 4. Register Elasticsearch connector
The following describes how to register the Elasticsearch connector.
- Access the server where Kafka Connect is installed.
- Compose the JSON body of the request in the following format, adjusting the values to your environment.
{ "name": "es-connector", // Connector's name "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // Type of the plugin to use "connection.url": "http://10.0.100.9:9200", // Kafka broker node to import the topic "tasks.max": "1", "topics": "NCP_MYSQL.test.member", // Name of the topic to consume "type.name": "_doc", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "true", "value.converter.schemas.enable": "true", "transforms": "extractKey", "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.extractKey.field": "id", // Name of the primary key used by MySQL table "behavior.on.null.values": "IGNORE" } }
- Enter the following command to register the Elasticsearch connector.
curl -X POST localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
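As with the MySQL connector, you can check the connector's status through the Kafka Connect REST API.
curl localhost:8083/connectors/es-connector/status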
Step 5. View Elasticsearch data
The following describes how to check the Elasticsearch data.
- Access the Kibana endpoint provided by Search Engine Service.
- Search the index list, and check if an index called ncp_mysql.test.member has been created.
- View the content of the ncp_mysql.test.member index.
- View a specific document in the ncp_mysql.test.member index.
- You can check the data added in MySQL under _source.after.
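You can also query the index directly over the Elasticsearch REST API from a server allowed by the ACG. The endpoint below is the example used in Step 4.
curl "http://10.0.100.9:9200/ncp_mysql.test.member/_search?pretty"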