Linking Cloud Data Streaming Service
Available in VPC
This guide introduces how to connect NAVER Cloud Platform's Cloud Hadoop with Cloud Data Streaming Service (CDSS).
Preparations
- Create Cloud Data Streaming Service.
- Please refer to the Cloud Data Streaming Service Guide for more information about creating Cloud Data Streaming Service.
- Create and set up a VM to use Cloud Data Streaming Service.
- Please refer to the Cloud Data Streaming Service Guide for more information about creating and setting up VMs.
- Create a Cloud Hadoop cluster.
- Please refer to Getting started with Cloud Hadoop for more information about creating a Cloud Hadoop cluster.
We recommend creating Cloud Hadoop and Cloud Data Streaming Service in the same subnet of the same VPC so that they can communicate with each other.
- Set up ACG.
- Port 9092 must be allowed for Cloud Hadoop to access the broker nodes of Cloud Data Streaming Service.
- Add the Cloud Hadoop subnet range to the broker node ACG's access sources in Cloud Data Streaming Service.
To link CDSS from Zeppelin Notebook, port 9996 must also be allowed in the Cloud Hadoop ACG.
For more details, please refer to the Access web UI guide.
Data transfer using Kafka
- Run Kafka in the Cloud Data Streaming Service VM.
[root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
- Create a topic.
- Specify the broker list as the --bootstrap-server argument.
# Create topic
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --create --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --replication-factor 1 --partitions 1 --topic [topic]
# Check created topic
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --list --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092
The broker list can be viewed in Cloud Data Streaming Service > Cluster > Broker node information.
- Produce data with the console producer.
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-console-producer.sh --broker-list 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic]
Link Kafka
This guide describes two ways to link Kafka using Spark Streaming.
- Link CDSS from edge node
- Link CDSS from Zeppelin Notebook
Please refer to the official Spark documentation for more information about Spark Streaming.
Link CDSS from edge node
- Run Spark from Cloud Hadoop's edge node.
$ sudo -u {username} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
- Use Spark Streaming to read data in real time.
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()
Data can also be read as a regular batch.
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets","earliest").load()
> df.show
- You can also use Spark Streaming to write data to Kafka.
You must create a checkpoint directory in HDFS before starting the stream.
hdfs dfs -mkdir -p /streaming/checkpointLocation
Read data from the test topic, and save the loaded data in a new topic.
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()
> val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic", "newtopic").start()
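The start() call above returns a StreamingQuery handle (assigned to ds). A minimal sketch, assuming the query started successfully, for waiting on or stopping it:

```scala
// Block the shell until the query stops or fails
ds.awaitTermination()

// Or stop the query explicitly when you're done
// ds.stop()
```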
If there's no need to process the data in real time, then the content can be saved easily with the code below.
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets","earliest").load()
> df.selectExpr("key","value").write.format("kafka").option("kafka.bootstrap.servers","172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("topic","newtopic").save()
Now, check the new topic in Kafka to confirm that the data has been written.
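For example, assuming the same broker addresses as above, the console consumer bundled with Kafka prints everything written to the new topic. This requires a running cluster, so the output depends on what you produced earlier.

```shell
# Read the new topic from the beginning to verify the copied data
./bin/kafka-console-consumer.sh --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic newtopic --from-beginning
```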
Link CDSS from Zeppelin Notebook
Access Zeppelin Notebook, and then click Interpreter.
Add dependencies under spark2.
- artifact : org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
- exclude : net.jpountz.lz4:lz4:1.3.0
Create a new notebook.
- Set the default interpreter to Spark.
You can read and write data in real time using Spark Streaming in Zeppelin Notebook.
Here's the code:
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()
You can use a UDF to convert the binary-encoded data to strings.
Below is an example:
> import org.apache.spark.sql.functions.udf
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").load()
> val toStr = udf((payload: Array[Byte]) => new String(payload))
> val chstring = df.withColumn("value", toStr(df("value")))
> chstring.show