Python


Available in VPC

This article describes how to consume data (messages) stored in a topic using a TCP access endpoint, and provides example code.

Caution

For security reasons, you cannot use your main account's authentication information when transferring data; be sure to use Sub Account authentication information instead.

Preparations (Check authentication information and SASL settings)

To access the Data Stream service with a Kafka client application, you must add the SASL settings appropriate to the Kafka client type. To check the Sub Account authentication information and configure SASL:

  1. Check the Sub Account authentication information (Access Key and Secret Key) of the user who will access the Data Stream service.
    • Sub Account authentication information is used as your user name and password when setting up SASL authentication.
    • For more information about how to issue and check sub account Access Keys, see Create sub account.
  2. Add SASL settings to the data production/consumption code according to the Kafka client type (Python, Java, or Logstash).
    • Python, Java
      # username: Enter Sub Account Access Key.
      # password: Enter Sub Account Secret Key.
      
      security.protocol=SASL_SSL
      sasl.mechanism=PLAIN
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
      
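For Python clients built on librdkafka, such as the confluent-kafka client used in the schema registry examples below, the same settings map to configuration dictionary keys rather than JAAS properties. A minimal sketch (the bracketed values are placeholders to substitute with your Sub Account keys):

```python
# librdkafka-style configuration keys equivalent to the SASL properties above.
# The bracketed values are placeholders for the Sub Account Access Key / Secret Key.
sasl_config = {
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '{ncp_iam_SubAccountAccessKey}',
    'sasl.password': '{ncp_iam_SubAccountSecretKey}',
}
print(sasl_config['sasl.mechanism'])
```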

Data production example

The Data Stream service supports Kafka Client. You can produce data to a topic by using the Kafka Client library. The following is an example of data production.

Python producer

To produce data using Python example code:

  1. Install the Python Kafka client library with the following command.

    pip install kafka-python
    
  2. After writing the Python code as in the following example, including the Sub Account and SASL authentication information confirmed in the preparation stage, run it.

    import ssl  
    
    from kafka import KafkaProducer  
    import json  
    
    brokers = "{TOPIC_access_endpoint}"  
    topic_name = "{TOPIC_NAME}"  
    
    username = "{ncp_iam_SubAccountAccessKey}"  
    password = "{ncp_iam_SubAccountSecretKey}"
    
    security_protocol = 'SASL_SSL'  
    sasl_mechanism = 'PLAIN'  
    
    # Create a new context using system defaults; disable TLS 1.0 and 1.1 so only TLS 1.2 and later are used
    context = ssl.create_default_context()
    context.options |= ssl.OP_NO_TLSv1
    context.options |= ssl.OP_NO_TLSv1_1
    
    producer = KafkaProducer(bootstrap_servers = brokers,  
                            sasl_plain_username = username,  
                            sasl_plain_password = password,  
                            security_protocol = security_protocol,  
                            ssl_context = context,  
                            sasl_mechanism = sasl_mechanism,  
                            api_version = (0,10),  
                            retries=5)  
    
    
    partition = 0
    key = "key-1"
    value_dict = {'key':  key, 'data': "this is a test (body)"}  
    msg_value_json = json.dumps(value_dict)
    
    producer.send(topic=topic_name,  
                key=bytes(key, 'utf-8'),  
                value=msg_value_json.encode("utf-8"),  
                partition=partition)  
    
    producer.flush()
    
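The key and value passed to producer.send() above must be byte strings. kafka-python can also perform this encoding for you through its `key_serializer` and `value_serializer` constructor arguments; the serializer functions themselves are plain callables that can be sketched and run without a broker:

```python
import json

def key_serializer(key):
    # Kafka message keys are byte arrays; encode the string key as UTF-8
    return bytes(key, 'utf-8')

def value_serializer(value_dict):
    # Serialize the message body as UTF-8 encoded JSON, as in the example above
    return json.dumps(value_dict).encode('utf-8')

# Passed as KafkaProducer(..., key_serializer=key_serializer,
# value_serializer=value_serializer), you can then call
# producer.send(topic, key="key-1", value={'key': 'key-1', 'data': '...'})
# with plain Python objects.
print(value_serializer({'key': 'key-1', 'data': 'this is a test (body)'}))
```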

Python producer with Schema Registry

To produce data using Python example code when you integrate with the schema registry:

  1. When you integrate with the schema registry, you may need the following libraries:
# Support for Kafka client and Schema Registry
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0

# Support for Avro serialization
avro-python3==1.11.3
fastavro==1.9.4
  2. After writing the Python code as in the following example, including the Sub Account and SASL authentication information confirmed in the preparation stage, run it.
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
import time

# NCP Connection Settings
brokers = "{TOPIC_access_endpoint}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"

# Schema Registry Client
sr_config = {
    'url': schema_registry_url,
    'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)

# User Schema
user_schema_str = """{
  "type": "record",
  "name": "User",
  "namespace": "com.example.kafka",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": ["null", "int"], "default": null}
  ]
}"""

avro_serializer = AvroSerializer(schema_registry_client, user_schema_str)

# Kafka Producer
producer_config = {
    'bootstrap.servers': brokers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': username,
    'sasl.password': password,
}
producer = Producer(producer_config)

# Sample data
users = [
    {"id": 1, "name": "John Doe", "email": "john@example.com", "age": 30},
    {"id": 2, "name": "Jane Smith", "email": "jane@example.com", "age": 25},
    {"id": 3, "name": "Bob Johnson", "email": "bob@example.com", "age": None}
]

print(f"Starting to send {len(users)} messages to {topic_name}")

try:
    for user in users:
        # Serialize and send message
        context = SerializationContext(topic_name, MessageField.VALUE)
        serialized_value = avro_serializer(user, context)
        producer.produce(
            topic=topic_name,
            key=f"user-{user['id']}",
            value=serialized_value
        )
        print(f"Sent: {user['name']} (ID: {user['id']})")
    
    producer.flush()
    print("All messages sent successfully!")
    
except Exception as e:
    print(f"Error: {e}")
finally:
    producer.flush()

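An Avro schema is itself plain JSON, so you can sanity-check the schema string with the standard library before registering it. A small sketch using the same User schema (no registry connection needed); fields whose type is a union containing "null" are the optional ones:

```python
import json

user_schema_str = """{
  "type": "record",
  "name": "User",
  "namespace": "com.example.kafka",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": ["null", "int"], "default": null}
  ]
}"""

schema = json.loads(user_schema_str)
field_names = [f["name"] for f in schema["fields"]]
# A field is nullable when its type is a union that includes "null"
nullable = [f["name"] for f in schema["fields"]
            if isinstance(f["type"], list) and "null" in f["type"]]
print(field_names)  # ['id', 'name', 'email', 'age']
print(nullable)     # ['age']
```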
Data consumption example

The Data Stream service supports Kafka Client. You can consume data from a topic by using the Kafka Client library. The following is an example of data consumption.

Python consumer

To consume data using Python example code:

  1. Install the Python Kafka client library with the following command.
    pip install kafka-python
    
  2. After writing the Python code as in the following example, including the Sub Account authentication information confirmed in the preparation stage, run it.
    import ssl  
    from kafka import KafkaConsumer  
    
    brokers = "{TOPIC_access_endpoint}"  
    topic_name = "{TOPIC_NAME}"    
    username = "{ncp_iam_SubAccountAccessKey}"  
    password = "{ncp_iam_SubAccountSecretKey}"
    
    security_protocol = 'SASL_SSL'  
    sasl_mechanism = 'PLAIN'  
    
    
    # Create a new context using system defaults; disable TLS 1.0 and 1.1 so only TLS 1.2 and later are used
    context = ssl.create_default_context()
    context.options |= ssl.OP_NO_TLSv1
    context.options |= ssl.OP_NO_TLSv1_1
    
    consumer = KafkaConsumer(topic_name, bootstrap_servers = brokers,
                            group_id = "my-consumer-group-name",
                            auto_offset_reset = 'earliest',
                            sasl_plain_username = username,
                            sasl_plain_password = password,
                            security_protocol = security_protocol,
                            ssl_context = context,
                            sasl_mechanism = sasl_mechanism)
    
    from datetime import datetime
    for message in consumer:
        print(f'({datetime.now()}) Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
    
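Instead of printing raw bytes, kafka-python can decode message bodies for you via the `value_deserializer` constructor argument. The deserializer function itself can be sketched and tested without a broker (assuming the JSON body format used in the producer example above):

```python
import json

def json_value_deserializer(raw_bytes):
    # Kafka delivers values as raw bytes; decode UTF-8 JSON, tolerate empty payloads
    if raw_bytes is None:
        return None
    return json.loads(raw_bytes.decode('utf-8'))

# Passed as KafkaConsumer(topic_name, ..., value_deserializer=json_value_deserializer),
# message.value then arrives as a dict instead of bytes.
print(json_value_deserializer(b'{"key": "key-1", "data": "this is a test (body)"}'))
```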

Python consumer with Schema Registry

To consume data using Python example code when you integrate with the schema registry:

  1. When you integrate with the schema registry, you may need the following libraries:
# Support for Kafka client and Schema Registry
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0

# Support for Avro serialization
avro-python3==1.11.3
fastavro==1.9.4
  2. After writing the Python code as in the following example, including the Sub Account and SASL authentication information confirmed in the preparation stage, run it.
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# NCP Connection Settings
brokers = "{TOPIC_access_endpoint}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"

# Schema Registry Client
sr_config = {
    'url': schema_registry_url,
    'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)

# Avro Deserializer
avro_deserializer = AvroDeserializer(schema_registry_client)

# Kafka Consumer
consumer_config = {
    'bootstrap.servers': brokers,
    'group.id': 'schema-registry-consumer',
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': username,
    'sasl.password': password,
}
consumer = Consumer(consumer_config)
consumer.subscribe([topic_name])

print(f"Consumer started: {topic_name}")
print("Auto schema lookup from Schema Registry")

try:
    while True:
        msg = consumer.poll(5.0)
        if msg is None:
            continue
            
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        
        try:
            # Avro deserialization (auto schema lookup)
            context = SerializationContext(topic_name, MessageField.VALUE)
            data = avro_deserializer(msg.value(), context)
            
            print(f"Received: {data}")
            
        except Exception as e:
            print(f"Deserialization error: {e}")

except KeyboardInterrupt:
    print("\nShutting down...")
finally:
    consumer.close()
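For reference, the "auto schema lookup" above works because the Confluent Avro serializer prefixes every message with a 5-byte wire-format header: a zero magic byte followed by a big-endian 4-byte schema ID, which the deserializer uses to fetch the schema from the registry. A stdlib-only sketch of splitting that header from the payload (the sample bytes are fabricated for illustration):

```python
import struct

def split_confluent_message(message_bytes):
    """Split a Confluent wire-format message into (schema_id, avro_payload)."""
    if len(message_bytes) < 5 or message_bytes[0] != 0:
        raise ValueError("not in Confluent wire format")
    # Byte 0 is the magic byte; bytes 1-4 are the big-endian schema ID
    (schema_id,) = struct.unpack(">I", message_bytes[1:5])
    return schema_id, message_bytes[5:]

# Fabricated example: header for schema ID 42 followed by a dummy Avro payload
msg = b"\x00" + struct.pack(">I", 42) + b"avro-payload"
schema_id, payload = split_confluent_message(msg)
print(schema_id, payload)  # 42 b'avro-payload'
```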