Python

Prev Next

VPC 환경에서 이용 가능합니다.

TCP 접속 엔드포인트를 사용하여 토픽에 저장된 데이터(메시지)를 수신하는 방법과 예제 코드를 설명합니다.

주의

보안을 위해 데이터 전송 시 메인 계정의 인증 정보를 사용할 수 없으므로 반드시 서브 계정의 인증 정보를 사용해 주십시오.

사전 준비(인증 정보 확인 및 SASL 설정)

Kafka 클라이언트 애플리케이션을 사용하여 Data Stream 서비스에 접속하려면 Kafka 클라이언트 종류에 맞게 SASL 설정을 추가해야 합니다. Sub Account 인증 정보 확인 및 SASL 설정 방법은 다음과 같습니다.

  1. Data Stream 서비스에 접속할 사용자의 Sub Account 인증 정보(Access key와 Secret key)를 확인해 주십시오.
    • Sub Account 인증 정보는 SASL 인증 설정 시 사용자명(username)과 비밀번호(password)로 사용됩니다.
    • 서브 계정의 Access Key 발급 및 확인: 서브 계정 생성 참조
  2. Kafka 클라이언트 종류(Python, Java, Logstash)에 맞게 데이터 송·수신 코드에 SASL 설정을 추가해 주십시오.
    • Python, Java
      # username: Sub Account Access Key 입력
      # password: Sub Account Secret Key 입력
      
      security.protocol=SASL_SSL
      sasl.mechanism=PLAIN
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
      

데이터 송신 예제

Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 송신할 수 있습니다. 데이터 송신 예제를 소개합니다.

Python producer

Python 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.

  1. 다음 명령어로 Python Kafka 클라이언트 라이브러리를 설치해 주십시오.

    pip install kafka-python
    
  2. 다음 예시와 같이 사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.

    import ssl  
    
    from kafka import KafkaProducer  
    import json  
    
    brokers = "{TOPIC_접속_엔드포인트}"  
    topic_name = "{TOPIC_NAME}"  
    
    username = "{ncp_iam_SubAccountAccessKey}"  
    password = "{ncp_iam_SubAccountSecretKey}"
    
    security_protocol = 'SASL_SSL'  
    sasl_mechanism = 'PLAIN'  
    
    # Create a new context using system defaults, disable all but TLS1.2  
    context = ssl.create_default_context()  
    context.options &= ssl.OP_NO_TLSv1  
    context.options &= ssl.OP_NO_TLSv1_1  
    
    producer = KafkaProducer(bootstrap_servers = brokers,  
                            sasl_plain_username = username,  
                            sasl_plain_password = password,  
                            security_protocol = security_protocol,  
                            ssl_context = context,  
                            sasl_mechanism = sasl_mechanism,  
                            api_version = (0,10),  
                            retries=5)  
    
    
    partition = 0
    key = "key-1"
    value_dict = {'key':  key, 'data': "this is a test (body)"}  
    msg_value_json = json.dumps(value_dict)
    
    producer.send(topic=topic_name,  
                key=bytes(key, 'utf-8'),  
                value=msg_value_json.encode("utf-8"),  
                partition=partition)  
    
    producer.flush()
    

Python producer with Schema Registry

스키마 레지스트리를 연동하는 경우, Python 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.

  1. 스키마 레지스트리를 연동할 때, 다음과 같은 라이브러리가 필요할 수 있습니다.
# Kafka 클라이언트 및 Schema Registry 지원
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0

# Avro 직렬화 지원
avro-python3==1.11.3
fastavro==1.9.4
  1. 다음 예시와 같이 사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
import time

# NCP Connection Settings
brokers = "{TOPIC_접속_엔드포인트}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"

# Schema Registry Client
sr_config = {
    'url': schema_registry_url,
    'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)

# User Schema
user_schema_str = """{
  "type": "record",
  "name": "User",
  "namespace": "com.example.kafka",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": ["null", "int"], "default": null}
  ]
}"""

avro_serializer = AvroSerializer(schema_registry_client, user_schema_str)

# Kafka Producer
producer_config = {
    'bootstrap.servers': brokers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': username,
    'sasl.password': password,
}
producer = Producer(producer_config)

# Sample data
users = [
    {"id": 1, "name": "John Doe", "email": "john@example.com", "age": 30},
    {"id": 2, "name": "Jane Smith", "email": "jane@example.com", "age": 25},
    {"id": 3, "name": "Bob Johnson", "email": "bob@example.com", "age": None}
]

print(f"Starting to send {len(users)} messages to {topic_name}")

try:
    for user in users:
        # Serialize and send message
        context = SerializationContext(topic_name, MessageField.VALUE)
        serialized_value = avro_serializer(user, context)
        producer.produce(
            topic=topic_name,
            key=f"user-{user['id']}",
            value=serialized_value
        )
        print(f"Sent: {user['name']} (ID: {user['id']})")
    
    producer.flush()
    print("All messages sent successfully!")
    
except Exception as e:
    print(f"Error: {e}")
finally:
    producer.flush()

데이터 수신 예제

Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 수신할 수 있습니다. 데이터 수신 예제를 소개합니다.

Python consumer

Python 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.

  1. 다음 명령어로 Python Kafka 클라이언트 라이브러리를 설치해 주십시오.
    pip install kafka-python
    
  2. 다음 예시와 같이 사전 준비에서 확인한 Sub Account 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.
    import ssl  
    from kafka import KafkaConsumer  
    
    brokers = "{TOPIC_접속_엔드포인트}"  
    topic_name = "{TOPIC_NAME}"    
    username = "{ncp_iam_SubAccountAccessKey}"  
    password = "{ncp_iam_SubAccountSecretKey}"
    
    security_protocol = 'SASL_SSL'  
    sasl_mechanism = 'PLAIN'  
    
    
    # Create a new context using system defaults, disable all but TLS1.2  
    context = ssl.create_default_context()  
    context.options &= ssl.OP_NO_TLSv1  
    context.options &= ssl.OP_NO_TLSv1_1  
    
    consumer = KafkaConsumer(topic_name, bootstrap_servers = brokers,  
    						group_id = "my-consumer-group-name",  
    						auto_offset_reset='earliest',  
    						sasl_plain_username = username,  
    						sasl_plain_password = password,  
    						security_protocol = security_protocol,  
    						ssl_context = context,  
    						sasl_mechanism = sasl_mechanism)  
    
    from datetime import datetime  
    for message in consumer:  
    print(f'({datetime.now()}) Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
    

Python consumer with Schema Registry

스키마 레지스트리를 연동하는 경우, Python 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.

  1. 스키마 레지스트리를 연동할 때, 다음과 같은 라이브러리가 필요할 수 있습니다.
# Kafka 클라이언트 및 Schema Registry 지원
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0

# Avro 직렬화 지원
avro-python3==1.11.3
fastavro==1.9.4
  1. 다음 예시와 같이 사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# NCP Connection Settings
brokers = "{TOPIC_접속_엔드포인트}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"

# Schema Registry Client
sr_config = {
    'url': schema_registry_url,
    'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)

# Avro Deserializer
avro_deserializer = AvroDeserializer(schema_registry_client)

# Kafka Consumer
consumer_config = {
    'bootstrap.servers': brokers,
    'group.id': 'schema-registry-consumer',
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': username,
    'sasl.password': password,
}
consumer = Consumer(consumer_config)
consumer.subscribe([topic_name])

print(f"Consumer started: {topic_name}")
print("Auto schema lookup from Schema Registry")

try:
    while True:
        msg = consumer.poll(5.0)
        if msg is None:
            continue
            
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        
        try:
            # Avro deserialization (auto schema lookup)
            context = SerializationContext(topic_name, MessageField.VALUE)
            data = avro_deserializer(msg.value(), context)
            
            print(f"Received: {data}")
            
        except Exception as e:
            print(f"Deserialization error: {e}")

except KeyboardInterrupt:
    print("\nShutting down...")
finally:
    consumer.close()