Cloud Data Streaming Serviceで Spark Streaming連携

Prev Next

VPC環境で利用できます。

このガイドでは、NAVERクラウドプラットフォーム Cloud Hadoopと Cloud Data Streaming Service(CDSS)を連携する方法を説明します。

事前ジョブ

  1. Cloud Data Streaming Serviceを作成します。
  2. Cloud Data Streaming Serviceを使用するには、VM作成とセッティングを行う必要があります。
  3. Cloud Hadoopクラスタを作成します。
参考

Cloud Hadoopと Cloud Data Streaming Serviceは、同じ VPC内で通信が可能な同じ Subnetで作成することをお勧めします。

  1. ACGを設定します。
    • Cloud Hadoopで Cloud Data Streaming Service Brokerノードにアクセスするには、9092ポートを許可する必要があります。
    • Cloud Data Streaming Serviceの Brokerノード ACG アクセスソースに Cloud Hadoopの Subnet帯域を追加します。
      cloudhadoop-use-pre-vpc_ko
参考

Zeppelin Notebookで CDSS連携するには、追加で Cloud Hadoop ACGに 9996ポートを許可する必要があります。
詳細は、サービス別 UIアクセスとパスワード設定ガイドをご参照ください。

Kafkaを活用したデータ転送

  1. Cloud Data Streaming Service VMで Kafkaを実行します。
[root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
  1. トピックス作成します。
    • bootstrap-serverの後には broker-listを入力します。
# トピック作成
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --create --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --replication-factor 1 --partitions 1 --topic [topic]

# 作成されたトピック確認
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --list --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092
参考

broker-listは Cloud Data Streaming Service > Cluster > Brokerノード情報 で確認できます。

  1. データを作成します。
[root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-console-producer.sh --broker-list 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic]

Kafka連携

このガイドでは、Spark Streamingで Kafka連携する2つの方法を説明します。

  1. エッジノードで CDSSを連携する
  2. Zeppelin Notebookで CDSSを連携する
参考

Spark Streamingに関する詳細は、Spark Streaming Programming Guideをご参照ください。

エッジノードで CDSS連携

  1. Cloud Hadoopエッジノードで Sparkを実行します。
# Cloud Hadoop 1.5以上のバージョン
[sshuser@e-001-example-pzt-hd ~]$ sudo -u {アカウント名} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8

# Cloud Hadoop 1.4バージョン
[sshuser@e-001-example-pzt-hd ~]$ sudo -u {アカウント名} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
  1. Spark Streamingを使用してリアルタイムでデータを読み取ります。
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()

hadoop-vpc-use-ex13_edge1_vpc_ko

一般的な batchでもデータを読み取れます。

> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "[topic]").option("startingOffsets","earliest").load()
> df.show
  1. Spark Streamingを使用して Kafkaにデータを作成できます。

Streamingの前に、エッジノードで予め checkpointを作成する必要があります。

hdfs dfs -mkdir -p  /streaming/checkpointLocation

Testトピックでデータを読み取り、そのデータを新しいトピックに保存します。

> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()

> val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic", "newtopic").start()

リアルタイムでデータを処理する必要がない場合、以下のコードで簡単に内容を保存できます。

> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").option("startingOffsets","earliest").load()
> df.selectExpr("key","value").write.format("kafka").option("kafka.bootstrap.servers","172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("topic","newtopic").save()

これで Kafkaで newtopicを確認すると、データが入っていることを確認できます。

hadoop-vpc-use-ex13_edge2_vpc_ko

Zeppelin Notebookで CDSS連携

  1. Zeppelin UIにアクセスした後、 Interpreter をクリックします。
    hadoop-vpc-use-ex13_zeppelin1_vpc_ko

  2. spark2の下位に Dependenciesを追加します。

    • artifact : org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
    • exclude : net.jpountz.lz4:lz4:1.3.0
      hadoop-vpc-use-ex13_zeppelin2_vpc_ko
  3. [Notebook] > Create new note をクリックした後、新しいノートブックを作成します。

  • Default Interpreterは spark2に設定します。
    hadoop-vpc-use-ex13_zeppelin3_vpc_ko
  1. Zeppelin Notebookで Spark Streamingを使用し、リアルタイムでデータの読み取りと書き込みができます。コードは次の通りです。
> import org.apache.spark.sql.streaming.Trigger
> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
> val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()

hadoop-vpc-use-ex13_zeppelin4_vpc_ko

参考

UDFを利用して Binaryで表現されたデータを stringも変換できます。
以下の、サンプルコードです。

> import org.apache.spark.sql.functions.udf
> val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").load()
> val toString = udf((payload: Array[Byte]) => new String(payload))
> val chstring = df.withColumn("value", toStr(df("value")))
> df.show

hadoop-vpc-use-ex13_note1_vpc_ko