Cloud Data Streaming Service utilization
Available in VPC
This guide describes how to access CMAK to create topics and save data to them, how to send data saved in these topics to Cloud Data Streaming Service clusters, and how to encrypt the communication section between the Apache Kafka client and Apache Kafka's broker nodes.
Access CMAK
CMAK is a platform that manages Apache Kafka clusters. You can access CMAK to create topics, change the number of partitions in topics, or change the data retention period of topics.
Before accessing CMAK, you need to activate the public domain.
The following describes how to access CMAK.
- From the NAVER Cloud Platform console, click the Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster menus, in that order.
- Mark the checkbox of the cluster, and then click the Manage cluster > Change CMAK access domain settings button.
- When a pop-up window appears, check the information and click the [OK] button.
- The public domain is activated.
- Mark the checkbox of the cluster, and then click Manage cluster > Access CMAK.
- If the Access CMAK menu is not activated, it means that the public domain is not activated. Repeat Steps 1 to 3.
- When the preparation window for accessing CMAK appears, click the Go to CMAK button.
- When the login pop-up window appears, enter the user ID and password configured when creating the cluster.
- If you've forgotten the ID, you can check it by clicking Manage cluster > Reset CMAK access password.
- Click the cluster account name.
- Check the information about topics and broker nodes from Cluster summary.
Information about topics
Information about broker nodes
Create topic
- Access the CMAK page.
- From the CMAK page, click the Topic > Create menu.
- Enter the topic information.
- Topic: Enter the name of the topic.
- Partitions: Enter the number of partitions for the topic.
- Replication factor: Enter the number of replication factors for the topic.
- Click the [Create] button.
For descriptions on detailed configurations, refer to Apache Kafka Documentation.
- After creating the topic, it's only possible to increase the number of partitions. It's not possible to decrease the number of partitions that are already added.
- If the order guarantee is necessary when consuming incoming data, then the number of partitions must be set as 1. However, if the number of partitions is 1, then you need to be careful about the broker node's disk usage since all data is stored in one broker node.
- The replication factor can't be changed after creating the topic. For stable service operation, it is recommended to set a replication factor of at least 2.
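The constraints in the notes above (at least 1 partition, a replication factor fixed at creation and recommended to be at least 2, and no more replicas than broker nodes) can be captured in a small validation helper. This is an illustrative sketch only; the function name is ours, and CMAK performs its own validation:

```python
def validate_topic_settings(partitions, replication_factor, broker_count):
    """Check proposed topic settings against the constraints described above.

    Returns a list of problems; an empty list means the settings look fine.
    Illustrative helper only, not part of CMAK or Apache Kafka.
    """
    problems = []
    if partitions < 1:
        problems.append("partitions must be at least 1")
    if replication_factor < 1:
        problems.append("replication factor must be at least 1")
    elif replication_factor < 2:
        problems.append("warning: fewer than 2 replicas is not recommended")
    if replication_factor > broker_count:
        problems.append("replication factor cannot exceed the number of broker nodes")
    return problems
```

For example, a topic with 3 partitions and 2 replicas on a 3-broker cluster passes, while a replication factor of 4 on the same cluster does not.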
View topic information
① Topic summary: Check the information of the topic.
② Operations: Delete topics, add partitions, redistribute assignments, and change the settings.
③ Partitions by broker: Check the information of partitions created for each broker node.
④ Partition information: Check which broker nodes the leader and replications of each partition are located on.
Change topic's data retention period
The following describes how to adjust the data retention period of a topic.
- Access CMAK by referring to Access CMAK.
- Click the cluster name.
- Click Topic > List.
- From the list of topics, click the name of the topic you want to change the data retention period for.
- Click the Operations > [Update config] button.
- Modify the retention.ms value.
- If you wish to set it as 1 hour, then enter the value of 3600000.
- If you wish to set it as n hours, then enter the resulting value of 3600000 * n.
- Click the [Update config] button.
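The retention.ms arithmetic above (3600000 per hour) can be wrapped in a small helper. The function names are ours, for illustration only:

```python
def hours_to_retention_ms(hours):
    """Convert a retention period in hours to the retention.ms value entered in CMAK."""
    return int(hours * 3600 * 1000)

def days_to_retention_ms(days):
    """Convert a retention period in days to retention.ms."""
    return hours_to_retention_ms(days * 24)
```

For example, hours_to_retention_ms(1) gives 3600000, matching the 1-hour example above.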
Change topic's number of partitions
You can increase the number of partitions for topics created through CMAK. Partitions can be added only, and partitions already assigned can't be deleted.
The following describes how to increase the number of partitions for a topic.
- Access CMAK by referring to Access CMAK.
- Click the cluster name.
- Click Topic > List.
- From the list of topics, click the name of the topic you want to add partitions to.
- Click the Operations > [Add partitions] button.
- Enter the number of partitions in the Partitions field.
- You can only enter a number greater than the current number of partitions.
- Click the [Add partitions] button.
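Because partitions can only be added, the requested count must be strictly greater than the current one. A one-line check illustrating this rule (the function name is ours):

```python
def can_update_partitions(current, requested):
    """Partitions can only grow: the requested count must exceed the current one."""
    return requested > current
```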
Implement Producer/Consumer
This section describes how to create a Producer VM and a Consumer VM with the following structure to send data to and save data in the topic. It covers sending and saving data using Apache Kafka, Java, and Python.
This guide is based on the CentOS 7.3 version.
Create Producer VM and Consumer VM
The following describes how to create Producer VM and Consumer VM.
- Refer to Getting started with Server and create a server.
- When creating the server, select the same VPC as the VPC where you created the Cloud Data Streaming Service cluster on.
- For Subnet, select a public subnet.
- A public IP must be assigned to the VM to access it.
Install Java
Enter the following command to install Java.
yum install java -y
Install Apache Kafka
Enter the following command to install Apache Kafka.
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
# Decompress it.
tar -zxvf kafka_2.12-2.4.0.tgz
View Broker node information
The following describes how to check the broker node information.
- From the NAVER Cloud Platform console, click the Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster menus, in that order.
- From Cluster details, click the [View details] button of Broker node information.
- When the Broker node information window appears, check the information.
- PlainText: the information for communicating with the broker node without encryption
- TLS: the information for communicating with the broker node with encryption
- hosts file information: the information required when modifying the hosts file, which is used for encrypted communication
Set ACG of broker node
The following describes how to set ACG rules for the broker node.
- From the NAVER Cloud Platform console, click the Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster menus, in that order.
- From Cluster details, click the button next to Broker node ACG.
- Select the ACG of the broker node from the ACG list, and then click the [Set] button.
- Enter the ACG rule, and then click the [Add] button.
- Protocol: TCP
- Access source: Enter the private IP of Producer VM and Consumer VM.
- Allowed port: Enter 9092 - 9093.
- Enter notes for the ACG.
- After checking if the rule has been added, click the [Apply] button.
- The rule is applied to the ACG.
Send data using Apache Kafka
Access the Producer VM created in the previous step, and then enter the following command.
cd kafka_2.12-2.4.0
./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic]
# In [broker.list], enter the PlainText information of the broker nodes checked earlier.
# In [topic], enter the topic created in CMAK.
# Example: ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test
Enter the message you want to send.
- The message is saved to the broker node.
- If you want to exit, then press the [Ctrl + C] key.
Access the Consumer VM created in the previous step, and then enter the following command.
- Using the --from-beginning option searches all data in the topic from the beginning.
- If the --from-beginning option isn't used, only data produced after the search starts is returned.
cd kafka_2.12-2.4.0
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning
# In [bootstrap.server], enter the PlainText information of the broker nodes checked earlier.
# In [topic], enter the topic entered in the Producer VM in the previous step.
# Example: ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test --from-beginning
Send data using Java
This section describes how to send data using Java.
This guide is based on using IntelliJ IDEA.
Create project
The following describes how to create a project.
- Run IntelliJ IDEA, and then click File > New > Project.
- Select Maven Archetype, enter the project information, and then click the [Create] button to create the project.
Modify pom.xml file
Modify the pom.xml file, which defines the project dependencies, Java version, and packaging method.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<groupId>org.example</groupId>
<artifactId>maventest</artifactId>
<version>1.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- Apache Kafka version in Cloud Data Streaming Service -->
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>KafkaMain</mainClass>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
The pom.xml file may differ, depending on the user environment.
Create KafkaMain.java
When running the Java application, it delivers the produce/consume status, topic list, and broker list as arguments.
import java.io.IOException;

public class KafkaMain {
    public static void main(String[] args) throws IOException {
        String topicName = args[1];
        String brokers = args[2];

        switch (args[0]) {
            case "produce":
                Producer.produce(brokers, topicName);
                break;
            case "consume":
                Consumer.consume(brokers, topicName);
                break;
            default:
                System.out.println("Wrong arguments");
                break;
        }
        System.exit(0);
    }
}
Create Producer.java
Create the Producer.java
file. The following describes an example of sending numbers between 0 and 99.
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void produce(String brokers, String topicName) throws IOException {
        // Configure the producer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Send the numbers 0 to 99
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, Integer.toString(i));
            producer.send(record);
        }
        producer.close();
    }
}
Create Consumer.java
Create the Consumer.java file.
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static void consume(String brokers, String topicName) {
        // Configure the consumer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "consumer_group");
        properties.setProperty("auto.offset.reset", "earliest");

        // Create the consumer and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topicName));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500); // wait for up to 500 ms
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                System.out.println(record.value());
            }
        }
    }
}
Build and run jar file
- Save the created Java code to git, and then enter the git clone command from the VM to download the code.
git clone your git repository
- Install Maven to build the Java application.
yum install maven -y
- Go to the folder where the downloaded Java code is located, and then build the jar file.
cd kafkatest
mvn clean package
Once the build completes, the jar file is created in the target folder.
- Go to the target folder, and then run the jar file.
cd target

# Send data
java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list]
# In [topic], enter the topic created in CMAK.
# In [broker.list], enter the PlainText information of the broker nodes checked earlier.
# Example: java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092

# Search data
java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list]
# Example: java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
Send data using Python
This guide is based on the Python 2.7.5 version.
To use Kafka in Python, install the kafka-python package.
# Install pip
curl -LO https://bootstrap.pypa.io/pip/2.7/get-pip.py
python get-pip.py
# Install kafka-python package
pip install kafka-python
Create KafkaMain.py
Create the KafkaMain.py file.
import sys
from json import dumps
from kafka import KafkaProducer, KafkaConsumer

def produce(topicName, brokerLists):
    producer = KafkaProducer(bootstrap_servers=brokerLists,
                             value_serializer=lambda x: dumps(x).encode('utf-8'))
    for i in range(100):
        producer.send(topicName, i)
    # Block until all buffered messages are actually sent
    producer.flush()

def consume(topicName, brokerLists):
    consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
                             group_id="test")
    for msg in consumer:
        print(msg)

action = sys.argv[1]
topicName = sys.argv[2]
brokerLists = sys.argv[3].split(',')

if action == 'produce':
    produce(topicName, brokerLists)
elif action == 'consume':
    consume(topicName, brokerLists)
else:
    print('wrong arguments')
Run KafkaMain.py to perform produce
Run the KafkaMain.py file. The following is an example of sending the numbers 0 to 99.
python KafkaMain.py produce [topic] [broker.list]
# In [topic], enter the topic created in CMAK.
# Enter the PlainText copy of the broker node information checked earlier in [broker.list].
# Example: python KafkaMain.py produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
Run KafkaMain.py file to perform consume
Run the KafkaMain.py file.
python KafkaMain.py consume [topic] [broker.list]
# In [topic], enter the topic created in CMAK.
# Enter the PlainText copy of the broker node information checked earlier in [broker.list].
# Example: python KafkaMain.py consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
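Note that sys.argv[3].split(',') in the script above is sensitive to stray spaces around commas: " 192.168.2.25:9092" with a leading space is not a valid broker address. A small normalization helper (the name is ours, for illustration) makes the broker list robust to how it is pasted:

```python
def parse_broker_list(raw):
    """Split a comma-separated broker string into a clean list of host:port entries,
    dropping surrounding whitespace and empty fragments."""
    return [entry.strip() for entry in raw.split(',') if entry.strip()]
```

This can replace the bare split(',') call, so both "a:9092,b:9092" and "a:9092, b:9092" parse to the same list.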
Encrypt communication section
This section describes how to encrypt the communication section between the Apache Kafka client and Apache Kafka's broker nodes.
The overall process is as follows. Steps 1 to 3 are automatically executed when creating a cluster.
- Create a self-signed certificate from the manager node.
- Create a certificate from each broker node, and then create a certificate signature request.
- The manager node signs the certificate signature request.
- On the client, download the certificate, and then create a TrustStore that holds the certificate information.
- Create a configuration file for the encrypted communication.
- Modify the hosts file.
Download certificate
The following describes how to download the certificate.
- From the NAVER Cloud Platform console, click the Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster menus, in that order.
- From Cluster details, click the [Download] button of the Manage certificate field.
- When a pop-up window appears, click the [OK] button.
- The certificate is downloaded.
- Copy the downloaded certificate file to the Producer and Consumer VMs.
- Save the file as ca-cert in the /root path of the Producer and Consumer VMs.
Create TrustStore
The following describes how to create a TrustStore that stores the certificate information.
- Enter the following command to create the TrustStore.
keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert
- Enter the keystore password.
- Enter keystore password: Enter the password.
- Re-enter new password: Enter the password again.
- Trust this certificate? [no]: When this text appears, enter "yes."
- Enter the ls command to check whether the kafka.client.truststore.jks file has been created.
Create encryption configuration file
Create a client-auth.properties file as follows. In [password], enter the password set when creating the kafka.client.truststore.jks file.
# Create the encryption configuration file in the /root/kafka_2.12-2.4.0 folder.
cd kafka_2.12-2.4.0
vi client-auth.properties
security.protocol=SSL
ssl.truststore.location=/root/kafka.client.truststore.jks
ssl.truststore.password=[password]
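If you script the client setup, the same three properties can be generated instead of edited by hand. This sketch simply writes a file with the contents shown above; the function name and arguments are ours, for illustration:

```python
def write_client_auth(path, truststore_path, password):
    """Write a Kafka SSL client properties file like the one shown above."""
    lines = [
        "security.protocol=SSL",
        "ssl.truststore.location=" + truststore_path,
        "ssl.truststore.password=" + password,
    ]
    with open(path, "w") as f:
        f.write("\n".join(lines) + "\n")
```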
Modify hosts file
In Linux, the hosts file maps host names to IP addresses and is consulted before the DNS server. The hosts file (/etc/hosts) needs to be modified for encrypted communication.
Open the /etc/hosts file.
vi /etc/hosts
Add the hosts file information copied from Broker node information.
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.24 yeakafka2-d-2am
192.168.2.25 yeakafka2-d-2an
192.168.2.26 yeakafka2-d-2ao
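For clusters with many brokers, the hosts entries can be generated from the IP-to-hostname mapping shown in Broker node information. A minimal sketch (the function name is ours; the example values are the ones shown above):

```python
def hosts_entries(broker_map):
    """Render /etc/hosts lines from an {ip: hostname} mapping."""
    return "\n".join("{0} {1}".format(ip, hostname)
                     for ip, hostname in broker_map.items())
```

Appending the returned text to /etc/hosts produces entries in the same format as the example above.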
Send data by encrypting communication section
Run the following command from Producer VM.
When you enter the message you want to send, the message is stored in the broker node. If you want to exit, press the [Ctrl] + [C] keys.
./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
# Enter the TLS copy of the broker node information checked earlier in [broker.list].
# In [topic], enter the topic created in CMAK.
# Example: ./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties
Search saved data by encrypting communication section
Run the following command from Consumer VM.
cd kafka_2.12-2.4.0
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
# Enter the TLS copy of the broker node information checked earlier in [bootstrap.server].
# In [topic], enter the topic entered in Producer VM in the previous step.
# Example: ./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties
Build data pipeline with Kafka Connect
For information on how to build data pipelines with Kafka Connect, refer to Build data pipeline with Kafka Connect.