Linking Spark Streaming with Cloud Data Streaming Service


Available in VPC

This guide introduces how to link NAVER CLOUD PLATFORM's Cloud Hadoop with Cloud Data Streaming Service (CDSS).

Pre-work

  1. Create a Cloud Data Streaming Service.
  2. Create and set up a VM to use the Cloud Data Streaming Service.
  3. Create a Cloud Hadoop cluster.
Note

It is recommended to create Cloud Hadoop and Cloud Data Streaming Service in the same subnet so that they can communicate within the same VPC.

  4. Set up an ACG.
    • To access the Cloud Data Streaming Service broker nodes from Cloud Hadoop, port 9092 must be allowed.
    • Add the Cloud Hadoop subnet range to the allowed access sources in the Broker Node ACG of the Cloud Data Streaming Service.
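Once the ACG is set up, you can verify connectivity from a Cloud Hadoop node before going further. A minimal check, assuming the broker IP used in this guide and that nc is available on the node:

# Check that a broker node is reachable on port 9092
[sshuser@e-001-example-pzt-hd ~]$ nc -zv 172.16.2.6 9092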
Note

To link Zeppelin Notebook with CDSS, you must additionally allow port 9996 in the Cloud Hadoop ACG.
For more information, see Set UI Access and Password by Services.

Data transfer with Kafka

  1. Run Kafka in the Cloud Data Streaming Service VM.
[root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
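You can optionally confirm that the broker process started before creating topics, for example with jps (assuming a JDK is installed on the VM):

# The Kafka broker should appear in the Java process list
[root@s17e27e0cf6c kafka_2.12-2.4.0]# jps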
  2. Create a topic.
    • Specify the broker list after the --bootstrap-server option.
# Create a topic
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --create --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --replication-factor 1 --partitions 1 --topic [topic]

# Check the created topics
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --list --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092
Note

The broker list can be found in Cloud Data Streaming Service > Cluster > Broker Node Information.

  3. Produce data to the topic.
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-console-producer.sh --broker-list 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic]
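To confirm that messages are being produced, you can run a console consumer in a separate session against the same broker list ([topic] is the placeholder used above):

# Read messages from the beginning of the topic
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic] --from-beginning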

Kafka connection

This guide describes two ways to connect Kafka with Spark Streaming.

  1. Connecting CDSS on edge nodes
  2. Connecting CDSS in Zeppelin Notebook
Note

For more information about Spark Streaming, see Spark Streaming Programming Guide.

CDSS connection on edge nodes

  1. Run Spark on a Cloud Hadoop edge node.
# Cloud Hadoop version 1.5 and higher
[sshuser@e-001-example-pzt-hd ~]$ sudo -u {Account name} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8

# Cloud Hadoop version 1.4
[sshuser@e-001-example-pzt-hd ~]$ sudo -u {Account name} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
  2. Read data in real time using Spark Streaming.
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()


Data can also be read as a regular batch.

> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "[topic]").option("startingOffsets","earliest").load()
> df.show
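Since the key and value columns are returned as binary, you can cast them to strings when displaying the batch result (a sketch using Spark SQL's CAST; df is the DataFrame read above):

> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show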
  3. You can also use Spark Streaming to write data to Kafka.

Before streaming, you must first create a checkpoint location in HDFS from the edge node.

hdfs dfs -mkdir -p /streaming/checkpointLocation

The following code fetches data from the test topic and writes it to a new topic (newtopic).

> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()

> val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic", "newtopic").start()

If you don't need to process the data in real-time, you can simply save it with the code below.

> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").option("startingOffsets","earliest").load()
> df.selectExpr("key","value").write.format("kafka").option("kafka.bootstrap.servers","172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("topic","newtopic").save()

If you check newtopic in Kafka, you can see that it contains the data.


Connecting CDSS in Zeppelin Notebook

  1. After accessing the Zeppelin UI, click Interpreter.

  2. Add Dependencies under spark2.

    • artifact : org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
    • exclude : net.jpountz.lz4:lz4:1.3.0
  3. Click [Notebook] > Create new note, and then create a new Notebook.

    • Set the Default Interpreter to spark2.
  4. Zeppelin Notebook can read and write data in real-time using Spark Streaming. The code is as follows:

> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()


Note

A UDF can be used to convert the binary data into strings.
Below is an example code.

> import org.apache.spark.sql.functions.udf
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").load()
> val toStr = udf((payload: Array[Byte]) => new String(payload))
> val chstring = df.withColumn("value", toStr(df("value")))
> chstring.show
