Cloud Data Streaming Service の活用

Prev Next

VPC環境で利用できます。

CMAKにアクセスして Topicを作成し、Topicにデータを保存する方法、Topicに保存されたデータを Cloud Data Streaming Serviceクラスタに転送する方法、および Apache Kafkaクライアントと Apache Kafkaの Brokerノード間の通信区間を暗号化する方法を説明します。

CMAKアクセス

CMAKは Apache Kafkaクラスタを管理するプラットフォームです。CMAKにアクセスして Topicを作成し、Topicの Partition数の変更や Topicのデータ保管周期を変更できます。

参考

CMAKにアクセスする前に、Publicドメインを有効にする必要があります。

CMAKにアクセスする方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
  2. クラスタのチェックボックスをクリックし、クラスタ管理 > CMAKアクセスドメインの設定変更ボタンをクリックします。
  3. ポップアップが表示されたら情報を確認し、 [確認] ボタンをクリックします。
    • Publicドメインが有効になります。
  4. クラスタのチェックボックスをクリックし、クラスタ管理 > CMAKアクセスをクリックします。
    • CMAKアクセスメニューが有効になっていない場合、Publicドメインは無効の状態です。 1~3の手続きをもう一度実行してください。
  5. CMAKにアクセスするための事前タスク画面が表示されたら、CMAKに移動するボタンをクリックします。
    cdss-cmak_cmak01_ko
  6. ログインポップアップが表示されたら、クラスタを作成する際に設定したユーザー IDとパスワードを入力します。
    • IDを忘れた場合、クラスタ管理 > CMAKアクセスパスワードの初期化をクリックすると IDを確認できます。
  7. クラスタアカウント名をクリックします。
    cdss-cmak_cmak02_ko
  8. Cluster Summaryで Topicと Brokerノードに関する情報を確認します。
    cdss-cmak_cmak03_ko
  • Topicに関する情報
    cmak-04_ko

  • Brokerノードに関する情報
    cmak-05_ko

Topic作成

  1. CMAKページにアクセスします。
  2. CMAKページで Topic > Createメニューをクリックします。
    cdss-cmak_topicadd01_ko
  3. トピック情報を入力します。
    cdss-cmak_topicadd02_ko
    • Topic: Topicの名前を入力
    • Partitions: Topicの Partitions数を入力
    • Replication Factor: Topicの Replication数を入力
  4. [Create] ボタンをクリックします。
参考

詳細設定値についての説明は、Apache Kafka Documentationをご参照ください。

注意
  • Topicの作成後は、Partitions数を増やすことのみ可能であり、既に追加済みの Partitions数を減らすことはできません。
  • データを消費する際に順序を保障する必要がある場合、Partitions数は1に設定する必要があります。ただし、Partitionsが1つの場合、1つのブローカーノードにすべてのデータが保存されるため、ブローカーノードのディスク使用量管理にご注意ください。
  • Topicの作成後、Replication数は変更できません。安定したサービス運用のために、最低2つ以上の Replicationを適用することをお勧めします。

Topic情報の確認

cdss-cmak_topicinfo

領域 説明
Topic Summary Topicの情報確認
Operations Topic削除、Partition追加、再分配、設定変更
Partitions by Broker 各ブローカーノードに対して作成された Partition情報を確認
Partition Information 各 Partitionの Leaderと Replicationがどのブローカーノードに位置しているのかを確認

Topicのデータ保管周期変更

Topicのデータ保管周期を調整する方法は、次の通りです。

  1. CMAKアクセスを参照して CMAKにアクセスします。
  2. クラスタ名をクリックします。
  3. Topic > Listをクリックします。
    cdss-cmak_topiclist_ko
  4. Topicリストからデータの保管周期を変更する Topicの名前をクリックします。
  5. Operations > [Update Config] ボタンをクリックします。
    cdss-cmak_topicconfig01_ko
  6. retention.ms値を変更します。
    cdss-cmak_topicconfig02_ko
    • 1時間に設定する場合、3600000値を入力します。
    • n時間に設定する場合、3600000 * nの結果値を入力します。
  7. [Update Config] ボタンをクリックします。

Topicの Partition数変更

CMAKで作成された Topicの Partition数は増やすことができます。Partitionは追加のみ可能で、過去に割り当てられた Partitionを削除することはできません。

Topicの Partition数を増やす方法は、次の通りです。

  1. CMAKアクセスを参照して CMAKにアクセスします。
  2. クラスタ名をクリックします。
  3. Topic > Listをクリックします。
    cdss-cmak_topiclist_ko
  4. Topicリストからデータの保管周期を変更する Topicの名前をクリックします。
  5. Operations > [Add Partitions] ボタンをクリックします。
    cdss-cmak_parition01_ko
  6. Partitions項目に Partition数を入力します。
    • 入力された数より大きい数のみ入力できます。
      cdss-cmak_parition02_ko
  7. [Add Partition] ボタンをクリックします。

Producer/Consumerの実装

以下のような構造で Producer VM、Consumer VMを作成し、Topicにデータを保存して転送する方法を説明します。

Apache Kafka、Java、Pythonを活用してデータを送信し保存する方法を説明します。

cdss-2-23

参考

本ガイドでは、CentOS 7.3を基準に説明します。

Producer VM、Consumer VM作成

Producer VMと Consumer VMを作成する方法は、次の通りです。

  1. Server を開始するを参照してサーバを作成します。
  2. サーバ作成時には、Cloud Data Streaming Serviceクラスタを作成した VPCと同じ VPCを選択します。
    cdss-cmak_server_ko
  3. Subnetは、Public Subnetを選択します。
    • 当該 VMにはパブリック IPアドレスを割り当てることでアクセスできます。

Javaのインストール

以下のコマンドを入力して Javaをインストールします。

yum install java -y

Apache Kafkaのインストール

以下のコマンドを入力して Apache Kafkaをインストールします。

wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
# 圧縮ファイルを解凍します。
tar -zxvf kafka_2.12-2.4.0.tgz

Brokerノード情報の確認

Brokerノード情報を確認する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
  2. クラスタの詳細情報で Brokerノード情報の [詳細を見る] ボタンをクリックします。
    cdss-cmak_broker01_ko
  3. Brokerノード情報画面が表示されたら、情報を確認します。
    cdss-cmak_broker02_ko
    • PlainText: Brokerノードと暗号化なしで通信するための情報
    • TLS: Brokerノードと暗号化通信するための情報
    • hostsファイル情報: 暗号化通信に利用される hostsファイルの変更時に必要な情報

Brokerノードの ACG設定

Brokerノードの ACGルールを設定する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
  2. クラスタの詳細情報で Brokerノード ACGの cdss-cluster-popupiconをクリックします。
  3. ACGリストから Brokerノードの ACGを選択し、 [設定] ボタンをクリックします。
  4. ACGルールを入力し、 [追加] ボタンをクリックします。
    cdss-cmak_acg01_ko
    • プロトコル: TCP
    • アクセスソース: Producer VM、Consumer VMのプライベート IPアドレスを入力
    • 許可ポート: 9092-9093を入力
    • ACGに関するメモを入力
  5. ルールが追加されているか確認し、 [適用] ボタンをクリックします。
    • 当該ルールが ACGに適用されます。

Apache Kafkaを活用したデータ転送

  1. 前のステップで作成した Producer VMにアクセスし、以下のコマンドを入力します。

    cd kafka_2.12-2.4.0
    ./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic]
    # [broker.list]に先ほど確認したブローカーノード情報の PlainTextレプリカを入力します。
    # [topic]に CMAKで作成した Topicを入力します。
    # 例) ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test
    

    cdss-2-29_ko

  2. 転送するメッセージを入力します。

    • Brokerノードに当該メッセージが保存されます。
    • 終了する場合は、[Ctrl + C]キーを押します。
      cdss-2-30_ko
  3. 前のステップで作成した Consumer VMにアクセスし、以下のコマンドを実行します。

    • --from-beginning コマンドを使用する場合、この Topicに関するデータを最初からすべて照会します。
    • --from-beginning コマンドを使用しない場合、データを照会した瞬間から入力されるデータのみ照会します。
    cd kafka_2.12-2.4.0
    ./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning
    # [bootstrap.server]に先ほど確認したブローカーノード情報の PlainTextレプリカを入力します。
    # [topic]に前のステップの Producer VMで入力した Topicを入力します。
    # 例) ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test --from-beginning
    

Javaを活用したデータ転送

Javaを活用してデータを転送する方法を説明します。

参考

本ガイドでは、IntelliJ IDEAを使用することを基準に説明します。

プロジェクト作成

プロジェクトを作成する方法は、次の通りです。

  1. IntelliJ IDEAを実行し、File > New > Projectをクリックします。
  2. Maven Archetypeを選択して Project情報を入力し、 [Create] ボタンをクリックしてプロジェクトを作成します。
    java-1_ko

pom.xmlファイル変更

プロジェクトの依存関係、Javaバージョン、パッケージングメソッドを定義する pom.xml ファイルを変更します。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>jar</packaging>
    <groupId>org.example</groupId>
    <artifactId>maventest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <url>http://maven.apache.org</url>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <!-- Apache Kafka version in Cloud Data Streaming Service -->
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.21</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>KafkaMain</mainClass>
                        </transformer>
                    </transformers>
                </configuration>

                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
参考

ユーザー環境によって pom.xmlファイルが異なる場合があります。

KafkaMain.java作成

Java Applicationを実行する際に argumentで produce/consumeの有無、Topic、Broker listsを転送します。

public class KafkaMain {
    public static void main(String[] args) throws IOException {
        String topicName = args[1];
        String brokers = args[2];

        switch(args[0]){
            case "produce":
                Producer.produce(brokers, topicName);
                break;
            case "consume":
                Consumer.consume(brokers, topicName);
                break;
            default:
                System.out.println("Wrong arguments");
                break;
        }
        System.exit(0);
    }
}

Producer.java作成

Producer.java ファイルを作成します。 0~99の数字を転送する例は、次の通りです。

public class Producer {
    public static void produce(String brokers, String topicName) throws IOException {
        // Create Producer
        KafkaProducer<String, String> producer;
        // Configure
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(properties);

        for(int i=0;i<100;i++){
            ProducerRecord record = new ProducerRecord<String, String>(topicName, Integer.toString(i));
            producer.send(record);
        }
        producer.close();
    }
}

Consumer.java作成

Consumer.java ファイルを作成します。

public class Consumer {
    public static int consume(String brokers, String topicName) {
        // Create Consumer
        KafkaConsumer<String, String> consumer;
        // Configure
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", brokers);
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("group.id", "consumer_group");
        properties.setProperty("auto.offset.reset", "earliest");

        consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Arrays.asList(topicName));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(500); // wait for 500ms
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                System.out.println(record.value());
            }
        }
    }
}

jarファイルのビルドと実行

  1. 作成した Java codeを gitに保存した後、VMで git cloneコマンドを入力して当該コードをダウンロードします。
    git clone自体の git Repository
    
  2. 当該 Java Applicationをビルドするには、Mavenをインストールします。
    yum install maven -y
    
  3. ダウンロードした Java codeがあるフォルダに移動し、jarファイルをビルドします。
    jarファイルをビルドすると、targetフォルダ内に jarファイルが作成されます。
    cd kafkatest
    mvn clean package
    
  4. targetフォルダに移動し、jarファイルを実行します。
    cd target
    
    # データを転送する
    java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list]
    # [topic]に CMAKで作成した Topicを入力します。
    # [broker.list]に先ほど確認したブローカーノード情報の PlainTextレプリカを入力します。
    例) java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
    
    
    # データを照会する
    java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list]
    # [topic]に CMAKで作成した Topicを入力します。
    # [broker.list]に先ほど確認したブローカーノード情報の PlainTextレプリカを入力します。
    例) java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
    

Pythonを活用したデータ転送

本ガイドでは、Python 2.7.5バージョンを基準に説明します。

Pythonで Kafkaを活用するには、kafka-python packageをインストールします。

# pipインストール
curl -LO https://bootstrap.pypa.io/pip/2.7/get-pip.py
python get-pip.py

# kafka-python packageインストール
pip install kafka-python

KafkaMain.py作成

KafkaMain.py ファイルを作成します

import sys
from kafka import KafkaProducer, KafkaConsumer
from json import dumps
import time


def produce(topicName, brokerLists):
        producer = KafkaProducer(bootstrap_servers=brokerLists,
                         value_serializer=lambda x:
                         dumps(x).encode('utf-8'))
        for i in range(100):
                producer.send(topicName, i)

def consume(topicName, brokerLists):
        consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
                        group_id="test")

        for msg in consumer:
               print(msg)


action=sys.argv[1]
topicName=sys.argv[2]
brokerLists=sys.argv[3].split(',')

if action == 'produce':
    produce(topicName, brokerLists)
elif action == 'consume':
    consume(topicName, brokerLists)
else:
    print('wrong arguments')

KafkaMain.pyファイルを実行して produceを実行

KafkaMain.py ファイルを実行します。0~99の数字を転送する例です。

python KafkaMain.py produce [topic] [broker.list]
# [topic]に CMAKで作成した Topicを入力します。
# [broker.list]に先ほど確認した Brokerノード情報の PlainTextレプリカを入力します。
# 例) python KafkaMain.py produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092

KafkaMain.pyファイルを実行して consumeを実行

KafkaMain.pyファイルを実行します。

python KafkaMain.py consume [topic] [broker.list]
# [topic]に CMAKで作成した Topicを入力します。
# [broker.list]に先ほど確認した Brokerノード情報の PlainTextレプリカを入力します。
# 例) python KafkaMain.py consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092

通信区間の暗号化

Apache Kafkaクライアントと Apache Kafkaの Brokerノード間の通信区間を暗号化する方法です。

全プロセスは次の通りです。1~3のプロセスはクラスタ作成時に自動で実行されます。

  1. マネージャーノードで自体証明書を作成します。
  2. 各 Brokerノードで証明書を作成し、証明書署名リクエストを作成します。
  3. 証明書署名リクエストに対してマネージャーノードが署名します。
  4. クライアントでは証明書をダウンロードし、証明書に関する情報を持つ TrustStoreを作成します。
  5. 暗号化通信に関する設定ファイルを作成します。
  6. hostsファイルを変更します。

証明書ダウンロード

証明書をダウンロードする方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
  2. クラスタの詳細情報で証明書管理項目の [ダウンロード] ボタンをクリックします。
    cdss-2-31_ko
  3. ポップアップが表示されたら、 [確認] ボタンをクリックします。
    • 証明書がダウンロードされます。
  4. ダウンロードした証明書ファイルを Producer、Consumer VMにコピーします。
    • Producer、Consumer VMの /root パスに ca-certという名前で保存します。

Truststoreの作成

証明書情報を保存する TrustStoreを作成する方法は、次の通りです。

  1. Truststoreを作成するには、以下のコマンドを入力します。
    keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert
    
  2. keystoreのパスワードを入力します。
    cdss-2-36_ko
    • Enter keystore password: Passwordを入力
    • Re-enter new password: Passwordを再入力
  3. Trust this certificate? [no]: メッセージが表示されたら、「yes」を入力します。
  4. lsコマンドを入力してkafka.client.truststore.jksファイルが作成されたかどうか確認します。

暗号化設定ファイルの作成

client-auth.propertiesファイルを以下のように作成します。
[password]にはkafka.client.truststore.jksファイル作成時に設定したパスワードを入力します。

# /root/kafka_2.12-2.4.0フォルダに暗号化設定ファイルを作成します。
cd kafka_2.12-2.4.0
vi client-auth.properties
security.protocol=SSL
ssl.truststore.location=/root/kafka.client.truststore.jks
ssl.truststore.password=[password]

cdss-2-37_ko

hostsファイル変更

hostファイルは、Linuxで DNSサーバよりも先にホスト名を IPアドレスに変換するファイルです。暗号化通信を実行するには、hostsファイル(/etc/hosts)を変更する必要があります。

/etc/hosts ファイルを実行します。

vi /etc/hosts

Brokerノード情報の hostsファイル情報のコピーを追加します。

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.24 yeakafka2-d-2am
192.168.2.25 yeakafka2-d-2an
192.168.2.26 yeakafka2-d-2ao

cdss-2-38_ko

通信区間を暗号化してデータ転送

Producer VMで以下のようなコマンドを実行します。
転送するメッセージを入力すると、ブローカーノードにこのメッセージが保存されます。終了するには、 [Ctrl] + [C] キーを押します。

./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
# [broker.list]に先ほど確認したブローカーノード情報の TLSレプリカを入力します。
# [topic]に CMAKで作成した Topicを入力します。
# 例) ./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties

cdss-2-40_ko

通信区間を暗号化して保存されたデータを照会

Consumer VMで以下のようなコマンドを実行します。

cd kafka_2.12-2.4.0
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
# [bootstrap.server]に先ほど確認したブローカーノード情報の TLSレプリカを入力します。
# [topic]に前のステップの Producer VMで入力した Topicを入力します。
# 例) ./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties

cdss-2-41_ko

Kafka Connect によるデータパイプライン構築

Kafka Connectでデータパイプラインを構築する方法については、Kafka Connect によるデータパイプライン構築をご参照ください。