VPC環境で利用できます。
TCP接続エンドポイントを使用してトピックに保存されたデータ(メッセージ)を受信する方法およびサンプルコードについて説明します。
セキュリティのため、データ送信時にメインアカウントの認証情報を使用することはできませんので、必ずサブアカウントの認証情報を使用してください。
事前準備(認証情報の確認と SASL設定)
Kafkaクライアントアプリケーションを使用して Data Streamサービスにアクセスするには、Kafkaクライアントのタイプに合わせて SASL設定を追加する必要があります。Sub Account認証情報の確認と SASLの設定方法は、次の通りです。
- Data Streamサービスに接続するユーザーの Sub Account認証情報(Access keyと Secret key)を確認します。
- Sub Account認証情報は、SASL認証設定時にユーザー名(username)とパスワード(password)として使用されます。
- サブアカウントの Access Keyの発行と確認: サブアカウント作成を参照
- Kafkaクライアントのタイプ(Python、Java、Logstash)に合わせてデータ送受信コードに SASL設定を追加します。
- Python, Java
# username: Sub Account Access Keyを入力 # password: Sub Account Secret Keyを入力 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
- Python, Java
データ送信のユースケース
Data Streamサービスは Kafka Clientをサポートします。Kafka Clientライブラリを活用してトピックにデータを送信できます。データ送信のユースケースを紹介します。
Python producer
Pythonのサンプルコードを使用してデータを送信する方法は、次の通りです。
-
次のコマンドで Python Kafkaクライアントライブラリをインストールします。
pip install kafka-python -
次の例のように、事前準備で確認した Sub Accountと SASL認証情報を含め、Pythonコードを作成して実行します。
import ssl from kafka import KafkaProducer import json brokers = "{TOPIC_接続_エンドポイント}" topic_name = "{TOPIC_NAME}" username = "{ncp_iam_SubAccountAccessKey}" password = "{ncp_iam_SubAccountSecretKey}" security_protocol = 'SASL_SSL' sasl_mechanism = 'PLAIN' # Create a new context using system defaults, disable all but TLS1.2 context = ssl.create_default_context() context.options &= ssl.OP_NO_TLSv1 context.options &= ssl.OP_NO_TLSv1_1 producer = KafkaProducer(bootstrap_servers = brokers, sasl_plain_username = username, sasl_plain_password = password, security_protocol = security_protocol, ssl_context = context, sasl_mechanism = sasl_mechanism, api_version = (0,10), retries=5) partition = 0 key = "key-1" value_dict = {'key': key, 'data': "this is a test (body)"} msg_value_json = json.dumps(value_dict) producer.send(topic=topic_name, key=bytes(key, 'utf-8'), value=msg_value_json.encode("utf-8"), partition=partition) producer.flush()
Python producer with Schema Registry
スキーマレジストリを連携する場合、Pythonサンプルコードを使用してデータを送信する方法は次の通りです。
- スキーマレジストリを連携する時に、以下のようなライブラリが必要になることがあります。
# Kafkaクライアントと Schema Registryをサポート
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0
# Avro直列化をサポート
avro-python3==1.11.3
fastavro==1.9.4
- 次の例のように、事前準備で確認した Sub Accountと SASL認証情報を含め、Pythonコードを作成して実行します。
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
import time
# NCP Connection Settings
brokers = "{TOPIC_接続_エンドポイント}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"
# Schema Registry Client
sr_config = {
'url': schema_registry_url,
'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)
# User Schema
user_schema_str = """{
"type": "record",
"name": "User",
"namespace": "com.example.kafka",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "age", "type": ["null", "int"], "default": null}
]
}"""
avro_serializer = AvroSerializer(schema_registry_client, user_schema_str)
# Kafka Producer
producer_config = {
'bootstrap.servers': brokers,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': username,
'sasl.password': password,
}
producer = Producer(producer_config)
# Sample data
users = [
{"id": 1, "name": "John Doe", "email": "john@example.com", "age": 30},
{"id": 2, "name": "Jane Smith", "email": "jane@example.com", "age": 25},
{"id": 3, "name": "Bob Johnson", "email": "bob@example.com", "age": None}
]
print(f"Starting to send {len(users)} messages to {topic_name}")
try:
for user in users:
# Serialize and send message
context = SerializationContext(topic_name, MessageField.VALUE)
serialized_value = avro_serializer(user, context)
producer.produce(
topic=topic_name,
key=f"user-{user['id']}",
value=serialized_value
)
print(f"Sent: {user['name']} (ID: {user['id']})")
producer.flush()
print("All messages sent successfully!")
except Exception as e:
print(f"Error: {e}")
finally:
producer.flush()
データ受信のユースケース
Data Streamサービスは Kafka Clientをサポートします。Kafka Clientライブラリを活用してトピックにデータを受信できます。データ受信のユースケースを紹介します。
Python consumer
Pythonのサンプルコードを使用してデータを受信する方法は、次の通りです。
- 次のコマンドで Python Kafkaクライアントライブラリをインストールします。
pip install kafka-python - 次の例のように、事前準備で確認した Sub Account認証情報を含め、Pythonコードを作成して実行します。
import ssl from kafka import KafkaConsumer brokers = "{TOPIC_接続_エンドポイント}" topic_name = "{TOPIC_NAME}" username = "{ncp_iam_SubAccountAccessKey}" password = "{ncp_iam_SubAccountSecretKey}" security_protocol = 'SASL_SSL' sasl_mechanism = 'PLAIN' # Create a new context using system defaults, disable all but TLS1.2 context = ssl.create_default_context() context.options &= ssl.OP_NO_TLSv1 context.options &= ssl.OP_NO_TLSv1_1 consumer = KafkaConsumer(topic_name, bootstrap_servers = brokers, group_id = "my-consumer-group-name", auto_offset_reset='earliest', sasl_plain_username = username, sasl_plain_password = password, security_protocol = security_protocol, ssl_context = context, sasl_mechanism = sasl_mechanism) from datetime import datetime for message in consumer: print(f'({datetime.now()}) Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
Python consumer with Schema Registry
スキーマレジストリを連携する場合、Pythonサンプルコードを使用してデータを送信する方法は次の通りです。
- スキーマレジストリを連携する時に、以下のようなライブラリが必要になることがあります。
# Kafkaクライアントと Schema Registryをサポート
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0
# Avro直列化をサポート
avro-python3==1.11.3
fastavro==1.9.4
- 次の例のように、事前準備で確認した Sub Accountと SASL認証情報を含め、Pythonコードを作成して実行します。
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
# NCP Connection Settings
brokers = "{TOPIC_接続_エンドポイント}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"
# Schema Registry Client
sr_config = {
'url': schema_registry_url,
'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)
# Avro Deserializer
avro_deserializer = AvroDeserializer(schema_registry_client)
# Kafka Consumer
consumer_config = {
'bootstrap.servers': brokers,
'group.id': 'schema-registry-consumer',
'auto.offset.reset': 'earliest',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': username,
'sasl.password': password,
}
consumer = Consumer(consumer_config)
consumer.subscribe([topic_name])
print(f"Consumer started: {topic_name}")
print("Auto schema lookup from Schema Registry")
try:
while True:
msg = consumer.poll(5.0)
if msg is None:
continue
if msg.error():
print(f"Error: {msg.error()}")
continue
try:
# Avro deserialization (auto schema lookup)
context = SerializationContext(topic_name, MessageField.VALUE)
data = avro_deserializer(msg.value(), context)
print(f"Received: {data}")
except Exception as e:
print(f"Deserialization error: {e}")
except KeyboardInterrupt:
print("\nShutting down...")
finally:
consumer.close()