Cloud Data Streaming Service の連携
    • PDF

    Cloud Data Streaming Service の連携

    • PDF

    Article Summary

    VPC環境で利用できます。

    このガイドでは、NAVERクラウドプラットフォームCloud HadoopとCloud Data Streaming Service(CDSS)を連携する方法を紹介します。

    事前作業

    1. Cloud Data Streaming Serviceを作成します。
    2. Cloud Data Streaming Serviceを使用するために、VMの作成とセッティングを行います。
    3. Cloud Hadoopを作成します。
    参考

    Cloud HadoopとCloud Data Streaming Serviceは、同じVPC内で通信を行える同じSubnetで作成することをお勧めします。

    1. ACGを設定します。
      • Cloud HadoopでCloud Data Streaming Service Brokerノードにアクセスするには、9092ポートを許可する必要があります。
      • Cloud Data Streaming ServiceのBrokerノードACGアクセスソースにCloud HadoopのSubnetバンドを追加します。
        cloudhadoop-use-pre-vpc_ja
    参考

    Zeppelin NotebookでCDSS連携を行うには、追加的にCloud Hadoop ACGで9996ポートを許可する必要があります。
    詳細内容は、ウェブUIにアクセスガイドをご参照ください。

    Kafkaを活用したデータ転送

    1. Cloud Data Streaming Service VMでKafkaを実行します。
    [root@s17e27e0cf6c ~ ]# cd kafka_2.12-2.4.0
    [root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
    
    1. トピックを作成します。
      • bootstrap-serverの末尾にbroker-listを追加します。
    # トピックの作成
    [root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --create --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --replication-factor 1 --partitions 1 --topic [topic]
    
    # 作成されたトピックの確認
    [root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --list --bootstrap-server 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092
    
    参考

    broker-listはCloud Data Streaming Service > Cluster > Brokerノード情報で確認できます。

    1. データを作成します。
    [root@s17e27e0cf6c kafka_2.12-2.4.0]# ./bin/kafka-console-producer.sh --broker-list 172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092 --topic [topic]
    

    Kafkaの連携

    このガイドでは、Spark StreamingでKafka連携を行う2つの方法を説明します。

    1. エッジノードでCDSS連携を行う
    2. Zeppelin NotebookでCDSS連携を行う
    参考

    Spark Streamingに関する詳細は、公式サイトをご参照ください。

    エッジノードでCDSS連携

    1. Cloud HadoopエッジノードでSparkを実行します。
    $ sudo -u {username} spark-shell --master yarn --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
    
    1. Spark Streamingを用いてリアルタイムでデータを読み込みます。
    > import org.apache.spark.sql.streaming.Trigger
    > val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
    > val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()
    

    hadoop-vpc-use-ex13_edge1_vpc_ja

    通常のbatchでもデータを読み込むことができます。

    > val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "[topic]").option("startingOffsets","earliest").load()
    > df.show
    
    1. Spark Streamingを用いてKafkaにデータを作成することもできます。

    ストリーミングを行う前に、エッジノードで先にcheckpointを作成する必要があります。

    hdfs dfs -mkdir -p  /streaming/checkpointLocation
    

    testトピックからデータを読み込み、読み込んだデータを新しいトピックに保存します。

    > val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "test").option("startingOffsets", "earliest").load()
    
    > val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("checkpointLocation","/streaming/checkpointLocation").option("topic", "newtopic").start()
    

    リアルタイムでデータを処理する必要がない場合、以下のコードで内容を簡単に保存できます。

    > val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").option("startingOffsets","earliest").load()
    
    > df.selectExpr("key","value").write.format("kafka").option("kafka.bootstrap.servers","172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("topic","newtopic").save()
    

    ここでKafkaでnewtopicを確認すると、データが入っていることが確認できます。

    hadoop-vpc-use-ex13_edge2_vpc_ja

    Zeppelin NotebookでCDSS連携

    1. Zeppelin Notebookにアクセスし、Interpreterをクリックします。
      hadoop-vpc-use-ex13_zeppelin1_vpc_ja

    2. Spark 2の下位にDependenciesを追加します。

      • artifact : org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
      • exclude : net.jpountz.lz4:lz4:1.3.0
        hadoop-vpc-use-ex13_zeppelin2_vpc_ja
    3. [Notebook] > Create new noteをクリックして新しいノートブックを作成してください。

      • 基本インタプリタはSpark2に設定します。
        hadoop-vpc-use-ex13_zeppelin3_vpc_ja
    4. Zeppelin NotebookでSpark Streamingを使用すると、リアルタイムでデータを読み書きできます。コードは次のとおりです。

    > import org.apache.spark.sql.streaming.Trigger
    > val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.16.2.6:9092,172.16.2.7:9092,172.16.2.8:9092").option("subscribe", "[topic]").option("startingOffsets", "earliest").load()
    > val stream = df.writeStream.trigger(Trigger.ProcessingTime("5 seconds")).outputMode("append").format("console").start().awaitTermination()
    

    hadoop-vpc-use-ex13_zeppelin4_vpc_ja

    参考

    UDFを用いると、binaryで表現されたデータをstringに変換できます。
    以下はサンプルコードです。

    > import org.apache.spark.sql.functions.udf
    > val df = spark.read.format("kafka").option("kafka.bootstrap.servers", "172.16.0.6:9092,172.16.0.7:9092,172.16.0.8:9092").option("subscribe", "test").load()
    > val toString = udf((payload: Array[Byte]) => new String(payload))
    > val chstring = df.withColumn("value", toStr(df("value")))
    > df.show
    

    hadoop-vpc-use-ex13_note1_vpc_ja


    この記事は役に立ちましたか?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.