Cloud Data Streaming Service utilization

    Available in VPC

    This guide describes how to access CMAK and create topics, how to send data to and read saved data from those topics in a Cloud Data Streaming Service cluster, and how to encrypt the communication section between the Apache Kafka client and the Apache Kafka broker nodes.

    Access CMAK

    CMAK is a platform that manages Apache Kafka clusters. You can access CMAK to create topics, change the number of partitions in topics, or change the data retention period of topics.

    Note

    Before accessing CMAK, you need to activate the public domain.

    The following describes how to access CMAK.

    1. From the NAVER Cloud Platform console, click the Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster menus, in that order.
    2. Mark the checkbox of the cluster, and then click the Manage cluster > Change CMAK access domain settings button.
    3. When a pop-up window appears, check the information and click the [OK] button.
      • The public domain is activated.
    4. Mark the checkbox of the cluster, and then click Manage cluster > Access CMAK.
      • If the Access CMAK menu is not activated, the public domain has not been activated. Repeat steps 1 to 3.
    5. When the preparation window for accessing CMAK appears, click the Go to CMAK button.
    6. When the login pop-up window appears, enter the user ID and password configured when creating the cluster.
      • If you've forgotten the ID, you can check it by clicking Manage cluster > Reset CMAK access password.
    7. Click the cluster account name.
    8. Check the information about topics and broker nodes from Cluster summary.

    Create topic

    1. Access the CMAK page.
    2. From the CMAK page, click the Topic > Create menu.
    3. Enter the topic information.
      • Topic: Enter the name of the topic.
      • Partitions: Enter the number of partitions for the topic.
      • Replication factor: Enter the replication factor (the number of replicas) for the topic.
    4. Click the [Create] button.
    Note

    For details on each configuration item, refer to the Apache Kafka documentation.

    Caution
    • After a topic is created, the number of partitions can only be increased. Partitions that have already been added can't be removed.
    • If ordering must be guaranteed when consuming incoming data, set the number of partitions to 1. Note that with a single partition, all data is stored on one broker node, so you need to watch that node's disk usage.
    • The replication factor can't be changed after the topic is created. A replication factor of at least 2 is recommended for stable service operation.
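
    If you prefer the command line, topics can also be created with the CLI tools bundled with Apache Kafka (installed on a VM later in this guide). A minimal sketch, assuming an example broker at 192.168.2.24:9092 and a topic named test:

    cd kafka_2.12-2.4.0
    # Create a topic with 3 partitions, each replicated to 2 broker nodes.
    ./bin/kafka-topics.sh --create --bootstrap-server 192.168.2.24:9092 --topic test --partitions 3 --replication-factor 2
    # Check the result.
    ./bin/kafka-topics.sh --describe --bootstrap-server 192.168.2.24:9092 --topic test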

    View topic information

    (Screenshot: topic detail view in CMAK, with the areas described below)

    ① Topic summary: Check the information of the topic.
    ② Operations: Delete topics, add partitions, redistribute assignments, and change the settings.
    ③ Partitions by broker: Check the information of partitions created for each broker node.
    ④ Partition information: Check which broker nodes the leader and replicas of each partition are located on.

    Change topic's data retention period

    The following describes how to adjust the data retention period of a topic.

    1. Access CMAK by referring to Access CMAK.
    2. Click the cluster name.
    3. Click Topic > List.
    4. From the list of topics, click the name of the topic you want to change the data retention period for.
    5. Click the Operations > [Update config] button.
    6. Modify the retention.ms value.
      • To set the retention period to 1 hour, enter 3600000 (retention.ms is in milliseconds).
      • To set it to n hours, enter 3600000 * n.
    7. Click the [Update config] button.
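
    The same change can also be made with the Kafka CLI instead of CMAK. A minimal sketch, assuming an example broker at 192.168.2.24:9092, a topic named test, and CLI tools of version 2.5 or later (older tools only support topic configs through the --zookeeper flag):

    cd kafka_2.12-2.4.0
    # Set the retention period of the topic "test" to 1 hour (3600000 ms).
    ./bin/kafka-configs.sh --bootstrap-server 192.168.2.24:9092 --entity-type topics --entity-name test --alter --add-config retention.ms=3600000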

    Change topic's number of partitions

    You can increase the number of partitions for a topic through CMAK. Partitions can only be added; existing partitions can't be deleted.

    The following describes how to increase the number of partitions for a topic.

    1. Access CMAK by referring to Access CMAK.
    2. Click the cluster name.
    3. Click Topic > List.
    4. From the list of topics, click the name of the topic you want to add partitions to.
    5. Click the Operations > [Add partitions] button.
    6. Enter the number of partitions in the Partitions field.
      • You can only enter a number greater than the current number of partitions.
    7. Click the [Add partitions] button.
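
    The partition count can also be increased with the Kafka CLI. A minimal sketch, assuming an example broker at 192.168.2.24:9092 and a topic named test; the command fails if the new count is not greater than the current one:

    cd kafka_2.12-2.4.0
    # Increase the number of partitions of "test" to 4.
    ./bin/kafka-topics.sh --alter --bootstrap-server 192.168.2.24:9092 --topic test --partitions 4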

    Implement Producer/Consumer

    This guide describes how to create a Producer VM and a Consumer VM with the following structure to send data to, and read data from, a topic.

    It covers sending and reading data using the Apache Kafka console tools, Java, and Python.

    (Diagram: Producer VM and Consumer VM exchanging data with the Cloud Data Streaming Service cluster through a topic)

    Note

    This guide is based on the CentOS 7.3 version.

    Create Producer VM and Consumer VM

    The following describes how to create Producer VM and Consumer VM.

    1. Refer to Getting started with Server and create a server.
    2. When creating the server, select the same VPC in which you created the Cloud Data Streaming Service cluster.
    3. For Subnet, select a public subnet.
      • A public IP must be assigned to the VM to access it.

    Install Java

    Enter the following command to install Java.

    yum install java -y
    

    Install Apache Kafka

    Enter the following command to install Apache Kafka.

    wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
    # Decompress it.
    tar -zxvf kafka_2.12-2.4.0.tgz
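
    To verify the installation, you can print the version of the bundled CLI tools:

    cd kafka_2.12-2.4.0
    ./bin/kafka-topics.sh --version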
    

    View Broker node information

    The following describes how to check the broker node information.

    1. From the NAVER Cloud Platform console, click the Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster menus, in that order.
    2. From Cluster details, click the [View details] button of Broker node information.
    3. When the Broker node information window appears, check the information.
      • PlainText: the information for communicating with the broker node without encryption
      • TLS: the information for communicating with the broker node with encryption
      • hosts file information: the information required when modifying the hosts file, which is used for encrypted communication

    Set ACG of broker node

    The following describes how to set ACG rules for the broker node.

    1. From the NAVER Cloud Platform console, click the Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster menus, in that order.
    2. From Cluster details, click the shortcut icon next to Broker node ACG.
    3. Select the ACG of the broker node from the ACG list, and then click the [Set] button.
    4. Enter the ACG rule, and then click the [Add] button.
      • Protocol: TCP
      • Access source: Enter the private IPs of the Producer VM and the Consumer VM.
      • Allowed port: Enter 9092 - 9093.
      • Note: Enter a description for the rule, if needed.
    5. After checking if the rule has been added, click the [Apply] button.
      • The rule is applied to the ACG.
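
    Before moving on, you can check from the Producer or Consumer VM that a broker port is reachable. A minimal sketch using the bash built-in TCP check, assuming an example broker private IP of 192.168.2.24:

    # Prints "9092 open" if the connection succeeds within 3 seconds.
    timeout 3 bash -c '</dev/tcp/192.168.2.24/9092' && echo "9092 open"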

    Send data using Apache Kafka

    1. Access the Producer VM created in the previous step, and then enter the following command.

      cd kafka_2.12-2.4.0
      ./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic]
      # In [broker.list], enter the PlainText information from the broker node details checked earlier.
      # In [topic], enter the topic created in CMAK.
      # Example: ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test
      


    2. Enter the message you want to send.

      • The message is saved to the broker node.
      • To exit, press [Ctrl] + [C].
    3. Access the Consumer VM created in the previous step, and then enter the following command.

      • The --from-beginning option reads all data in the topic from the beginning.
      • Without the --from-beginning option, only data produced after the consumer starts is read.
      cd kafka_2.12-2.4.0
      ./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning
      # In [bootstrap.server], enter the PlainText information from the broker node details checked earlier.
      # In [topic], enter the topic used in the Producer VM in the previous step.
      # Example: ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test --from-beginning
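
    To confirm that the produced messages actually reached the topic, you can query the topic's latest offsets with the GetOffsetShell tool bundled with Kafka. A minimal sketch, assuming the example broker and topic used above:

    # --time -1 returns the latest offset of each partition.
    ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.2.24:9092 --topic test --time -1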
      

    Send data using Java

    The following describes how to send data using Java.

    Note

    This guide is based on using IntelliJ IDEA.

    Create project

    The following describes how to create a project.

    1. Run IntelliJ IDEA, and then click File > New > Project.
    2. Select Maven Archetype, enter the project information, and then click the [Create] button to create the project.

    Modify pom.xml file

    Modify the pom.xml file, which defines the project dependency, Java version, and the packaging method.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <packaging>jar</packaging>
        <groupId>org.example</groupId>
        <artifactId>kafkatest</artifactId>
        <version>1.0-SNAPSHOT</version>
        <url>http://maven.apache.org</url>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <!-- Apache Kafka version in Cloud Data Streaming Service -->
                <version>2.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.21</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.3</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>KafkaMain</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
    
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    
    Note

    The pom.xml file may differ, depending on the user environment.

    Create KafkaMain.java

    When you run the Java application, pass the produce/consume mode, the topic name, and the broker list as arguments.

    import java.io.IOException;

    public class KafkaMain {
        public static void main(String[] args) throws IOException {
            // args[0]: "produce" or "consume", args[1]: topic name, args[2]: broker list
            String topicName = args[1];
            String brokers = args[2];

            switch (args[0]) {
                case "produce":
                    Producer.produce(brokers, topicName);
                    break;
                case "consume":
                    Consumer.consume(brokers, topicName);
                    break;
                default:
                    System.out.println("Wrong arguments");
                    break;
            }
            System.exit(0);
        }
    }
    

    Create Producer.java

    Create the Producer.java file. The following describes an example of sending numbers between 0 and 99.

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class Producer {
        public static void produce(String brokers, String topicName) throws IOException {
            // Configure the producer
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", brokers);
            properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

            // Send the numbers 0 to 99 as string values
            for (int i = 0; i < 100; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topicName, Integer.toString(i));
                producer.send(record);
            }
            // close() flushes any buffered records before shutting down
            producer.close();
        }
    }
    

    Create Consumer.java

    Create the Consumer.java file.

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class Consumer {
        public static int consume(String brokers, String topicName) {
            // Configure the consumer
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", brokers);
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("group.id", "consumer_group");
            properties.setProperty("auto.offset.reset", "earliest");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Arrays.asList(topicName));

            // Poll indefinitely, printing each record received
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                    System.out.println(record.value());
                }
            }
        }
    }
    

    Build and run jar file

    1. Push the created Java code to a git repository, and then run the git clone command on the VM to download it.
      git clone [your git repository]
      
    2. Install Maven to build the Java application.
      yum install maven -y
      
    3. Go to the folder containing the downloaded Java code, and then build the jar file.
      The built jar file is created in the target folder.
      cd kafkatest
      mvn clean package
      
    4. Go to the target folder, and then run the jar file.
      cd target
      
      # Send data
      java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list]
      # In [topic], enter the topic created in CMAK.
      # In [broker.list], enter the PlainText information from the broker node details checked earlier.
      # Example: java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092

      # Read data
      java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list]
      # In [topic], enter the topic created in CMAK.
      # In [broker.list], enter the PlainText information from the broker node details checked earlier.
      # Example: java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
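
    While the consumer is running, you can inspect its progress with the consumer group tool; consumer_group is the group.id set in Consumer.java. A minimal sketch, assuming the example broker addresses used above:

      cd /root/kafka_2.12-2.4.0
      ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.2.24:9092 --describe --group consumer_group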
      

    Send data using Python

    This guide is based on the Python 2.7.5 version.

    To use Kafka in Python, install the kafka-python package.

    # Install pip
    curl -LO https://bootstrap.pypa.io/pip/2.7/get-pip.py
    python get-pip.py
    
    # Install kafka-python package
    pip install kafka-python
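
    To verify the installation, you can print the installed kafka-python version:

    python -c "import kafka; print(kafka.__version__)"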
    

    Create KafkaMain.py

    Create the KafkaMain.py file.

    import sys
    from json import dumps

    from kafka import KafkaProducer, KafkaConsumer


    def produce(topicName, brokerLists):
        # Serialize each value to a UTF-8 encoded JSON string
        producer = KafkaProducer(bootstrap_servers=brokerLists,
                                 value_serializer=lambda x: dumps(x).encode('utf-8'))
        # Send the numbers 0 to 99
        for i in range(100):
            producer.send(topicName, i)
        # Block until all buffered messages are actually sent
        producer.flush()


    def consume(topicName, brokerLists):
        consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
                                 group_id="test")
        # Iterate indefinitely, printing each received message
        for msg in consumer:
            print(msg)


    action = sys.argv[1]
    topicName = sys.argv[2]
    brokerLists = sys.argv[3].split(',')

    if action == 'produce':
        produce(topicName, brokerLists)
    elif action == 'consume':
        consume(topicName, brokerLists)
    else:
        print('wrong arguments')
    

    Run KafkaMain.py to perform produce

    Run the KafkaMain.py file. The following is an example of sending numbers between 0 and 99.

    python KafkaMain.py produce [topic] [broker.list]
    # In [topic], enter the topic created in CMAK.
    # In [broker.list], enter the PlainText information from the broker node details checked earlier.
    # Example: python KafkaMain.py produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
    

    Run KafkaMain.py to perform consume

    Run the KafkaMain.py file.

    python KafkaMain.py consume [topic] [broker.list]
    # In [topic], enter the topic created in CMAK.
    # In [broker.list], enter the PlainText information from the broker node details checked earlier.
    # Example: python KafkaMain.py consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
    

    Encrypt communication section

    The following describes how to encrypt the communication section between the Apache Kafka client and the Apache Kafka broker nodes.

    The overall process is as follows. Steps 1 to 3 are automatically executed when creating a cluster.

    1. Create a self-signed certificate on the manager node.
    2. Create a certificate on each broker node, and then create a certificate signing request.
    3. The manager node signs the certificate signing requests.
    4. On the client, download the certificate, and then create a TrustStore that holds the certificate information.
    5. Create a configuration file for encrypted communication.
    6. Modify the hosts file.

    Download certificate

    The following describes how to download the certificate.

    1. From the NAVER Cloud Platform console, click the Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster menus, in that order.
    2. From Cluster details, click the [Download] button of the Manage certificate field.
    3. When a pop-up window appears, click the [OK] button.
      • The certificate is downloaded.
    4. Copy the downloaded certificate file to the Producer and Consumer VMs.
      • Save the file as ca-cert in the /root path of the Producer and Consumer VMs.

    Create TrustStore

    The following describes how to create a TrustStore that stores the certificate information.

    1. Enter the following command to create the TrustStore.
      keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert
      
    2. Enter the keystore password.
      • Enter keystore password: Enter the password.
      • Re-enter new password: Enter the password again.
    3. Trust this certificate? [no]: When this text appears, enter "yes."
    4. Enter the ls command to check that the kafka.client.truststore.jks file has been created.
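
    To confirm that the certificate was imported correctly, you can list the TrustStore contents; keytool prompts for the password set above:

      keytool -list -v -keystore kafka.client.truststore.jks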

    Create encryption configuration file

    Create a client-auth.properties file as follows.
    In [password], enter the password set when creating the kafka.client.truststore.jks file.

    # Create the encryption configuration file in the /root/kafka_2.12-2.4.0 folder.
    cd kafka_2.12-2.4.0
    vi client-auth.properties
    
    security.protocol=SSL
    ssl.truststore.location=/root/kafka.client.truststore.jks
    ssl.truststore.password=[password]
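
    Before producing data, you can check that the TLS listener responds. A minimal sketch using openssl, assuming an example broker private IP of 192.168.2.24 and the TLS port 9093; the output should include the broker's certificate chain:

    openssl s_client -connect 192.168.2.24:9093 </dev/null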
    


    Modify hosts file

    In Linux, the hosts file (/etc/hosts) maps host names to IP addresses before the DNS server is consulted. It needs to be modified for encrypted communication so that the broker host names used for TLS resolve to the correct IP addresses.

    Open the /etc/hosts file.

    vi /etc/hosts
    

    Add the hosts file information copied from Broker node information.

    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    192.168.2.24 yeakafka2-d-2am
    192.168.2.25 yeakafka2-d-2an
    192.168.2.26 yeakafka2-d-2ao
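
    To check that the host names now resolve correctly, you can query them; the host names below are the examples from the hosts file above:

    getent hosts yeakafka2-d-2am
    ping -c 1 yeakafka2-d-2am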
    


    Send data over the encrypted communication section

    Run the following command from Producer VM.
    When you enter the message you want to send, the message is stored in the broker node. If you want to exit, then press the [Ctrl] + [C] key.

    cd kafka_2.12-2.4.0
    ./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
    # In [broker.list], enter the TLS information from the broker node details checked earlier.
    # In [topic], enter the topic created in CMAK.
    # Example: ./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties
    


    Search saved data over the encrypted communication section

    Run the following command from Consumer VM.

    cd kafka_2.12-2.4.0
    ./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
    # In [bootstrap.server], enter the TLS information from the broker node details checked earlier.
    # In [topic], enter the topic used in the Producer VM in the previous step.
    # Example: ./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties
    


    Build data pipeline with Kafka Connect

    For information on how to build data pipelines with Kafka Connect, refer to Build data pipeline with Kafka Connect.

