VPC環境で利用できます。
このガイドでは、NAVERクラウドプラットフォーム Cloud Hadoopと Cloud Data Streaming Service(CDSS)を連携する方法を説明します。
事前ジョブ
- Cloud Data Streaming Serviceを作成します。
- Cloud Data Streaming Service作成に関する詳細は、Cloud Data Streaming Service ご利用ガイドをご参照ください。
- Cloud Data Streaming Serviceを使用するには、VM作成とセッティングを行う必要があります。
- VM作成とセッティングに関する詳細は、Cloud Data Streaming Service ご利用ガイドをご参照ください。
- Cloud Hadoopクラスタを作成します。
- Cloud Hadoopクラスタ作成に関する詳細は、Cloud Hadoop を開始するガイドをご参照ください。
Cloud Hadoopと Cloud Data Streaming Serviceは、同じ VPC内で通信が可能な同じ Subnetで作成することをお勧めします。
- ACGを設定します。
- Cloud Hadoopで Cloud Data Streaming Service Brokerノードにアクセスするには、9092ポートを許可する必要があります。
- Cloud Data Streaming Serviceの Brokerノード ACG アクセスソースに Cloud Hadoopの Subnet帯域を追加します。

Zeppelin Notebookで CDSS連携するには、追加で Cloud Hadoop ACGに 9996ポートを許可する必要があります。
詳細は、サービス別 UIアクセスとパスワード設定ガイドをご参照ください。
Kafkaを活用したデータ転送
- Cloud Data Streaming Service VMで Kafkaを実行します。
[root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
- トピックス作成します。
- bootstrap-serverの後には broker-listを入力します。
# トピック作成
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --create --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --replication-factor 1 --partitions 1 --topic [topic]
# 作成されたトピック確認
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --list --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092
broker-listは Cloud Data Streaming Service > Cluster > Brokerノード情報 で確認できます。
- データを作成します。
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-console-producer.sh --broker-list 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic]
Kafka連携
このガイドでは、Spark Streamingで Kafka連携する2つの方法を説明します。
- エッジノードで CDSSを連携する
- Zeppelin Notebookで CDSSを連携する
Spark Streamingに関する詳細は、Spark Streaming Programming Guideをご参照ください。
エッジノードで CDSS連携
- Cloud Hadoopエッジノードで Sparkを実行します。
- エッジノードに関する詳細は、SSHでクラスタノードにアクセスガイドをご参照ください。
# Cloud Hadoop 1.5以上のバージョン
[sshuser@e-001-example-pzt-hd ~]$ sudo -u {アカウント名} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8
# Cloud Hadoop 1.4バージョン
[sshuser@e-001-example-pzt-hd ~]$ sudo -u {アカウント名} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
- Spark Streamingを使用してリアルタイムでデータを読み取ります。
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()

一般的な batchでもデータを読み取れます。
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "[topic]").option("startingOffsets","earliest").load()
> df.show
- Spark Streamingを使用して Kafkaにデータを作成できます。
Streamingの前に、エッジノードで予め checkpointを作成する必要があります。
hdfs dfs -mkdir -p /streaming/checkpointLocation
Testトピックでデータを読み取り、そのデータを新しいトピックに保存します。
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()
> val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic", "newtopic").start()
リアルタイムでデータを処理する必要がない場合、以下のコードで簡単に内容を保存できます。
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").option("startingOffsets","earliest").load()
> df.selectExpr("key","value").write.format("kafka").option("kafka.bootstrap.servers","172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("topic","newtopic").save()
これで Kafkaで newtopicを確認すると、データが入っていることを確認できます。

Zeppelin Notebookで CDSS連携
-
Zeppelin UIにアクセスした後、 Interpreter をクリックします。

-
spark2の下位に Dependenciesを追加します。
- artifact : org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
- exclude : net.jpountz.lz4:lz4:1.3.0

-
[Notebook] > Create new note をクリックした後、新しいノートブックを作成します。
- Default Interpreterは spark2に設定します。

- Zeppelin Notebookで Spark Streamingを使用し、リアルタイムでデータの読み取りと書き込みができます。コードは次の通りです。
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()

UDFを利用して Binaryで表現されたデータを stringも変換できます。
以下の、サンプルコードです。
> import org.apache.spark.sql.functions.udf
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").load()
> val toString = udf((payload: Array[Byte]) => new String(payload))
> val chstring = df.withColumn("value", toStr(df("value")))
> df.show
