- 印刷する
- PDF
Cloud Data Streaming Service の連携
- 印刷する
- PDF
VPC環境で利用できます。
このガイドでは、NAVERクラウドプラットフォームCloud HadoopとCloud Data Streaming Service(CDSS)を連携する方法を紹介します。
事前作業
- Cloud Data Streaming Serviceを作成します。
- Cloud Data Streaming Service作成に関する詳細は、Cloud Data Streaming Serviceご利用ガイドを参照してください。
- Cloud Data Streaming Serviceを使用するために、VMの作成とセッティングを行います。
- VMの作成とセッティングに関する詳細は、Cloud Data Streaming Serviceご利用ガイドを参照してください。
- Cloud Hadoopを作成します。
- Cloud Hadoop作成に関する詳細は、Cloud Hadoopを開始するをご参照ください。
Cloud HadoopとCloud Data Streaming Serviceは、同じVPC内で通信を行える同じSubnetで作成することをお勧めします。
- ACGを設定します。
- Cloud HadoopでCloud Data Streaming Service Brokerノードにアクセスするには、9092ポートを許可する必要があります。
- Cloud Data Streaming ServiceのBrokerノードACGアクセスソースにCloud HadoopのSubnetバンドを追加します。
Zeppelin NotebookでCDSS連携を行うには、追加的にCloud Hadoop ACGで9996ポートを許可する必要があります。
詳細内容は、ウェブUIにアクセスガイドをご参照ください。
Kafkaを活用したデータ転送
- Cloud Data Streaming Service VMでKafkaを実行します。
[root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
- トピックを作成します。
- bootstrap-serverの末尾にbroker-listを追加します。
# トピックの作成
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --create --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --replication-factor 1 --partitions 1 --topic [topic]
# 作成されたトピックの確認
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --list --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092
broker-listはCloud Data Streaming Service > Cluster > Brokerノード情報で確認できます。
- データを作成します。
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-console-producer.sh --broker-list 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic]
Kafkaの連携
このガイドでは、Spark StreamingでKafka連携を行う2つの方法を説明します。
- エッジノードでCDSS連携を行う
- Zeppelin NotebookでCDSS連携を行う
Spark Streamingに関する詳細は、公式サイトをご参照ください。
エッジノードでCDSS連携
- Cloud HadoopエッジノードでSparkを実行します。
$ sudo -u {username} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
- Spark Streamingを用いてリアルタイムでデータを読み込みます。
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()
通常のbatchでもデータを読み込むことができます。
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "[topic]").option("startingOffsets","earliest").load()
> df.show
- Spark Streamingを用いてKafkaにデータを作成することもできます。
ストリーミングを行う前に、エッジノードで先にcheckpointを作成する必要があります。
hdfs dfs -mkdir -p /streaming/checkpointLocation
testトピックからデータを読み込み、読み込んだデータを新しいトピックに保存します。
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()
> val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic", "newtopic").start()
リアルタイムでデータを処理する必要がない場合、以下のコードで内容を簡単に保存できます。
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").option("startingOffsets","earliest").load()
> df.selectExpr("key","value").write.format("kafka").option("kafka.bootstrap.servers","172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("topic","newtopic").save()
ここでKafkaでnewtopicを確認すると、データが入っていることが確認できます。
Zeppelin NotebookでCDSS連携
Zeppelin Notebookにアクセスし、Interpreterをクリックします。
Spark 2の下位にDependenciesを追加します。
- artifact : org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
- exclude : net.jpountz.lz4:lz4:1.3.0
[Notebook] > Create new noteをクリックして新しいノートブックを作成してください。
- 基本インタプリタはSpark2に設定します。
- 基本インタプリタはSpark2に設定します。
Zeppelin NotebookでSpark Streamingを使用すると、リアルタイムでデータを読み書きできます。コードは次のとおりです。
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()
UDFを用いると、binaryで表現されたデータをstringに変換できます。
以下はサンプルコードです。
> import org.apache.spark.sql.functions.udf
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").load()
> val toString = udf((payload: Array[Byte]) => new String(payload))
> val chstring = df.withColumn("value", toStr(df("value")))
> df.show