Linking Cloud Data Streaming Service

    Available in VPC

    This guide explains how to link NAVER Cloud Platform's Cloud Hadoop with Cloud Data Streaming Service (CDSS).

    Preparations

    1. Create Cloud Data Streaming Service.
    2. Create and set up a VM to use Cloud Data Streaming Service.
    3. Create Cloud Hadoop cluster.
    Note

    We recommend creating Cloud Hadoop and Cloud Data Streaming Service in the same VPC, in subnets that can communicate with each other.

    4. Set up the ACG.
      • Port 9092 must be allowed for Cloud Hadoop to access the broker nodes of Cloud Data Streaming Service.
      • Add the Cloud Hadoop subnet band to the access source of the Cloud Data Streaming Service broker node ACG.
    Note

    To link CDSS from Zeppelin Notebook, port 9996 must additionally be allowed in the Cloud Hadoop ACG.
    For more details, refer to the Access web UI guide.

    Data transfer using Kafka

    1. Run Kafka in the Cloud Data Streaming Service VM.
    [root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
    [root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
    
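    Kafka is started with the -daemon flag, so the shell returns immediately. One quick way to confirm that the broker process is actually running (a sketch, assuming a JDK with the jps tool is installed on the VM) is to list the running JVMs:

```shell
# The Kafka broker runs as a Java process; jps lists running JVMs on this host
jps | grep -i kafka
```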
    2. Create a topic.
      • Specify the broker list after the --bootstrap-server option.
    # Create topic
    [root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --create --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --replication-factor 1 --partitions 1 --topic [topic]
    
    # Check created topic
    [root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --list --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092
    
    Note

    The broker list can be viewed in Cloud Data Streaming Service > Cluster > Broker node information.

    3. Create data.
    [root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-console-producer.sh --broker-list 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic]
    

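    After starting the console producer, each line you type becomes a message in the topic. To verify that the messages arrived (a quick check, assuming the same broker list as in the examples above), you can read them back with the console consumer:

```shell
# Read all messages in the topic from the beginning
./bin/kafka-console-consumer.sh --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic] --from-beginning
```

    Press Ctrl+C to stop the consumer.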
    This guide describes two ways to link Kafka using Spark Streaming.

    1. Link CDSS from edge node
    2. Link CDSS from Zeppelin Notebook
    Note

    Please refer to the official Spark documentation for more information about Spark Streaming.

    Link CDSS from edge node

    1. Run Spark from Cloud Hadoop's edge node.
    $ sudo -u {username} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
    
    2. Use Spark Streaming to read data in real time.
    > import org.apache.spark.sql.streaming.Trigger
    > val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
    > val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()
    


    Data can also be read as a regular batch.

    > val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets","earliest").load()
    > df.show
    
    3. You can also use Spark Streaming to write data to Kafka.

    You must create a checkpoint directory in HDFS on the edge node before streaming.

    hdfs dfs -mkdir -p /streaming/checkpointLocation
    
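    To confirm the checkpoint directory was created (a quick sanity check), you can list it on HDFS:

```shell
# List the contents of /streaming; checkpointLocation should appear
hdfs dfs -ls /streaming
```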

    Read data from the test topic, and save the loaded data in a new topic.

    > val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()
    > val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic", "newtopic").start()
    

    If the data doesn't need to be processed in real time, it can be saved easily with the code below.

    > val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets","earliest").load()
    > df.selectExpr("key","value").write.format("kafka").option("kafka.bootstrap.servers","172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("topic","newtopic").save()
    

    Now, check the new topic in Kafka to confirm that the data is there.
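    One way to check (a sketch, assuming the same broker list as the earlier examples) is to run the console consumer against the newtopic topic:

```shell
# Consume everything written to newtopic so far
./bin/kafka-console-consumer.sh --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic newtopic --from-beginning
```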


    Link CDSS from Zeppelin Notebook

    1. Access Zeppelin Notebook, and then click Interpreter.

    2. Add dependencies under spark2.

      • artifact : org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
      • exclude : net.jpountz.lz4:lz4:1.3.0
    3. Create a new notebook.

      • Set the default interpreter to Spark.
    4. You can read and write data in real time using Spark Streaming in Zeppelin Notebook.
      Here's the code:

    > import org.apache.spark.sql.streaming.Trigger
    > val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
    > val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()
    


    Note

    You can use a UDF to convert data expressed in binary to a string.
    Below is example code.

    > import org.apache.spark.sql.functions.udf
    > val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").load()
    > val toStr = udf((payload: Array[Byte]) => new String(payload))
    > val chstring = df.withColumn("value", toStr(df("value")))
    > chstring.show
    


