Cloud Data Streaming Serviceの活用
    • PDF

    Cloud Data Streaming Serviceの活用

    • PDF

    Article Summary

    VPC環境で利用できます。

    CMAKにアクセスしてTopicを作成し、Topicにデータを保存する方法、Topicに保存されたデータをCloud Data Streaming Service Clusterに転送する方法、Apache KafkaクライアントとApache KafkaのBrokerノード間の通信区間を暗号化する方法を説明します。

    CMAKにアクセス

    CMAKはApache Kafkaクラスタを管理するプラットフォームです。 CMAKにアクセスしてTopicを作成し、TopicのPartition数の変更やTopicのデータ保管周期を変更できます。

    参考

    CMAKにアクセスする前に、Publicドメインを有効化する必要があります。

    CMAKにアクセスする方法は以下のとおりです。

    1. NAVERクラウドプラットフォームコンソールで、Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
    2. クラスタのチェックボックスをクリックし、クラスタ管理 > CMAKアクセスドメインの設定変更ボタンをクリックします。
    3. ポップアップが表示されたら情報を確認し、[確認] ボタンをクリックします。
      • Publicドメインが有効になります。
    4. クラスタのチェックボックスをクリックし、クラスタ管理 > CMAKにアクセスをクリックします。
      • CMAKアクセスメニューが有効になっていない場合、Publicドメインは無効の状態です。 1~3の手続きをもう一度実行してください。
    5. CMAKにアクセスするための事前ジョブ画面が表示されたら、CMAKに移動するボタンをクリックします。
      cdss-cmak_cmak01_ja.png
    6. ログインポップアップが表示されたら、クラスタを作成する際に設定したユーザーIDとパスワードを入力します。
      • IDを忘れた場合、クラスタ管理 > CMAKアクセスパスワードの初期化をクリックすると、IDを確認できます。
    7. クラスタアカウント名をクリックします。
      cdss-cmak_cmak02_ja.png
    8. Cluster SummaryでTopicとBrokerノードに関する情報を確認します。
      cdss-cmak_cmak03_ja
    • Topicに関する情報
      cmak-04_ja.png

    • Brokerノードに関する情報
      cmak-05_ja.png

    Topicの作成

    1. CMAKページにアクセスします。
    2. CMAKページでTopic > Createメニューをクリックします。
      cdss-cmak_topicadd01_ja.png
    3. トピック情報を入力します。
      cdss-cmak_topicadd02_ja.png
      • Topic:Topicの名前を入力
      • Partitions:TopicのPartitions数を入力
      • Replication Factor:TopicのReplication数を入力
    4. [Create] ボタンをクリックします。
    参考

    詳細設定値についての説明は、Apache Kafka Documentationをご参照ください。

    注意
    • Topicの作成後は、Partitions数を増やすことのみ可能であり、既に追加済みのPartitions数を減らすことはできません。
    • データを消費する際に順序を保障する必要がある場合、Partitions数は1に設定する必要があります。 ただし、Partitionsが1つの場合、1つのブローカーノードにすべてのデータが保存されるため、ブローカーノードのディスク使用量管理に注意してください。
    • Topicの作成後、Replication数は変更できません。 安定したサービス運用のために、最低2つ以上のReplicationを適用することをお勧めします。

    Topic情報の確認

    cdss-cmak_topicinfo.png

    ① Topic Summary:Topicの情報を確認
    ② Operations:Topic削除、Partition追加、再分配、設定変更
    ③ Partitions by Broker:各ブローカーノードに関して作成されたPartition情報を確認
    ④ Partition Information:各PartitionのLeaderとReplicationがどんなブローカーノードに位置しているのかを確認

    Topicのデータ保管周期の変更

    Topicのデータ保管周期を調整する方法は以下のとおりです。

    1. CMAKにアクセスを参考にしてCMAKにアクセスします。
    2. クラスタ名をクリックします。
    3. Topic > Listをクリックします。
      cdss-cmak_topiclist_ja.png
    4. Topicリストでデータの保管周期を変更するTopicの名前をクリックします。
    5. Operations > [Update Config] ボタンをクリックします。
      cdss-cmak_topicconfig01_ja.png
    6. retention.ms値を修正します。
      cdss-cmak_topicconfig02_ja.png
      • 1時間に設定する場合、3600000値を入力します。
      • n時間に設定する場合、3600000*nの結果値を入力します。
    7. [Update Config] ボタンをクリックします。

    TopicのPartition数の変更

    CMAKで作成されたTopicのPartition数は増やすことができます。 Partitionは追加のみ可能で、過去に割り当てられたPartitionを削除することはできません。

    TopicのPartition数を増やす方法は以下のとおりです。

    1. CMAKにアクセスを参考にしてCMAKにアクセスします。
    2. クラスタ名をクリックします。
    3. Topic > Listをクリックします。
      cdss-cmak_topiclist_ja.png
    4. Topicリストでデータの保管周期を変更するTopicの名前をクリックします。
    5. Operations > [Add Partitions] ボタンをクリックします。
      cdss-cmak_parition01_ja
    6. Partitions項目にPartition数を入力します。
      • 入力された数より大きい数のみ入力できます。
        cdss-cmak_parition02_ja
    7. [Add Paritions] ボタンをクリックします。

    Producer/Consumerの実装

    以下のような構造でProducer VM、Consumer VMを作成し、Topicにデータを保存して転送する方法を説明します。

    Apache Kafka、Java、Pythonを活用してデータを転送し、保存する方法を説明します。

    cdss-2-23.png

    参考

    このガイドでは、CentOS 7.3を基準に説明します。

    Producer VM、Consumer VMの作成

    Producer VMとConsumer VMを作成する方法は、以下のとおりです。

    1. Serverを開始するを参照してサーバを作成します。
    2. サーバ作成時にCloud Data Streaming Serviceクラスタを作成したVPCと同じVPCを選択します。
      cdss-cmak_server_ja
    3. Subnetは、Public Subnetを選択します。
      • このVMにグローバルIPを割り当てるとアクセスできます。

    Javaのインストール

    以下のコマンドを入力してJavaをインストールします。

    yum install java -y
    

    Apache Kafkaのインストール

    以下のコマンドを入力してApache Kafkaをインストールします。

    wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
    # 圧縮を展開します。
    tar -zxvf kafka_2.12-2.4.0.tgz
    

    Brokerノード情報の確認

    Brokerノード情報を確認する方法は以下のとおりです。

    1. NAVERクラウドプラットフォームコンソールで、Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
    2. クラスタの詳細情報でBrokerノード情報の [詳細を見る] ボタンをクリックします。
      cdss-cmak_broker01_ja
    3. Brokerノード情報画面が表示されたら、情報を確認します。
      cdss-cmak_broker02_ja
      • PlainText:Brokerノードと暗号化なしに通信するための情報
      • TLS:Brokerノードと暗号化通信するための情報
      • hostsファイル情報:暗号化通信に利用されるhostsファイルの修正時に必要な情報

    BrokerノードのACG設定

    BrokerノードのACGルールを設定する方法は以下のとおりです。

    1. NAVERクラウドプラットフォームコンソールで、Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
    2. クラスタの詳細情報でBrokerノードACGのcdss-cluster-popupiconをクリックします。
    3. ACGリストでBrokerノードのACGを選択し、[設定] ボタンをクリックします。
    4. ACGルールを入力し、[追加] ボタンをクリックします。
      cdss-cmak_acg01_ja.png
      • プロトコル:TCP
      • アクセスソース:Producer VM、Consumer VMのプライベートIPを入力
      • 許可ポート:9092-9093を入力
      • ACGに関するメモを入力
    5. ルールが追加されているか確認し、[適用] ボタンをクリックします。
      • 当該ルールがACGに適用されます。

    Apache Kafkaを活用してデータを転送

    1. 前の段階で作成したProducer VMにアクセスし、以下のコマンドを入力します。

      cd kafka_2.12-2.4.0
      ./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic]
      # [broker.list]に先に確認したブローカーノード情報のPlainTextコピーを入力します。
      # [topic]にCMAKで作成したTopicを入力します。
      # 例) ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092 --topic test
      

      cdss-2-29_ja.png

    2. 転送するメッセージを入力します。

      • Brokerノードに当該メッセージが保存されます。
      • 終了する場合は、[Ctrl + C]キーを押します。
        cdss-2-30_ja.png
    3. 前の段階で作成したConsumer VMにアクセスし、以下のコマンドを実行します。

      • --from-beginningコマンドを使用する場合、このTopicに関するデータを最初からすべて照会します。
      • --from-beginningコマンドを使用しない場合、データを照会した瞬間から入力されるデータのみ照会します。
      cd kafka_2.12-2.4.0
      ./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning
      # [bootstrap.server]に先に確認したブローカーノード情報のPlainTextコピーを入力します。
      # [topic]に前段階のProducer VMで入力したTopicを入力します。
      # 例) ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092 --topic test --from-beginning
      

    Javaを活用してデータを転送

    Javaを活用してデータを転送する方法を説明します。

    参考

    このガイドでは、IntelliJ IDEAを使用することを基準に説明します。

    プロジェクトの作成

    プロジェクトを作成する方法は以下のとおりです。

    1. IntelliJ IDEAを実行し、File > New > Projectをクリックします。
    2. Maven Archetypeを選択してProject情報を入力し、[Create] ボタンをクリックしてプロジェクトを作成します。
      java-1_ja.png

    pom.xmlファイルの修正

    プロジェクトの従属性、Javaバージョン、パッケージングメソッドを定義するpom.xmlファイルを修正します。

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <packaging>jar</packaging>
        <groupId>org.example</groupId>
        <artifactId>maventest</artifactId>
        <version>1.0-SNAPSHOT</version>
        <url>http://maven.apache.org</url>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <!-- Apache Kafka version in Cloud Data Streaming Service -->
                <version>2.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.21</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.3</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>KafkaMain</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
    
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    
    参考

    ユーザー環境によってpom.xmlファイルが異なる場合があります。

    KafkaMain.javaの作成

    Javaアプリケーションを実行する際にargumentでproduce/consumeの有無、Topic、Broker listsを伝達します。

    public class KafkaMain {
        public static void main(String[] args) throws IOException {
            String topicName = args[1];
            String brokers = args[2];
    
            switch(args[0]){
                case "produce":
                    Producer.produce(brokers, topicName);
                    break;
                case "consume":
                    Consumer.consume(brokers, topicName);
                    break;
                default:
                    System.out.println("Wrong arguments");
                    break;
            }
            System.exit(0);
        }
    }
    

    Producer.javaの作成

    Producer.javaファイルを作成します。 0~99の数字を転送する例は以下のとおりです。

    public class Producer {
        public static void produce(String brokers, String topicName) throws IOException {
            // Create Producer
            KafkaProducer<String, String> producer;
            // Configure
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", brokers);
            properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    
            producer = new KafkaProducer<>(properties);
    
            for(int i=0;i<100;i++){
                ProducerRecord record = new ProducerRecord<String, String>(topicName, Integer.toString(i));
                producer.send(record);
            }
            producer.close();
        }
    }
    

    Consumer.javaの作成

    Consumer.javaファイルを作成します。

    public class Consumer {
        public static int consume(String brokers, String topicName) {
            // Create Consumer
            KafkaConsumer<String, String> consumer;
            // Configure
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", brokers);
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("group.id", "consumer_group");
            properties.setProperty("auto.offset.reset", "earliest");
    
            consumer = new KafkaConsumer<>(properties);
    
            consumer.subscribe(Arrays.asList(topicName));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(500); // wait for 500ms
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                    System.out.println(record.value());
                }
            }
        }
    }
    

    jarファイルビルドと実行

    1. 作成したJava codeをGitに保存し、VMでGit cloneコマンドを入力して、このcodeをダウンロードします。
      Git clone自体のGit Repository
      
    2. このJavaアプリケーションをビルドするには、Mavenをインストールします。
      yum install maven -y
      
    3. ダウンロードしたJava codeがあるフォルダに移動し、jarファイルをビルドします。
      jarファイルをビルドすると、targetフォルダ内にjarファイルが作成されます。
      cd kafkatest
      mvn clean package
      
    4. targetフォルダに移動し、jarファイルを実行します。
      cd target
      
      # データを転送する
      java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list]
      # [topic]にCMAKで作成したTopicを入力します。
      # [broker.list]に先に確認したブローカーノード情報のPlainTextコピーを入力します。
      例) java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092
      
      
      # データを照会する
      java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list]
      # [topic]にCMAKで作成したTopicを入力します。
      # [broker.list]に先に確認したブローカーノード情報のPlainTextコピーを入力します。
      例) java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092
      

    Pythonを活用してデータを転送

    このガイドでは、Python 2.7.5バージョンを基準に説明します。

    PythonでKafkaを活用するには、kafka-python packageをインストールします。

    # PIP のインストール
    curl -LO https://bootstrap.pypa.io/pip/2.7/get-pip.py
    python get-pip.py
    
    # kafka-python package のインストール
    pip install kafka-python
    

    KafkaMain.pyの作成

    KafkaMain.pyファイルを作成します。

    import sys
    from kafka import KafkaProducer, KafkaConsumer
    from json import dumps
    import time
    
    
    def produce(topicName, brokerLists):
            producer = KafkaProducer(bootstrap_servers=brokerLists,
                             value_serializer=lambda x:
                             dumps(x).encode('utf-8'))
            for i in range(100):
                    producer.send(topicName, i)
    
    def consume(topicName, brokerLists):
            consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
                            group_id="test")
    
            for msg in consumer:
                   print(msg)
    
    
    action=sys.argv[1]
    topicName=sys.argv[2]
    brokerLists=sys.argv[3].split(',')
    
    if action == 'produce':
        produce(topicName, brokerLists)
    elif action == 'consume':
        consume(topicName, brokerLists)
    else:
        print('wrong arguments')
    

    KafkaMain.pyファイルを実行してproduceを実行

    KafkaMain.pyファイルを実行します。 0~99の数字を転送する例です。

    python KafkaMain.py produce [topic] [broker.list]
    # [topic]にCMAKで作成したTopicを入力します。
    # [broker.list]に先に確認したBrokerノード情報のPlainTextコピーを入力します。
    # 例) python KafkaMain.py produce test 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092
    

    KafkaMain.pyファイルを実行してconsumeを実行

    KafkaMain.pyファイルを実行します。

    python KafkaMain.py consume [topic] [broker.list]
    # [topic]にCMAKで作成したTopicを入力します。
    # [broker.list]に先に確認したBrokerノード情報のPlainTextコピーを入力します。
    # 例) python KafkaMain.py consume test 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092
    

    通信区間の暗号化

    Apache KafkaクライアントとApache KafkaのBrokerノード間の通信区間を暗号化する方法を説明します。

    全体プロセスは以下のとおりです。 1~3の過程はクラスタ作成時に自動で実行されます。

    1. マネージャーノードで自体証明書を作成します。
    2. 各Brokerノードで証明書を作成し、証明書署名リクエストを作成します。
    3. 証明書署名リクエストに対してマネージャーノードが署名します。
    4. クライアントでは証明書をダウンロードし、証明書に関する情報を持つTrustStoreを作成します。
    5. 暗号化通信に関する設定ファイルを作成します。
    6. hostsファイルを修正します。

    証明書のダウンロード

    証明書をダウンロードする方法は以下のとおりです。

    1. NAVERクラウドプラットフォームコンソールで、Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
    2. クラスタの詳細情報で証明書管理項目の [ダウンロード] ボタンをクリックします。
      cdss-2-31_ja.png
    3. ポップアップが表示されたら、[確認] ボタンをクリックします。
      • 証明書がダウンロードされます。
    4. ダウンロードした証明書ファイルをProducer、Consumer VMにコピーします。
      • Producer、Consumer VMの/rootパスにca-certという名前で保存します。

    Truststoreの作成

    証明書情報を保存するTrustStoreを作成する方法は、以下のとおりです。

    1. Truststoreを作成するには、以下のコマンドを入力します。
      keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert
      
    2. keystoreのパスワードを入力します。
      cdss-2-36_ja.png
      • Enter keystore password:パスワードを入力
      • Re-enter new password:パスワードを再入力
    3. Trust this certificate? [no]:メッセージが表示されたら、「yes」を入力します。
    4. lsコマンドを入力してkafka.client.truststore.jksファイルが作成されたかどうか確認します。

    暗号化設定ファイルの作成

    client-auth.propertiesファイルを以下のように作成します。
    [password]にはkafka.client.truststore.jksファイル作成時に設定したパスワードを入力します。

    # /root/kafka_2.12-2.4.0フォルダに暗号化設定ファイルを作成します。
    cd kafka_2.12-2.4.0
    vi client-auth.properties
    
    security.protocol=SSL
    ssl.truststore.location=/root/kafka.client.truststore.jks
    ssl.truststore.password=[password]
    

    cdss-2-37_ja.png

    hostsファイルの修正

    hostファイルは、LinuxでDNSサーバより先にホスト名をIPに変換するファイルです。 暗号化通信を実行するには、hostsファイル(/etc/hosts)を修正する必要があります。

    /etc/hostsファイルを実行します。

    vi /etc/hosts
    

    Brokerノード情報のhostsファイル情報のコピーを追加します。

    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    192.168.2.24 yeakafka2-d-2am
    192.168.2.25 yeakafka2-d-2an
    192.168.2.26 yeakafka2-d-2ao
    

    cdss-2-38_ja.png

    通信区間を暗号化してデータ転送

    Producer VMで以下のようなコマンドを実行します。
    転送するメッセージを入力すると、ブローカーノードにこのメッセージが保存されます。 終了するには、[Ctrl] + [C] キーを押します。

    ./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
    # [broker.list]に先に確認したブローカーノード情報のTLSコピーを入力します。
    # [topic]にCMAKで作成したTopicを入力します。
    # 例) ./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093、yeakafka2-d-2an:9093、yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties
    

    cdss-2-40_ja.png

    通信区間を暗号化して保存されたデータの照会

    Consumer VMで以下のようなコマンドを実行します。

    cd kafka_2.12-2.4.0
    ./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
    # [bootstrap.server]に先に確認したブローカーノード情報のTLSコピーを入力します。
    # [topic]に前段階のProducer VMで入力したTopicを入力します。
    # 例) ./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093、yeakafka2-d-2an:9093、yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties
    

    cdss-2-41_ja.png

    Kafka Connectでデータパイプラインを構築

    Kafka Connectでデータパイプラインを構築する方法は、Kafka Connectでデータパイプライン構築をご参照ください。


    この記事は役に立ちましたか?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.