Available in VPC
This guide introduces how to link NAVER CLOUD PLATFORM's Cloud Hadoop with Cloud Data Streaming Service (CDSS).
Pre-work
- Create a Cloud Data Streaming Service.
- For more information about creating a Cloud Data Streaming Service, refer to the Cloud Data Streaming Service User Guide.
- Create and set up a VM to use the Cloud Data Streaming Service.
- For more information about creating and setting up VMs, refer to the Cloud Data Streaming Service User Guide.
- Create a Cloud Hadoop cluster.
- For more information about creating a Cloud Hadoop cluster, refer to the Get Started with Cloud Hadoop Guide.
It is recommended to create Cloud Hadoop and the Cloud Data Streaming Service in the same subnet so that they can communicate within the same VPC.
- Set up an ACG.
- In Cloud Hadoop, you must allow port 9092 to access the Cloud Data Streaming Service Broker nodes.
- Add the Cloud Hadoop subnet address range to the access source of the Broker node ACG for the Cloud Data Streaming Service.

For Zeppelin Notebook to link with CDSS, you must additionally allow port 9996 in the Cloud Hadoop ACG.
For more information, see Set UI Access and Password by Services.
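Once the ACG rules are in place, you can check that the Broker nodes are reachable from a Cloud Hadoop node, for example with nc. This is a sketch using the example broker addresses from this guide; substitute your own broker list.

```shell
# Example broker list from this guide; replace with your Broker node addresses
BROKERS="172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092"

# Check that port 9092 on each Broker node is reachable from this node
for broker in ${BROKERS//,/ }; do
  host="${broker%%:*}"
  port="${broker##*:}"
  nc -zv -w 3 "$host" "$port"
done
```

If a connection fails, re-check the Broker node ACG access source and the subnet settings described above.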
Data transfer with Kafka
- Run Kafka in the Cloud Data Streaming Service VM.
[root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
- Create a topic.
- Specify the broker list after the --bootstrap-server option.
# Create a topic
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --create --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --replication-factor 1 --partitions 1 --topic [topic]
# Check the created topics
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --list --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092
The broker list can be found in Cloud Data Streaming Service > Cluster > Broker Node Information.
- Produce data with the console producer.
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-console-producer.sh --broker-list 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic]
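To confirm that the messages were produced, you can read them back with the console consumer included with Kafka. Run it in a second terminal; the broker addresses below are the example values from this guide.

```shell
# Read all messages in the topic from the beginning; press Ctrl+C to stop
./bin/kafka-console-consumer.sh --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic] --from-beginning
```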
Kafka connection
This guide describes two ways to connect Kafka with Spark Streaming.
- Connecting CDSS on edge nodes
- Connecting CDSS in Zeppelin Notebook
For more information about Spark Streaming, see Spark Streaming Programming Guide.
CDSS connection on edge nodes
- Run Spark on a Cloud Hadoop edge node.
- For more information about edge nodes, refer to the guide to Access a Cluster Node with SSH.
# Cloud Hadoop version 1.5 and higher
[sshuser@e-001-example-pzt-hd ~]$ sudo -u {Account name} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8
# Cloud Hadoop version 1.4
[sshuser@e-001-example-pzt-hd ~]$ sudo -u {Account name} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
- Read data in real-time using Spark Streaming.
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()

Data can also be read as a regular batch.
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets","earliest").load()
> df.show
- You can also use Spark Streaming to write data to Kafka.
Before streaming, you must first create a checkpoint location in HDFS from the edge node.
hdfs dfs -mkdir -p /streaming/checkpointLocation
The following code fetches data from the test topic and writes the read data to a new topic (newtopic).
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()
> val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic", "newtopic").start()
If you don't need to process the data in real-time, you can simply save it with the code below.
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets","earliest").load()
> df.selectExpr("key","value").write.format("kafka").option("kafka.bootstrap.servers","172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("topic","newtopic").save()
If you check newtopic in Kafka, you can see that the data has been saved.

Connecting CDSS in Zeppelin Notebook
- After accessing the Zeppelin UI, click Interpreter.
- Add Dependencies under spark2.
  - artifact: org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
  - exclude: net.jpountz.lz4:lz4:1.3.0
- Click [Notebook] > Create new note, and then create a new notebook.
  - Set the Default Interpreter to spark2.
- Zeppelin Notebook can read and write data in real time using Spark Streaming. The code is as follows:
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()

A UDF can be used to convert the binary-encoded Kafka data into strings.
Below is an example code.
> import org.apache.spark.sql.functions.udf
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").load()
> val toStr = udf((payload: Array[Byte]) => new String(payload))
> val chstring = df.withColumn("value", toStr(df("value")))
> chstring.show
