- 印刷する
- PDF
Cloud Data Streaming Serviceの活用
- 印刷する
- PDF
VPC環境で利用できます。
CMAKにアクセスしてTopicを作成し、Topicにデータを保存する方法、Topicに保存されたデータをCloud Data Streaming Service Clusterに転送する方法、Apache KafkaクライアントとApache KafkaのBrokerノード間の通信区間を暗号化する方法を説明します。
CMAKにアクセス
CMAKはApache Kafkaクラスタを管理するプラットフォームです。 CMAKにアクセスしてTopicを作成し、TopicのPartition数の変更やTopicのデータ保管周期を変更できます。
CMAKにアクセスする前に、Publicドメインを有効化する必要があります。
CMAKにアクセスする方法は以下のとおりです。
- NAVERクラウドプラットフォームコンソールで、Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
- クラスタのチェックボックスをクリックし、クラスタ管理 > CMAKアクセスドメインの設定変更ボタンをクリックします。
- ポップアップが表示されたら情報を確認し、[確認] ボタンをクリックします。
- Publicドメインが有効になります。
- クラスタのチェックボックスをクリックし、クラスタ管理 > CMAKにアクセスをクリックします。
- CMAKアクセスメニューが有効になっていない場合、Publicドメインは無効の状態です。 1~3の手続きをもう一度実行してください。
- CMAKにアクセスするための事前ジョブ画面が表示されたら、CMAKに移動するボタンをクリックします。
- ログインポップアップが表示されたら、クラスタを作成する際に設定したユーザーIDとパスワードを入力します。
- IDを忘れた場合、クラスタ管理 > CMAKアクセスパスワードの初期化をクリックすると、IDを確認できます。
- クラスタアカウント名をクリックします。
- Cluster SummaryでTopicとBrokerノードに関する情報を確認します。
Topicに関する情報
Brokerノードに関する情報
Topicの作成
- CMAKページにアクセスします。
- CMAKページでTopic > Createメニューをクリックします。
- トピック情報を入力します。
- Topic:Topicの名前を入力
- Partitions:TopicのPartitions数を入力
- Replication Factor:TopicのReplication数を入力
- [Create] ボタンをクリックします。
詳細設定値についての説明は、Apache Kafka Documentationをご参照ください。
- Topicの作成後は、Partitions数を増やすことのみ可能であり、既に追加済みのPartitions数を減らすことはできません。
- データを消費する際に順序を保障する必要がある場合、Partitions数は1に設定する必要があります。 ただし、Partitionsが1つの場合、1つのブローカーノードにすべてのデータが保存されるため、ブローカーノードのディスク使用量管理に注意してください。
- Topicの作成後、Replication数は変更できません。 安定したサービス運用のために、最低2つ以上のReplicationを適用することをお勧めします。
Topic情報の確認
① Topic Summary:Topicの情報を確認
② Operations:Topic削除、Partition追加、再分配、設定変更
③ Partitions by Broker:各ブローカーノードに関して作成されたPartition情報を確認
④ Partition Information:各PartitionのLeaderとReplicationがどんなブローカーノードに位置しているのかを確認
Topicのデータ保管周期の変更
Topicのデータ保管周期を調整する方法は以下のとおりです。
- CMAKにアクセスを参考にしてCMAKにアクセスします。
- クラスタ名をクリックします。
- Topic > Listをクリックします。
- Topicリストでデータの保管周期を変更するTopicの名前をクリックします。
- Operations > [Update Config] ボタンをクリックします。
- retention.ms値を修正します。
- 1時間に設定する場合、3600000値を入力します。
- n時間に設定する場合、3600000*nの結果値を入力します。
- [Update Config] ボタンをクリックします。
TopicのPartition数の変更
CMAKで作成されたTopicのPartition数は増やすことができます。 Partitionは追加のみ可能で、過去に割り当てられたPartitionを削除することはできません。
TopicのPartition数を増やす方法は以下のとおりです。
- CMAKにアクセスを参考にしてCMAKにアクセスします。
- クラスタ名をクリックします。
- Topic > Listをクリックします。
- Topicリストでデータの保管周期を変更するTopicの名前をクリックします。
- Operations > [Add Partitions] ボタンをクリックします。
- Partitions項目にPartition数を入力します。
- 入力された数より大きい数のみ入力できます。
- 入力された数より大きい数のみ入力できます。
- [Add Paritions] ボタンをクリックします。
Producer/Consumerの実装
以下のような構造でProducer VM、Consumer VMを作成し、Topicにデータを保存して転送する方法を説明します。
Apache Kafka、Java、Pythonを活用してデータを転送し、保存する方法を説明します。
このガイドでは、CentOS 7.3を基準に説明します。
Producer VM、Consumer VMの作成
Producer VMとConsumer VMを作成する方法は、以下のとおりです。
- Serverを開始するを参照してサーバを作成します。
- サーバ作成時にCloud Data Streaming Serviceクラスタを作成したVPCと同じVPCを選択します。
- Subnetは、Public Subnetを選択します。
- このVMにグローバルIPを割り当てるとアクセスできます。
Javaのインストール
以下のコマンドを入力してJavaをインストールします。
yum install java -y
Apache Kafkaのインストール
以下のコマンドを入力してApache Kafkaをインストールします。
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
# 圧縮を展開します。
tar -zxvf kafka_2.12-2.4.0.tgz
Brokerノード情報の確認
Brokerノード情報を確認する方法は以下のとおりです。
- NAVERクラウドプラットフォームコンソールで、Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
- クラスタの詳細情報でBrokerノード情報の [詳細を見る] ボタンをクリックします。
- Brokerノード情報画面が表示されたら、情報を確認します。
- PlainText:Brokerノードと暗号化なしに通信するための情報
- TLS:Brokerノードと暗号化通信するための情報
- hostsファイル情報:暗号化通信に利用されるhostsファイルの修正時に必要な情報
BrokerノードのACG設定
BrokerノードのACGルールを設定する方法は以下のとおりです。
- NAVERクラウドプラットフォームコンソールで、Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
- クラスタの詳細情報でBrokerノードACGのをクリックします。
- ACGリストでBrokerノードのACGを選択し、[設定] ボタンをクリックします。
- ACGルールを入力し、[追加] ボタンをクリックします。
- プロトコル:TCP
- アクセスソース:Producer VM、Consumer VMのプライベートIPを入力
- 許可ポート:9092-9093を入力
- ACGに関するメモを入力
- ルールが追加されているか確認し、[適用] ボタンをクリックします。
- 当該ルールがACGに適用されます。
Apache Kafkaを活用してデータを転送
前の段階で作成したProducer VMにアクセスし、以下のコマンドを入力します。
cd kafka_2.12-2.4.0 ./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] # [broker.list]に先に確認したブローカーノード情報のPlainTextコピーを入力します。 # [topic]にCMAKで作成したTopicを入力します。 # 例) ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092 --topic test
転送するメッセージを入力します。
- Brokerノードに当該メッセージが保存されます。
- 終了する場合は、[Ctrl + C]キーを押します。
前の段階で作成したConsumer VMにアクセスし、以下のコマンドを実行します。
- --from-beginningコマンドを使用する場合、このTopicに関するデータを最初からすべて照会します。
- --from-beginningコマンドを使用しない場合、データを照会した瞬間から入力されるデータのみ照会します。
cd kafka_2.12-2.4.0 ./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning # [bootstrap.server]に先に確認したブローカーノード情報のPlainTextコピーを入力します。 # [topic]に前段階のProducer VMで入力したTopicを入力します。 # 例) ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092 --topic test --from-beginning
Javaを活用してデータを転送
Javaを活用してデータを転送する方法を説明します。
このガイドでは、IntelliJ IDEAを使用することを基準に説明します。
プロジェクトの作成
プロジェクトを作成する方法は以下のとおりです。
- IntelliJ IDEAを実行し、File > New > Projectをクリックします。
- Maven Archetypeを選択してProject情報を入力し、[Create] ボタンをクリックしてプロジェクトを作成します。
pom.xmlファイルの修正
プロジェクトの従属性、Javaバージョン、パッケージングメソッドを定義するpom.xml
ファイルを修正します。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<groupId>org.example</groupId>
<artifactId>maventest</artifactId>
<version>1.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- Apache Kafka version in Cloud Data Streaming Service -->
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>KafkaMain</mainClass>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
ユーザー環境によってpom.xmlファイルが異なる場合があります。
KafkaMain.javaの作成
Javaアプリケーションを実行する際にargumentでproduce/consumeの有無、Topic、Broker listsを伝達します。
public class KafkaMain {
public static void main(String[] args) throws IOException {
String topicName = args[1];
String brokers = args[2];
switch(args[0]){
case "produce":
Producer.produce(brokers, topicName);
break;
case "consume":
Consumer.consume(brokers, topicName);
break;
default:
System.out.println("Wrong arguments");
break;
}
System.exit(0);
}
}
Producer.javaの作成
Producer.java
ファイルを作成します。 0~99の数字を転送する例は以下のとおりです。
public class Producer {
public static void produce(String brokers, String topicName) throws IOException {
// Create Producer
KafkaProducer<String, String> producer;
// Configure
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(properties);
for(int i=0;i<100;i++){
ProducerRecord record = new ProducerRecord<String, String>(topicName, Integer.toString(i));
producer.send(record);
}
producer.close();
}
}
Consumer.javaの作成
Consumer.java
ファイルを作成します。
public class Consumer {
public static int consume(String brokers, String topicName) {
// Create Consumer
KafkaConsumer<String, String> consumer;
// Configure
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "consumer_group");
properties.setProperty("auto.offset.reset", "earliest");
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500); // wait for 500ms
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
System.out.println(record.value());
}
}
}
}
jarファイルビルドと実行
- 作成したJava codeをGitに保存し、VMでGit cloneコマンドを入力して、このcodeをダウンロードします。
Git clone自体のGit Repository
- このJavaアプリケーションをビルドするには、Mavenをインストールします。
yum install maven -y
- ダウンロードしたJava codeがあるフォルダに移動し、jarファイルをビルドします。
jarファイルをビルドすると、targetフォルダ内にjarファイルが作成されます。cd kafkatest mvn clean package
- targetフォルダに移動し、jarファイルを実行します。
cd target # データを転送する java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list] # [topic]にCMAKで作成したTopicを入力します。 # [broker.list]に先に確認したブローカーノード情報のPlainTextコピーを入力します。 例) java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092 # データを照会する java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list] # [topic]にCMAKで作成したTopicを入力します。 # [broker.list]に先に確認したブローカーノード情報のPlainTextコピーを入力します。 例) java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092
Pythonを活用してデータを転送
このガイドでは、Python 2.7.5バージョンを基準に説明します。
PythonでKafkaを活用するには、kafka-python package
をインストールします。
# PIP のインストール
curl -LO https://bootstrap.pypa.io/pip/2.7/get-pip.py
python get-pip.py
# kafka-python package のインストール
pip install kafka-python
KafkaMain.pyの作成
KafkaMain.py
ファイルを作成します。
import sys
from kafka import KafkaProducer, KafkaConsumer
from json import dumps
import time
def produce(topicName, brokerLists):
producer = KafkaProducer(bootstrap_servers=brokerLists,
value_serializer=lambda x:
dumps(x).encode('utf-8'))
for i in range(100):
producer.send(topicName, i)
def consume(topicName, brokerLists):
consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
group_id="test")
for msg in consumer:
print(msg)
action=sys.argv[1]
topicName=sys.argv[2]
brokerLists=sys.argv[3].split(',')
if action == 'produce':
produce(topicName, brokerLists)
elif action == 'consume':
consume(topicName, brokerLists)
else:
print('wrong arguments')
KafkaMain.pyファイルを実行してproduceを実行
KafkaMain.py
ファイルを実行します。 0~99の数字を転送する例です。
python KafkaMain.py produce [topic] [broker.list]
# [topic]にCMAKで作成したTopicを入力します。
# [broker.list]に先に確認したBrokerノード情報のPlainTextコピーを入力します。
# 例) python KafkaMain.py produce test 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092
KafkaMain.pyファイルを実行してconsumeを実行
KafkaMain.py
ファイルを実行します。
python KafkaMain.py consume [topic] [broker.list]
# [topic]にCMAKで作成したTopicを入力します。
# [broker.list]に先に確認したBrokerノード情報のPlainTextコピーを入力します。
# 例) python KafkaMain.py consume test 192.168.2.24:9092、192.168.2.25:9092、192.168.2.26:9092
通信区間の暗号化
Apache KafkaクライアントとApache KafkaのBrokerノード間の通信区間を暗号化する方法を説明します。
全体プロセスは以下のとおりです。 1~3の過程はクラスタ作成時に自動で実行されます。
- マネージャーノードで自体証明書を作成します。
- 各Brokerノードで証明書を作成し、証明書署名リクエストを作成します。
- 証明書署名リクエストに対してマネージャーノードが署名します。
- クライアントでは証明書をダウンロードし、証明書に関する情報を持つTrustStoreを作成します。
- 暗号化通信に関する設定ファイルを作成します。
- hostsファイルを修正します。
証明書のダウンロード
証明書をダウンロードする方法は以下のとおりです。
- NAVERクラウドプラットフォームコンソールで、Services > Big Data & Analytics > Cloud Data Streaming Service > Clusterメニューを順にクリックします。
- クラスタの詳細情報で証明書管理項目の [ダウンロード] ボタンをクリックします。
- ポップアップが表示されたら、[確認] ボタンをクリックします。
- 証明書がダウンロードされます。
- ダウンロードした証明書ファイルをProducer、Consumer VMにコピーします。
- Producer、Consumer VMの
/root
パスにca-certという名前で保存します。
- Producer、Consumer VMの
Truststoreの作成
証明書情報を保存するTrustStoreを作成する方法は、以下のとおりです。
- Truststoreを作成するには、以下のコマンドを入力します。
keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert
- keystoreのパスワードを入力します。
- Enter keystore password:パスワードを入力
- Re-enter new password:パスワードを再入力
- Trust this certificate? [no]:メッセージが表示されたら、「yes」を入力します。
- lsコマンドを入力して
kafka.client.truststore.jks
ファイルが作成されたかどうか確認します。
暗号化設定ファイルの作成
client-auth.properties
ファイルを以下のように作成します。
[password]にはkafka.client.truststore.jks
ファイル作成時に設定したパスワードを入力します。
# /root/kafka_2.12-2.4.0フォルダに暗号化設定ファイルを作成します。
cd kafka_2.12-2.4.0
vi client-auth.properties
security.protocol=SSL
ssl.truststore.location=/root/kafka.client.truststore.jks
ssl.truststore.password=[password]
hostsファイルの修正
hostファイルは、LinuxでDNSサーバより先にホスト名をIPに変換するファイルです。 暗号化通信を実行するには、hostsファイル(/etc/hosts)を修正する必要があります。
/etc/hosts
ファイルを実行します。
vi /etc/hosts
Brokerノード情報のhostsファイル情報のコピーを追加します。
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.24 yeakafka2-d-2am
192.168.2.25 yeakafka2-d-2an
192.168.2.26 yeakafka2-d-2ao
通信区間を暗号化してデータ転送
Producer VMで以下のようなコマンドを実行します。
転送するメッセージを入力すると、ブローカーノードにこのメッセージが保存されます。 終了するには、[Ctrl] + [C] キーを押します。
./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
# [broker.list]に先に確認したブローカーノード情報のTLSコピーを入力します。
# [topic]にCMAKで作成したTopicを入力します。
# 例) ./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093、yeakafka2-d-2an:9093、yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties
通信区間を暗号化して保存されたデータの照会
Consumer VMで以下のようなコマンドを実行します。
cd kafka_2.12-2.4.0
./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
# [bootstrap.server]に先に確認したブローカーノード情報のTLSコピーを入力します。
# [topic]に前段階のProducer VMで入力したTopicを入力します。
# 例) ./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093、yeakafka2-d-2an:9093、yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties
Kafka Connectでデータパイプラインを構築
Kafka Connectでデータパイプラインを構築する方法は、Kafka Connectでデータパイプライン構築をご参照ください。