Available in VPC
This article describes how to consume data (messages) stored in a topic using a TCP access endpoint, and provides example code.
For security reasons, the main account's authentication information cannot be used when transferring data. Be sure to use a sub account's authentication information instead.
Preparations (Check authentication information and SASL settings)
To access the Data Stream service with a Kafka client application, you must add the SASL settings appropriate to your Kafka client type. To check Sub Account authentication information and configure SASL:
- Check the Sub Account authentication information (Access Key and Secret Key) of the user who will access the Data Stream service.
- Sub Account authentication information is used as your user name and password when setting up SASL authentication.
- For more information about how to issue and check sub account Access Keys, see Create sub account.
- Add SASL settings to the data production/consumption code according to the Kafka client type (Python, Java, or Logstash).
- Python, Java
# username: Enter Sub Account Access Key.
# password: Enter Sub Account Secret Key.
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
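The properties above are written in the Java client (properties file) format. If you are producing or consuming with the Python kafka-python library, the same SASL values are passed as constructor arguments instead; the following is a minimal sketch of that mapping (the full producer and consumer examples below show it in context).
# Minimal sketch: the SASL settings above expressed as kafka-python arguments.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="{TOPIC_access_endpoint}",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="{ncp_iam_SubAccountAccessKey}",  # Sub Account Access Key
    sasl_plain_password="{ncp_iam_SubAccountSecretKey}"   # Sub Account Secret Key
)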
Data production example
The Data Stream service supports Kafka clients. You can produce data to a topic by using a Kafka client library. The following is an example of data production.
Python producer
To produce data using Python example code:
- Install the Python Kafka client library with the following command.
pip install kafka-python
- After writing the Python code as in the following example, including the Sub Account and SASL authentication information confirmed in the preparation stage, run it.
import ssl
import json
from kafka import KafkaProducer

brokers = "{TOPIC_access_endpoint}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
security_protocol = 'SASL_SSL'
sasl_mechanism = 'PLAIN'

# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1

producer = KafkaProducer(
    bootstrap_servers=brokers,
    sasl_plain_username=username,
    sasl_plain_password=password,
    security_protocol=security_protocol,
    ssl_context=context,
    sasl_mechanism=sasl_mechanism,
    api_version=(0, 10),
    retries=5
)

partition = 0
key = "key-1"
value_dict = {'key': key, 'data': "this is a test (body)"}
msg_value_json = json.dumps(value_dict)

producer.send(
    topic=topic_name,
    key=bytes(key, 'utf-8'),
    value=msg_value_json.encode("utf-8"),
    partition=partition
)
producer.flush()
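producer.send() returns a future, so you can optionally block on it to confirm that the broker accepted the record and to inspect the assigned partition and offset. The following is a short sketch using the producer created above (the timeout value is an arbitrary choice).
# Optional: wait for broker acknowledgement and inspect the record metadata.
future = producer.send(
    topic=topic_name,
    key=b"key-2",
    value=msg_value_json.encode("utf-8"),
    partition=partition
)
record_metadata = future.get(timeout=10)  # raises an exception if the send failed
print(record_metadata.topic, record_metadata.partition, record_metadata.offset)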
Python producer with Schema Registry
To produce data using Python example code when you integrate with schema registry:
- When you integrate with schema registry, you may need the following libraries:
# Support for Kafka client and Schema Registry
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0
# Support for Avro serialization
avro-python3==1.11.3
fastavro==1.9.4
- After writing the Python code as in the following example, including the Sub Account and SASL authentication information confirmed in the preparation stage, run it.
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
import time
# NCP Connection Settings
brokers = "{TOPIC_access_endpoint}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"
# Schema Registry Client
sr_config = {
'url': schema_registry_url,
'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)
# User Schema
user_schema_str = """{
"type": "record",
"name": "User",
"namespace": "com.example.kafka",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "age", "type": ["null", "int"], "default": null}
]
}"""
avro_serializer = AvroSerializer(schema_registry_client, user_schema_str)
# Kafka Producer
producer_config = {
'bootstrap.servers': brokers,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': username,
'sasl.password': password,
}
producer = Producer(producer_config)
# Sample data
users = [
{"id": 1, "name": "John Doe", "email": "john@example.com", "age": 30},
{"id": 2, "name": "Jane Smith", "email": "jane@example.com", "age": 25},
{"id": 3, "name": "Bob Johnson", "email": "bob@example.com", "age": None}
]
print(f"Starting to send {len(users)} messages to {topic_name}")
try:
    for user in users:
        # Serialize and send message
        context = SerializationContext(topic_name, MessageField.VALUE)
        serialized_value = avro_serializer(user, context)
        producer.produce(
            topic=topic_name,
            key=f"user-{user['id']}",
            value=serialized_value
        )
        print(f"Sent: {user['name']} (ID: {user['id']})")
    producer.flush()
    print("All messages sent successfully!")
except Exception as e:
    print(f"Error: {e}")
finally:
    producer.flush()
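confluent-kafka acknowledges sends asynchronously, so flush() alone only tells you that the local queue was drained. If you want per-message delivery confirmation, you can register a delivery callback and poll the producer; the following is a minimal sketch under the same configuration (the callback name is illustrative).
# Illustrative delivery-report callback; confluent-kafka calls it once per message
# when producer.poll() or producer.flush() is invoked.
def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

producer.produce(
    topic=topic_name,
    key="user-1",
    value=serialized_value,
    on_delivery=delivery_report
)
producer.poll(0)   # serve queued delivery callbacks
producer.flush()   # wait for all outstanding messages before exiting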
Data consumption example
The Data Stream service supports Kafka clients. You can consume data from a topic by using a Kafka client library. The following is an example of data consumption.
Python consumer
To consume data using Python example code:
- Install the Python Kafka client library with the following command.
pip install kafka-python
- After writing the Python code as in the following example, including the Sub Account authentication information confirmed in the preparation stage, run it.
import ssl
from datetime import datetime
from kafka import KafkaConsumer

brokers = "{TOPIC_access_endpoint}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
security_protocol = 'SASL_SSL'
sasl_mechanism = 'PLAIN'

# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1

consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=brokers,
    group_id="my-consumer-group-name",
    auto_offset_reset='earliest',
    sasl_plain_username=username,
    sasl_plain_password=password,
    security_protocol=security_protocol,
    ssl_context=context,
    sasl_mechanism=sasl_mechanism
)

for message in consumer:
    print(f'({datetime.now()}) Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
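The consumer above relies on kafka-python's automatic offset commits. If you want offsets to be committed only after your own processing has succeeded, you can disable auto-commit and commit explicitly; the following is a minimal sketch that reuses the connection settings from the example (handle() stands in for your own processing logic).
# Sketch: explicit offset commits with kafka-python (auto-commit disabled).
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=brokers,
    group_id="my-consumer-group-name",
    enable_auto_commit=False,          # commit manually after processing
    auto_offset_reset='earliest',
    sasl_plain_username=username,
    sasl_plain_password=password,
    security_protocol=security_protocol,
    ssl_context=context,
    sasl_mechanism=sasl_mechanism
)

try:
    for message in consumer:
        handle(message)           # hypothetical processing function
        consumer.commit()         # commit offsets for the records consumed so far
except KeyboardInterrupt:
    pass
finally:
    consumer.close()              # leave the consumer group cleanly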
Python consumer with Schema Registry
To consume data using Python example code when you integrate with schema registry:
- When you integrate with schema registry, you may need the following libraries:
# Support for Kafka client and Schema Registry
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0
# Support for Avro serialization
avro-python3==1.11.3
fastavro==1.9.4
- After writing the Python code as in the following example, including the Sub Account and SASL authentication information confirmed in the preparation stage, run it.
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
# NCP Connection Settings
brokers = "{TOPIC_access_endpoint}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"
# Schema Registry Client
sr_config = {
'url': schema_registry_url,
'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)
# Avro Deserializer
avro_deserializer = AvroDeserializer(schema_registry_client)
# Kafka Consumer
consumer_config = {
'bootstrap.servers': brokers,
'group.id': 'schema-registry-consumer',
'auto.offset.reset': 'earliest',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': username,
'sasl.password': password,
}
consumer = Consumer(consumer_config)
consumer.subscribe([topic_name])
print(f"Consumer started: {topic_name}")
print("Auto schema lookup from Schema Registry")
try:
    while True:
        msg = consumer.poll(5.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        try:
            # Avro deserialization (auto schema lookup)
            context = SerializationContext(topic_name, MessageField.VALUE)
            data = avro_deserializer(msg.value(), context)
            print(f"Received: {data}")
        except Exception as e:
            print(f"Deserialization error: {e}")
except KeyboardInterrupt:
    print("\nShutting down...")
finally:
    consumer.close()
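Like the plain consumer, the schema registry consumer above uses confluent-kafka's default automatic offset commits. To commit only after a record has been deserialized and processed, you can disable auto-commit in the configuration and commit per message; the following is a minimal sketch under the same settings (process() stands in for your own logic).
# Sketch: manual offset commits with confluent-kafka.
consumer_config['enable.auto.commit'] = False   # commit explicitly after processing
consumer = Consumer(consumer_config)
consumer.subscribe([topic_name])

while True:
    msg = consumer.poll(5.0)
    if msg is None or msg.error():
        continue
    context = SerializationContext(topic_name, MessageField.VALUE)
    data = avro_deserializer(msg.value(), context)
    process(data)                     # hypothetical processing function
    consumer.commit(message=msg)      # synchronously commit this message's offset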