VPC環境で利用できます。
CMAKにアクセスして Topicを作成し、Topicにデータを保存する方法、Topicに保存されたデータを Cloud Data Streaming Serviceクラスタに転送する方法、および Apache Kafkaクライアントと Apache Kafkaの Brokerノード間の通信区間を暗号化する方法を説明します。
CMAKアクセス
CMAKは Apache Kafkaクラスタを管理するプラットフォームです。CMAKにアクセスして Topicを作成し、Topicの Partition数の変更や Topicのデータ保管周期を変更できます。
CMAKにアクセスする前に、Publicドメインを有効にする必要があります。
CMAKにアクセスする方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、
> Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。 - クラスタのチェックボックスをクリックし、クラスタ管理 > CMAKアクセスドメインの設定変更ボタンをクリックします。
- ポップアップが表示されたら情報を確認し、 [確認] ボタンをクリックします。
- Publicドメインが有効になります。
- クラスタのチェックボックスをクリックし、クラスタ管理 > CMAKアクセスをクリックします。
- CMAKアクセスメニューが有効になっていない場合、Publicドメインは無効の状態です。 1~3の手続きをもう一度実行してください。
- CMAKにアクセスするための事前タスク画面が表示されたら、CMAKに移動するボタンをクリックします。

- ログインポップアップが表示されたら、クラスタを作成する際に設定したユーザー IDとパスワードを入力します。
- IDを忘れた場合、クラスタ管理 > CMAKアクセスパスワードの初期化をクリックすると IDを確認できます。
- クラスタアカウント名をクリックします。

- Cluster Summaryで Topicと Brokerノードに関する情報を確認します。

-
Topicに関する情報

-
Brokerノードに関する情報

Topic作成
- CMAKページにアクセスします。
- CMAKページで Topic > Createメニューをクリックします。

- トピック情報を入力します。
- Topic: Topicの名前を入力
- Partitions: Topicの Partitions数を入力
- Replication Factor: Topicの Replication数を入力
- [Create] ボタンをクリックします。
詳細設定値についての説明は、Apache Kafka Documentationをご参照ください。
- Topicの作成後は、Partitions数を増やすことのみ可能であり、既に追加済みの Partitions数を減らすことはできません。
- データを消費する際に順序を保障する必要がある場合、Partitions数は1に設定する必要があります。ただし、Partitionsが1つの場合、1つのブローカーノードにすべてのデータが保存されるため、ブローカーノードのディスク使用量管理にご注意ください。
- Topicの作成後、Replication数は変更できません。安定したサービス運用のために、最低2つ以上の Replicationを適用することをお勧めします。
Topic情報の確認

| 領域 | 説明 |
|---|---|
| ① Topic Summary | Topicの情報確認 |
| ② Operations | Topic削除、Partition追加、再分配、設定変更 |
| ③ Partitions by Broker | 各ブローカーノードに対して作成された Partition情報を確認 |
| ④ Partition Information | 各 Partitionの Leaderと Replicationがどのブローカーノードに位置しているのかを確認 |
Topicのデータ保管周期変更
Topicのデータ保管周期を調整する方法は、次の通りです。
- CMAKアクセスを参照して CMAKにアクセスします。
- クラスタ名をクリックします。
- Topic > Listをクリックします。

- Topicリストからデータの保管周期を変更する Topicの名前をクリックします。
- Operations > [Update Config] ボタンをクリックします。

- retention.ms値を変更します。
- 1時間に設定する場合、3600000値を入力します。
- n時間に設定する場合、3600000 * nの結果値を入力します。
- [Update Config] ボタンをクリックします。
Topicの Partition数変更
CMAKで作成された Topicの Partition数は増やすことができます。Partitionは追加のみ可能で、過去に割り当てられた Partitionを削除することはできません。
Topicの Partition数を増やす方法は、次の通りです。
- CMAKアクセスを参照して CMAKにアクセスします。
- クラスタ名をクリックします。
- Topic > Listをクリックします。

- Topicリストからデータの保管周期を変更する Topicの名前をクリックします。
- Operations > [Add Partitions] ボタンをクリックします。

- Partitions項目に Partition数を入力します。
- 入力された数より大きい数のみ入力できます。

- 入力された数より大きい数のみ入力できます。
- [Add Partition] ボタンをクリックします。
Producer/Consumerの実装
以下のような構造で Producer VM、Consumer VMを作成し、Topicにデータを保存して転送する方法を説明します。
Apache Kafka、Java、Pythonを活用してデータを送信し保存する方法を説明します。

本ガイドでは、CentOS 7.3を基準に説明します。
Producer VM、Consumer VM作成
Producer VMと Consumer VMを作成する方法は、次の通りです。
- Server を開始するを参照してサーバを作成します。
- サーバ作成時には、Cloud Data Streaming Serviceクラスタを作成した VPCと同じ VPCを選択します。

- Subnetは、Public Subnetを選択します。
- 当該 VMにはパブリック IPアドレスを割り当てることでアクセスできます。
Javaのインストール
以下のコマンドを入力して Javaをインストールします。
yum install java -y
Apache Kafkaのインストール
以下のコマンドを入力して Apache Kafkaをインストールします。
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
# 圧縮ファイルを解凍します。
tar -zxvf kafka_2.12-2.4.0.tgz
Brokerノード情報の確認
Brokerノード情報を確認する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、
> Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。 - クラスタの詳細情報で Brokerノード情報の [詳細を見る] ボタンをクリックします。

- Brokerノード情報画面が表示されたら、情報を確認します。
- PlainText: Brokerノードと暗号化なしで通信するための情報
- TLS: Brokerノードと暗号化通信するための情報
- hostsファイル情報: 暗号化通信に利用される hostsファイルの変更時に必要な情報
Brokerノードの ACG設定
Brokerノードの ACGルールを設定する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、
> Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。 - クラスタの詳細情報で Brokerノード ACGの
をクリックします。 - ACGリストから Brokerノードの ACGを選択し、 [設定] ボタンをクリックします。
- ACGルールを入力し、 [追加] ボタンをクリックします。
- プロトコル: TCP
- アクセスソース: Producer VM、Consumer VMのプライベート IPアドレスを入力
- 許可ポート: 9092-9093を入力
- ACGに関するメモを入力
- ルールが追加されているか確認し、 [適用] ボタンをクリックします。
- 当該ルールが ACGに適用されます。
Apache Kafkaを活用したデータ転送
-
前のステップで作成した Producer VMにアクセスし、以下のコマンドを入力します。
cd kafka_2.12-2.4.0 ./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] # [broker.list]に先ほど確認したブローカーノード情報の PlainTextレプリカを入力します。 # [topic]に CMAKで作成した Topicを入力します。 # 例) ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test
-
転送するメッセージを入力します。
- Brokerノードに当該メッセージが保存されます。
- 終了する場合は、[Ctrl + C]キーを押します。

-
前のステップで作成した Consumer VMにアクセスし、以下のコマンドを実行します。
--from-beginningコマンドを使用する場合、この Topicに関するデータを最初からすべて照会します。--from-beginningコマンドを使用しない場合、データを照会した瞬間から入力されるデータのみ照会します。
cd kafka_2.12-2.4.0 ./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning # [bootstrap.server]に先ほど確認したブローカーノード情報の PlainTextレプリカを入力します。 # [topic]に前のステップの Producer VMで入力した Topicを入力します。 # 例) ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test --from-beginning
Javaを活用したデータ転送
Javaを活用してデータを転送する方法を説明します。
本ガイドでは、IntelliJ IDEAを使用することを基準に説明します。
プロジェクト作成
プロジェクトを作成する方法は、次の通りです。
- IntelliJ IDEAを実行し、File > New > Projectをクリックします。
- Maven Archetypeを選択して Project情報を入力し、 [Create] ボタンをクリックしてプロジェクトを作成します。

pom.xmlファイル変更
プロジェクトの依存関係、Javaバージョン、パッケージングメソッドを定義する pom.xml ファイルを変更します。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<groupId>org.example</groupId>
<artifactId>maventest</artifactId>
<version>1.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- Apache Kafka version in Cloud Data Streaming Service -->
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>KafkaMain</mainClass>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
ユーザー環境によって pom.xmlファイルが異なる場合があります。
KafkaMain.java作成
Java Applicationを実行する際に argumentで produce/consumeの有無、Topic、Broker listsを転送します。
public class KafkaMain {
public static void main(String[] args) throws IOException {
String topicName = args[1];
String brokers = args[2];
switch(args[0]){
case "produce":
Producer.produce(brokers, topicName);
break;
case "consume":
Consumer.consume(brokers, topicName);
break;
default:
System.out.println("Wrong arguments");
break;
}
System.exit(0);
}
}
Producer.java作成
Producer.java ファイルを作成します。 0~99の数字を転送する例は、次の通りです。
public class Producer {
public static void produce(String brokers, String topicName) throws IOException {
// Create Producer
KafkaProducer<String, String> producer;
// Configure
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(properties);
for(int i=0;i<100;i++){
ProducerRecord record = new ProducerRecord<String, String>(topicName, Integer.toString(i));
producer.send(record);
}
producer.close();
}
}
Consumer.java作成
Consumer.java ファイルを作成します。
public class Consumer {
public static int consume(String brokers, String topicName) {
// Create Consumer
KafkaConsumer<String, String> consumer;
// Configure
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "consumer_group");
properties.setProperty("auto.offset.reset", "earliest");
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500); // wait for 500ms
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
System.out.println(record.value());
}
}
}
}
jarファイルのビルドと実行
- 作成した Java codeを gitに保存した後、VMで
git cloneコマンドを入力して当該コードをダウンロードします。git clone自体の git Repository - 当該 Java Applicationをビルドするには、Mavenをインストールします。
yum install maven -y - ダウンロードした Java codeがあるフォルダに移動し、jarファイルをビルドします。
jarファイルをビルドすると、targetフォルダ内に jarファイルが作成されます。cd kafkatest mvn clean package - targetフォルダに移動し、jarファイルを実行します。
cd target # データを転送する java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list] # [topic]に CMAKで作成した Topicを入力します。 # [broker.list]に先ほど確認したブローカーノード情報の PlainTextレプリカを入力します。 例) java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 # データを照会する java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list] # [topic]に CMAKで作成した Topicを入力します。 # [broker.list]に先ほど確認したブローカーノード情報の PlainTextレプリカを入力します。 例) java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
Pythonを活用したデータ転送
本ガイドでは、Python 2.7.5バージョンを基準に説明します。
Pythonで Kafkaを活用するには、kafka-python packageをインストールします。
# pipインストール
curl -LO https://bootstrap.pypa.io/pip/2.7/get-pip.py
python get-pip.py
# kafka-python packageインストール
pip install kafka-python
KafkaMain.py作成
KafkaMain.py ファイルを作成します
import sys
from kafka import KafkaProducer, KafkaConsumer
from json import dumps
import time
def produce(topicName, brokerLists):
producer = KafkaProducer(bootstrap_servers=brokerLists,
value_serializer=lambda x:
dumps(x).encode('utf-8'))
for i in range(100):
producer.send(topicName, i)
def consume(topicName, brokerLists):
consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
group_id="test")
for msg in consumer:
print(msg)
action=sys.argv[1]
topicName=sys.argv[2]
brokerLists=sys.argv[3].split(',')
if action == 'produce':
produce(topicName, brokerLists)
elif action == 'consume':
consume(topicName, brokerLists)
else:
print('wrong arguments')
KafkaMain.pyファイルを実行して produceを実行
KafkaMain.py ファイルを実行します。0~99の数字を転送する例です。
python KafkaMain.py produce [topic] [broker.list]
# [topic]に CMAKで作成した Topicを入力します。
# [broker.list]に先ほど確認した Brokerノード情報の PlainTextレプリカを入力します。
# 例) python KafkaMain.py produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
KafkaMain.pyファイルを実行して consumeを実行
KafkaMain.pyファイルを実行します。
python KafkaMain.py consume [topic] [broker.list]
# [topic]に CMAKで作成した Topicを入力します。
# [broker.list]に先ほど確認した Brokerノード情報の PlainTextレプリカを入力します。
# 例) python KafkaMain.py consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
通信区間の暗号化
Apache Kafkaクライアントと Apache Kafkaの Brokerノード間の通信区間を暗号化する方法です。
全プロセスは次の通りです。1~3のプロセスはクラスタ作成時に自動で実行されます。
- マネージャーノードで自体証明書を作成します。
- 各 Brokerノードで証明書を作成し、証明書署名リクエストを作成します。
- 証明書署名リクエストに対してマネージャーノードが署名します。
- クライアントでは証明書をダウンロードし、証明書に関する情報を持つ TrustStoreを作成します。
- 暗号化通信に関する設定ファイルを作成します。
- hostsファイルを変更します。
証明書ダウンロード
証明書をダウンロードする方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、
> Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。 - クラスタの詳細情報で証明書管理項目の [ダウンロード] ボタンをクリックします。

- ポップアップが表示されたら、 [確認] ボタンをクリックします。
- 証明書がダウンロードされます。
- ダウンロードした証明書ファイルを Producer、Consumer VMにコピーします。
- Producer、Consumer VMの
/rootパスに ca-certという名前で保存します。
- Producer、Consumer VMの
Truststoreの作成
証明書情報を保存する TrustStoreを作成する方法は、次の通りです。
- Truststoreを作成するには、以下のコマンドを入力します。
keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert - keystoreのパスワードを入力します。
- Enter keystore password: Passwordを入力
- Re-enter new password: Passwordを再入力
- Trust this certificate? [no]: メッセージが表示されたら、「yes」を入力します。
- lsコマンドを入力して
kafka.client.truststore.jksファイルが作成されたかどうか確認します。
暗号化設定ファイルの作成
client-auth.propertiesファイルを以下のように作成します。
[password]にはkafka.client.truststore.jksファイル作成時に設定したパスワードを入力します。
# /root/kafka_2.12-2.4.0フォルダに暗号化設定ファイルを作成します。
cd kafka_2.12-2.4.0
vi client-auth.properties
security.protocol=SSL
ssl.truststore.location=/root/kafka.client.truststore.jks
ssl.truststore.password=[password]

hostsファイル変更
hostファイルは、Linuxで DNSサーバよりも先にホスト名を IPアドレスに変換するファイルです。暗号化通信を実行するには、hostsファイル(/etc/hosts)を変更する必要があります。
/etc/hosts ファイルを実行します。
vi /etc/hosts
Brokerノード情報の hostsファイル情報のコピーを追加します。
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.24 yeakafka2-d-2am
192.168.2.25 yeakafka2-d-2an
192.168.2.26 yeakafka2-d-2ao

通信区間を暗号化してデータ転送
Producer VMで以下のようなコマンドを実行します。
転送するメッセージを入力すると、ブローカーノードにこのメッセージが保存されます。終了するには、 [Ctrl] + [C] キーを押します。
./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
# [broker.list]に先ほど確認したブローカーノード情報の TLSレプリカを入力します。
# [topic]に CMAKで作成した Topicを入力します。
# 例) ./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties

通信区間を暗号化して保存されたデータを照会
Consumer VMで以下のようなコマンドを実行します。
cd kafka_2.12-2.4.0
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
# [bootstrap.server]に先ほど確認したブローカーノード情報の TLSレプリカを入力します。
# [topic]に前のステップの Producer VMで入力した Topicを入力します。
# 例) ./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties

Kafka Connect によるデータパイプライン構築
Kafka Connectでデータパイプラインを構築する方法については、Kafka Connect によるデータパイプライン構築をご参照ください。