Documentation Index

Fetch the complete documentation index at: https://guide.ncloud-docs.com/llms.txt

Use this file to discover all available pages before exploring further.

Python

Prev Next

VPC 환경에서 이용 가능합니다.

TCP 접속 엔드포인트를 사용하여 토픽에 저장된 데이터(메시지)를 수신하는 방법과 예제 코드를 설명합니다.

주의

보안을 위해 데이터 전송 시 메인 계정의 인증 정보를 사용할 수 없으므로 반드시 서브 계정의 인증 정보를 사용해 주십시오.

사전 준비(인증 정보 확인 및 SASL 설정)

Kafka 클라이언트 애플리케이션을 사용하여 Data Stream 서비스에 접속하려면 Kafka 클라이언트 종류에 맞게 SASL 설정을 추가해야 합니다. Sub Account 인증 정보 확인 및 SASL 설정 방법은 다음과 같습니다.

  1. Data Stream 서비스에 접속할 사용자의 Sub Account 인증 정보(Access key와 Secret key)를 확인해 주십시오.
    • Sub Account 인증 정보는 SASL 인증 설정 시 사용자명(username)과 비밀번호(password)로 사용됩니다.
    • 서브 계정의 Access Key 발급 및 확인: 서브 계정 생성 참조
  2. Kafka 클라이언트 종류(Python, Java, Logstash)에 맞게 데이터 송·수신 코드에 SASL 설정을 추가해 주십시오.
    • Python, Java
      # username: Sub Account Access Key 입력
      # password: Sub Account Secret Key 입력
      
      security.protocol=SASL_SSL
      sasl.mechanism=PLAIN
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
      

데이터 송신 예제

Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 송신할 수 있습니다. 데이터 송신 예제를 소개합니다.

Python producer

Python 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.

  1. 다음 명령어로 Python Kafka 클라이언트 라이브러리를 설치해 주십시오.

    pip install kafka-python
    
  2. 다음 예시와 같이 사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.

    import ssl  
    
    from kafka import KafkaProducer  
    import json  
    
    brokers = "{TOPIC_접속_엔드포인트}"  
    topic_name = "{TOPIC_NAME}"  
    
    username = "{ncp_iam_SubAccountAccessKey}"  
    password = "{ncp_iam_SubAccountSecretKey}"
    
    security_protocol = 'SASL_SSL'  
    sasl_mechanism = 'PLAIN'  
    
    # Create a new context using system defaults, disable all but TLS1.2  
    context = ssl.create_default_context()  
    context.options &= ssl.OP_NO_TLSv1  
    context.options &= ssl.OP_NO_TLSv1_1  
    
    producer = KafkaProducer(bootstrap_servers = brokers,  
                            sasl_plain_username = username,  
                            sasl_plain_password = password,  
                            security_protocol = security_protocol,  
                            ssl_context = context,  
                            sasl_mechanism = sasl_mechanism,  
                            api_version = (0,10),  
                            retries=5)  
    
    
    partition = 0
    key = "key-1"
    value_dict = {'key':  key, 'data': "this is a test (body)"}  
    msg_value_json = json.dumps(value_dict)
    
    producer.send(topic=topic_name,  
                key=bytes(key, 'utf-8'),  
                value=msg_value_json.encode("utf-8"),  
                partition=partition)  
    
    producer.flush()
    

Python producer with Schema Registry

스키마 레지스트리를 연동하는 경우, Python 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.

  1. 스키마 레지스트리를 연동할 때, 다음과 같은 라이브러리가 필요할 수 있습니다.
# Kafka 클라이언트 및 Schema Registry 지원
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0

# Avro 직렬화 지원
avro-python3==1.11.3
fastavro==1.9.4
  1. 다음 예시와 같이 사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
import time

# NCP Connection Settings
brokers = "{TOPIC_접속_엔드포인트}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"

# Schema Registry Client
sr_config = {
    'url': schema_registry_url,
    'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)

# User Schema
user_schema_str = """{
  "type": "record",
  "name": "User",
  "namespace": "com.example.kafka",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": ["null", "int"], "default": null}
  ]
}"""

avro_serializer = AvroSerializer(schema_registry_client, user_schema_str)

# Kafka Producer
producer_config = {
    'bootstrap.servers': brokers,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': username,
    'sasl.password': password,
}
producer = Producer(producer_config)

# Sample data
users = [
    {"id": 1, "name": "John Doe", "email": "john@example.com", "age": 30},
    {"id": 2, "name": "Jane Smith", "email": "jane@example.com", "age": 25},
    {"id": 3, "name": "Bob Johnson", "email": "bob@example.com", "age": None}
]

print(f"Starting to send {len(users)} messages to {topic_name}")

try:
    for user in users:
        # Serialize and send message
        context = SerializationContext(topic_name, MessageField.VALUE)
        serialized_value = avro_serializer(user, context)
        producer.produce(
            topic=topic_name,
            key=f"user-{user['id']}",
            value=serialized_value
        )
        print(f"Sent: {user['name']} (ID: {user['id']})")
    
    producer.flush()
    print("All messages sent successfully!")
    
except Exception as e:
    print(f"Error: {e}")
finally:
    producer.flush()

데이터 수신 예제

Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 수신할 수 있습니다. 데이터 수신 예제를 소개합니다.

Python consumer

Python 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.

  1. 다음 명령어로 Python Kafka 클라이언트 라이브러리를 설치해 주십시오.
    pip install kafka-python
    
  2. 다음 예시와 같이 사전 준비에서 확인한 Sub Account 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.
    import ssl  
    from kafka import KafkaConsumer  
    
    brokers = "{TOPIC_접속_엔드포인트}"  
    topic_name = "{TOPIC_NAME}"    
    username = "{ncp_iam_SubAccountAccessKey}"  
    password = "{ncp_iam_SubAccountSecretKey}"
    
    security_protocol = 'SASL_SSL'  
    sasl_mechanism = 'PLAIN'  
    
    
    # Create a new context using system defaults, disable all but TLS1.2  
    context = ssl.create_default_context()  
    context.options &= ssl.OP_NO_TLSv1  
    context.options &= ssl.OP_NO_TLSv1_1  
    
    consumer = KafkaConsumer(topic_name, bootstrap_servers = brokers,  
    						group_id = "my-consumer-group-name",  
    						auto_offset_reset='earliest',  
    						sasl_plain_username = username,  
    						sasl_plain_password = password,  
    						security_protocol = security_protocol,  
    						ssl_context = context,  
    						sasl_mechanism = sasl_mechanism)  
    
    from datetime import datetime  
    for message in consumer:  
    print(f'({datetime.now()}) Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
    

Python consumer with Schema Registry

스키마 레지스트리를 연동하는 경우, Python 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.

  1. 스키마 레지스트리를 연동할 때, 다음과 같은 라이브러리가 필요할 수 있습니다.
# Kafka 클라이언트 및 Schema Registry 지원
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0

# Avro 직렬화 지원
avro-python3==1.11.3
fastavro==1.9.4
  1. 다음 예시와 같이 사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# NCP Connection Settings
brokers = "{TOPIC_접속_엔드포인트}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"

# Schema Registry Client
sr_config = {
    'url': schema_registry_url,
    'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)

# Avro Deserializer
avro_deserializer = AvroDeserializer(schema_registry_client)

# Kafka Consumer
consumer_config = {
    'bootstrap.servers': brokers,
    'group.id': 'schema-registry-consumer',
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': username,
    'sasl.password': password,
}
consumer = Consumer(consumer_config)
consumer.subscribe([topic_name])

print(f"Consumer started: {topic_name}")
print("Auto schema lookup from Schema Registry")

try:
    while True:
        msg = consumer.poll(5.0)
        if msg is None:
            continue
            
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        
        try:
            # Avro deserialization (auto schema lookup)
            context = SerializationContext(topic_name, MessageField.VALUE)
            data = avro_deserializer(msg.value(), context)
            
            print(f"Received: {data}")
            
        except Exception as e:
            print(f"Deserialization error: {e}")

except KeyboardInterrupt:
    print("\nShutting down...")
finally:
    consumer.close()