Using Cloud Data Streaming Service


Available in VPC

This guide describes how to access CMAK to create a topic and save data to it, how to send data stored in a topic to a Cloud Data Streaming Service cluster, and how to encrypt communication between Apache Kafka clients and Apache Kafka broker nodes.

Access CMAK

CMAK is a platform for managing Apache Kafka clusters. Through CMAK, you can create a topic, change the number of partitions in the topic, or change the data retention cycle for the topic.

Note

The public domain must be activated before accessing CMAK.

To access CMAK:

  1. From the NAVER Cloud Platform console's VPC environment, navigate to i_menu > Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster.
  2. Click the checkbox for the cluster, and then click Manage cluster > Change CMAK access domain settings.
  3. When a popup appears, check the notification and click [OK].
    • The public domain will be activated.
  4. Click the checkbox for the cluster, and then click Manage cluster > Access CMAK.
    • If the Access CMAK menu is not activated, it means the public domain is not activated. Follow steps 1-3 again.
  5. When the preliminary task window to access CMAK appears, click Go to CMAK.
  6. When the login popup appears, enter the user ID and password you set when creating the cluster.
    • If you have forgotten your password, you can reset it by clicking Manage cluster > Reset CMAK access password.
  7. Click on the cluster account name.
  8. View information about topics and broker nodes in the Cluster Summary.
  • Information about topics

  • Information about broker nodes

Create topic

  1. Access the CMAK page.
  2. On the CMAK page, click Topic > Create.
  3. Enter topic information.
    • Topic: Enter the topic name.
    • Partitions: Enter the number of partitions in the topic.
    • Replication Factor: Enter the number of replications in the topic.
  4. Click [Create].
Note

See Apache Kafka Documentation for descriptions of the detailed configuration values.

Caution
  • Once a topic has been created, the number of partitions can only be increased; partitions that have already been added cannot be removed.
  • If a certain order is required when consuming data, the number of partitions must be set to 1. However, if there is only 1 partition, all data is stored on a single broker node, so care must be taken to manage the disk usage of the broker node.
  • Once a topic has been created, the number of replications cannot be changed. To ensure stable service operation, we recommend implementing at least 2 replications.
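The ordering caveat above follows from how Apache Kafka assigns records to partitions: records with the same key always land in the same partition, and order is only guaranteed within a single partition. The following simplified sketch illustrates this idea (the real Kafka client hashes keys with murmur2; the hash here is illustrative only):

```python
def assign_partition(key, num_partitions):
    # Simplified illustration: the real Kafka client uses a murmur2 hash,
    # but any deterministic hash gives the same per-key guarantee.
    return sum(key.encode("utf-8")) % num_partitions

# The same key always maps to the same partition, so records that share
# a key are consumed in the order they were produced.
assert assign_partition("user-1", 3) == assign_partition("user-1", 3)

# With a single partition, every record goes to partition 0, which is
# why a 1-partition topic preserves total order across all records.
assert all(assign_partition("user-%d" % i, 1) == 0 for i in range(10))
```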

Check topic information


  • Topic Summary: View the topic's information.
  • Operations: Delete the topic, add and reassign partitions, and change settings.
  • Partitions by Broker: View the partitions created on each broker node.
  • Partition Information: View which broker node holds each partition's leader and replications.

Change the data retention cycle for a topic

To adjust a topic's data retention cycle:

  1. See Access CMAK to access CMAK.
  2. Click on the cluster name.
  3. Click Topic > List.
  4. In the topic list, click the name of the topic for which you want to change the data retention cycle.
  5. Click Operations > [Update Config].
  6. Edit the retention.ms value.
    • To set it to 1 hour, enter the value 3600000.
    • To set it to n hours, enter the result of 3600000 * n.
  7. Click [Update Config].
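The hour-to-millisecond conversion in step 6 can be sketched as a small helper (the function name is illustrative):

```python
def hours_to_retention_ms(hours):
    # retention.ms is a duration in milliseconds:
    # 1 hour = 60 min * 60 s * 1000 ms = 3,600,000 ms
    return 3600000 * hours

print(hours_to_retention_ms(1))   # 3600000 (1 hour)
print(hours_to_retention_ms(24))  # 86400000 (1 day)
```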

Change the number of partitions in a topic

You can increase the number of partitions in a topic created in CMAK. Partitions can only be added — existing allocated partitions cannot be deleted.

To increase the number of partitions in a topic:

  1. See Access CMAK to access CMAK.
  2. Click on the cluster name.
  3. Click Topic > List.
  4. In the topic list, click the name of the topic for which you want to change the number of partitions.
  5. Click Operations > [Add Partitions].
  6. Enter the number of partitions in the Partitions field.
    • You can only enter a number greater than the current number of partitions.
  7. Click [Add Partitions].

Implement producer/consumer

This section explains how to create a Producer VM and a Consumer VM and use them to save data to a topic and retrieve it.

Examples are provided using the Apache Kafka command-line tools, Java, and Python.


Note

This guide is based on CentOS 7.3.

Create a Producer VM/Consumer VM

To create a Producer VM and a Consumer VM:

  1. Create a server by referring to Getting started with Server.
  2. When creating a server, select the same VPC in which you created the Cloud Data Streaming Service cluster.
  3. For the subnet, select Public Subnet.
    • You must assign a public IP to the VM in order to access it.

Install Java

Enter the following command to install Java:

yum install java -y

Install Apache Kafka

Enter the following command to install Apache Kafka:

wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
# Unzip the file.
tar -zxvf kafka_2.12-2.4.0.tgz

Check broker node information

To check broker node information:

  1. From the NAVER Cloud Platform console's VPC environment, navigate to i_menu > Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster.
  2. From Cluster details, click [View details] under Broker node information.
  3. When the Broker node information window appears, view the information.
    • PlainText: Information for unencrypted communication with the broker node.
    • TLS: Information for encrypted communication with the broker node.
    • hosts file information: Information required when editing the hosts file used for encrypted communication.

Broker node's ACG settings

To configure ACG rules for a broker node:

  1. From the NAVER Cloud Platform console's VPC environment, navigate to i_menu > Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster.
  2. From Cluster details, click cdss-cluster-popupicon under Broker node ACG.
  3. Select the broker node ACG from the ACG list and click [Settings].
  4. Enter ACG rules, and then click [Add].
    • Protocol: TCP
    • Access source: Enter the private IP address of the Producer VM and Consumer VM.
    • Allowed port: Enter 9092-9093.
    • Enter any notes about the ACG.
  5. After checking that the rule has been added, click [Apply].
    • The rule is applied to ACG.
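After the rule is applied, you can check from the Producer VM or Consumer VM that a broker port is actually reachable. A minimal sketch (the broker IP and port in the comment are the example values used in this guide):

```python
import socket

def port_open(host, port, timeout=3.0):
    # Returns True if a TCP connection to host:port succeeds within timeout.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example with a broker's private IP and the Kafka PlainText port:
# port_open("192.168.2.24", 9092)
```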

Data transfer using Apache Kafka

  1. After connecting to the Producer VM created in the previous step, enter the following command:

    cd kafka_2.12-2.4.0
    ./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic]
    # Enter a PlainText copy of the broker node information you checked earlier in [broker.list].
    # Enter the topic you created in CMAK in [topic].
    # Example: ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test
    


  2. Enter the message you want to transmit.

    • The message will be saved to the broker node.
    • Press [Ctrl + C] to quit.
  3. After connecting to the Consumer VM created in the previous step, run the following command:

    • Using the --from-beginning option retrieves all data in the topic from the beginning.
    • Without the --from-beginning option, only data produced after the consumer starts is retrieved.
    cd kafka_2.12-2.4.0
    ./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning
    # Enter a PlainText copy of the broker node information you checked earlier in [bootstrap.server].
    # In [topic], enter the topic you used on the Producer VM.
    # Example: ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test --from-beginning
    

Transfer data using Java

This section explains how to transfer data using Java.

Note

This guide is based on using IntelliJ IDEA.

Create a project

To create a project:

  1. After launching IntelliJ IDEA, click File > New > Project.
  2. Select Maven Archetype, enter the project information, and click [Create] to create a project.

Edit pom.xml file

Edit the pom.xml file that defines project dependencies, Java version, and packaging method.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>jar</packaging>
    <groupId>org.example</groupId>
    <artifactId>maventest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <url>http://maven.apache.org</url>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <!-- Apache Kafka version in Cloud Data Streaming Service -->
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.21</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>KafkaMain</mainClass>
                        </transformer>
                    </transformers>
                </configuration>

                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Note

The pom.xml file may vary depending on your user environment.

Write KafkaMain.java

When running the Java application, the produce/consume mode, the topic, and the broker list are passed as arguments.

import java.io.IOException;

public class KafkaMain {
    public static void main(String[] args) throws IOException {
        String topicName = args[1];
        String brokers = args[2];

        switch (args[0]) {
            case "produce":
                Producer.produce(brokers, topicName);
                break;
            case "consume":
                Consumer.consume(brokers, topicName);
                break;
            default:
                System.out.println("Wrong arguments");
                break;
        }
        System.exit(0);
    }
}

Write Producer.java

Write a Producer.java file. Below is an example transferring the numbers 0-99:

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void produce(String brokers, String topicName) throws IOException {
        // Configure the producer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Send the numbers 0-99 as record values
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, Integer.toString(i));
            producer.send(record);
        }
        // close() flushes any buffered records before shutting down
        producer.close();
    }
}

Write Consumer.java

Write a Consumer.java file.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static int consume(String brokers, String topicName) {
        // Configure the consumer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "consumer_group");
        properties.setProperty("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topicName));

        // Poll for new records until the process is stopped
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                System.out.println(record.value());
            }
        }
    }
}

Build and run jar file

  1. Save the Java code you wrote to git, and then enter the git clone command in the VM to download the code.
    git clone [your git repository URL]
    
  2. To build the Java application, install Maven.
    yum install maven -y
    
  3. Navigate to the folder containing the downloaded Java code, and then build the jar file.
    When you build a jar file, a target folder and a jar file inside the target folder are created.
    cd kafkatest
    mvn clean package
    
  4. Navigate to the target folder and run the jar file.
    cd target
    
    # Transfer data
    java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list]
    # Enter the topic created in CMAK in [topic].
    # Enter a PlainText copy of the broker node information you checked earlier in [broker.list].
    # Example: java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
    
    
    # View data
    java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list]
    # Enter the topic created in CMAK in [topic].
    # Enter a PlainText copy of the broker node information you checked earlier in [broker.list].
    # Example: java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
    

Use Python to transfer data

This guide is based on Python 2.7.5.

To use Kafka in Python, install the kafka-python package.

# Install pip
curl -LO https://bootstrap.pypa.io/pip/2.7/get-pip.py
python get-pip.py

# Install kafka-python package
pip install kafka-python

Write KafkaMain.py

Write a KafkaMain.py file.

import sys
from json import dumps

from kafka import KafkaProducer, KafkaConsumer


def produce(topicName, brokerLists):
    producer = KafkaProducer(bootstrap_servers=brokerLists,
                             value_serializer=lambda x: dumps(x).encode('utf-8'))
    # Send the numbers 0-99 as JSON-encoded values
    for i in range(100):
        producer.send(topicName, i)
    # Make sure all buffered messages are sent before the process exits
    producer.flush()


def consume(topicName, brokerLists):
    consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
                             group_id="test")
    for msg in consumer:
        print(msg)


action = sys.argv[1]
topicName = sys.argv[2]
brokerLists = sys.argv[3].split(',')

if action == 'produce':
    produce(topicName, brokerLists)
elif action == 'consume':
    consume(topicName, brokerLists)
else:
    print('wrong arguments')

Run the KafkaMain.py file to perform produce

Run the KafkaMain.py file. Below is an example transferring the numbers 0-99:

python KafkaMain.py produce [topic] [broker.list]
# Enter the topic you created in CMAK in [topic].
# Enter a PlainText copy of the broker node information you checked earlier in [broker.list].
# Example: python KafkaMain.py produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092

Run the KafkaMain.py file to perform consume

Run the KafkaMain.py file.

python KafkaMain.py consume [topic] [broker.list]
# Enter the topic you created in CMAK in [topic].
# Enter a PlainText copy of the broker node information you checked earlier in [broker.list].
# Example: python KafkaMain.py consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092

Communication encryption

This section describes how to encrypt the communication between the Apache Kafka client and Apache Kafka's broker nodes.

The whole process is as follows. Steps 1 to 3 are automatically carried out when creating a cluster.

  1. Create your own certificate on the manager node.
  2. After creating a certificate on each broker node, generate a certificate signing request.
  3. The manager node signs the certificate signing request.
  4. After the client downloads the certificate, it creates a TrustStore containing information about the certificate.
  5. Create a configuration file for encrypted communication.
  6. Edit the hosts file.

Download certificate

To download the certificate:

  1. From the NAVER Cloud Platform console's VPC environment, navigate to i_menu > Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster.
  2. From Cluster details, click [Download] under the Manage certificates item.
  3. When the popup appears, click [OK].
    • The certificate will be downloaded.
  4. Copy the downloaded certificate file to the Producer and Consumer VMs.
    • In the /root path of the Producer and Consumer VMs, save it under the name ca-cert.

Create TrustStore

To create a TrustStore to store certificate information:

  1. To create a TrustStore, enter the following command:
    keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert
    
  2. Enter your keystore password.
    • Enter keystore password: Enter password.
    • Re-enter new password: Enter password again.
  3. When the message "Trust this certificate? [no]" appears, enter "yes."
  4. Check that the kafka.client.truststore.jks file has been created by entering the ls command.

Create encryption configuration file

Write the client-auth.properties file as follows.
In [password], enter the password you set when creating the kafka.client.truststore.jks file.

# Create an encryption configuration file in the /root/kafka_2.12-2.4.0 folder.
cd kafka_2.12-2.4.0
vi client-auth.properties
security.protocol=SSL
ssl.truststore.location=/root/kafka.client.truststore.jks
ssl.truststore.password=[password]

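If you use kafka-python instead of the Java console clients, a JKS TrustStore is not required: the client can trust the downloaded ca-cert file directly. A minimal sketch of the equivalent client settings (the helper name is an assumption; verify the certificate path in your environment):

```python
def ssl_client_config(ca_cert_path):
    # kafka-python equivalent of client-auth.properties:
    # trust the CA certificate directly instead of using a JKS TrustStore.
    return {
        "security_protocol": "SSL",
        "ssl_cafile": ca_cert_path,
    }

config = ssl_client_config("/root/ca-cert")
# Pass the settings to a client, e.g.:
# KafkaProducer(bootstrap_servers=["yeakafka2-d-2am:9093"], **config)
```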

Edit hosts file

On Linux, the hosts file (/etc/hosts) maps host names to IP addresses and is consulted before DNS. To enable encrypted communication, you need to edit the hosts file.

Open the /etc/hosts file:

vi /etc/hosts

Add the hosts file information from the broker node information you checked earlier:

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.24 yeakafka2-d-2am
192.168.2.25 yeakafka2-d-2an
192.168.2.26 yeakafka2-d-2ao

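Before attempting encrypted communication, you can confirm that the broker host names you added actually resolve. A small sketch (the host names in the comment are the example values above):

```python
import socket

def resolves(hostname):
    # Returns the IPv4 address the OS resolves hostname to, or None.
    try:
        return socket.gethostbyname(hostname)
    except socket.gaierror:
        return None

print(resolves("localhost"))  # 127.0.0.1
# Example with the broker host names from the hosts file:
# for host in ["yeakafka2-d-2am", "yeakafka2-d-2an", "yeakafka2-d-2ao"]:
#     print(host, resolves(host))
```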

Encrypt communication to transfer data

Run the following command in the Producer VM.
When you enter the message you want to transmit, the message is saved to the broker node. Press [Ctrl] + [C] to quit.

./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
# Enter a TLS copy of the broker node information you checked earlier in [broker.list].
# Enter the topic you created in CMAK in [topic].
# Example: ./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties


Encrypt communication to view stored data

Run the following command in the Consumer VM:

cd kafka_2.12-2.4.0
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
# Enter a TLS copy of the broker node information you checked earlier in [bootstrap.server].
# In [topic], enter the topic you used on the Producer VM.
# Example: ./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties


Establish data pipeline with Kafka Connect

For information on establishing data pipelines with Kafka Connect, see Establish data pipeline with Kafka Connect.