VPC 환경에서 이용 가능합니다.
TCP 접속 엔드포인트를 사용하여 토픽에 저장된 데이터(메시지)를 수신하는 방법과 예제 코드를 설명합니다.
보안을 위해 데이터 전송 시 메인 계정의 인증 정보를 사용할 수 없으므로 반드시 서브 계정의 인증 정보를 사용해 주십시오.
사전 준비(인증 정보 확인 및 SASL 설정)
Kafka 클라이언트 애플리케이션을 사용하여 Data Stream 서비스에 접속하려면 Kafka 클라이언트 종류에 맞게 SASL 설정을 추가해야 합니다. Sub Account 인증 정보 확인 및 SASL 설정 방법은 다음과 같습니다.
- Data Stream 서비스에 접속할 사용자의 Sub Account 인증 정보(Access key와 Secret key)를 확인해 주십시오.
- Sub Account 인증 정보는 SASL 인증 설정 시 사용자명(username)과 비밀번호(password)로 사용됩니다.
- 서브 계정의 Access Key 발급 및 확인: 서브 계정 생성 참조
- Kafka 클라이언트 종류(Python, Java, Logstash)에 맞게 데이터 송·수신 코드에 SASL 설정을 추가해 주십시오.
- Python, Java
# username: Sub Account Access Key 입력 # password: Sub Account Secret Key 입력 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
- Python, Java
데이터 송신 예제
Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 송신할 수 있습니다. 데이터 송신 예제를 소개합니다.
Python producer
Python 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.
-
다음 명령어로 Python Kafka 클라이언트 라이브러리를 설치해 주십시오.
pip install kafka-python -
다음 예시와 같이 사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.
import ssl from kafka import KafkaProducer import json brokers = "{TOPIC_접속_엔드포인트}" topic_name = "{TOPIC_NAME}" username = "{ncp_iam_SubAccountAccessKey}" password = "{ncp_iam_SubAccountSecretKey}" security_protocol = 'SASL_SSL' sasl_mechanism = 'PLAIN' # Create a new context using system defaults, disable all but TLS1.2 context = ssl.create_default_context() context.options &= ssl.OP_NO_TLSv1 context.options &= ssl.OP_NO_TLSv1_1 producer = KafkaProducer(bootstrap_servers = brokers, sasl_plain_username = username, sasl_plain_password = password, security_protocol = security_protocol, ssl_context = context, sasl_mechanism = sasl_mechanism, api_version = (0,10), retries=5) partition = 0 key = "key-1" value_dict = {'key': key, 'data': "this is a test (body)"} msg_value_json = json.dumps(value_dict) producer.send(topic=topic_name, key=bytes(key, 'utf-8'), value=msg_value_json.encode("utf-8"), partition=partition) producer.flush()
Python producer with Schema Registry
스키마 레지스트리를 연동하는 경우, Python 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.
- 스키마 레지스트리를 연동할 때, 다음과 같은 라이브러리가 필요할 수 있습니다.
# Kafka 클라이언트 및 Schema Registry 지원
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0
# Avro 직렬화 지원
avro-python3==1.11.3
fastavro==1.9.4
- 다음 예시와 같이 사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField
import time
# NCP Connection Settings
brokers = "{TOPIC_접속_엔드포인트}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"
# Schema Registry Client
sr_config = {
'url': schema_registry_url,
'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)
# User Schema
user_schema_str = """{
"type": "record",
"name": "User",
"namespace": "com.example.kafka",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "age", "type": ["null", "int"], "default": null}
]
}"""
avro_serializer = AvroSerializer(schema_registry_client, user_schema_str)
# Kafka Producer
producer_config = {
'bootstrap.servers': brokers,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': username,
'sasl.password': password,
}
producer = Producer(producer_config)
# Sample data
users = [
{"id": 1, "name": "John Doe", "email": "john@example.com", "age": 30},
{"id": 2, "name": "Jane Smith", "email": "jane@example.com", "age": 25},
{"id": 3, "name": "Bob Johnson", "email": "bob@example.com", "age": None}
]
print(f"Starting to send {len(users)} messages to {topic_name}")
try:
for user in users:
# Serialize and send message
context = SerializationContext(topic_name, MessageField.VALUE)
serialized_value = avro_serializer(user, context)
producer.produce(
topic=topic_name,
key=f"user-{user['id']}",
value=serialized_value
)
print(f"Sent: {user['name']} (ID: {user['id']})")
producer.flush()
print("All messages sent successfully!")
except Exception as e:
print(f"Error: {e}")
finally:
producer.flush()
데이터 수신 예제
Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 수신할 수 있습니다. 데이터 수신 예제를 소개합니다.
Python consumer
Python 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.
- 다음 명령어로 Python Kafka 클라이언트 라이브러리를 설치해 주십시오.
pip install kafka-python - 다음 예시와 같이 사전 준비에서 확인한 Sub Account 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.
import ssl from kafka import KafkaConsumer brokers = "{TOPIC_접속_엔드포인트}" topic_name = "{TOPIC_NAME}" username = "{ncp_iam_SubAccountAccessKey}" password = "{ncp_iam_SubAccountSecretKey}" security_protocol = 'SASL_SSL' sasl_mechanism = 'PLAIN' # Create a new context using system defaults, disable all but TLS1.2 context = ssl.create_default_context() context.options &= ssl.OP_NO_TLSv1 context.options &= ssl.OP_NO_TLSv1_1 consumer = KafkaConsumer(topic_name, bootstrap_servers = brokers, group_id = "my-consumer-group-name", auto_offset_reset='earliest', sasl_plain_username = username, sasl_plain_password = password, security_protocol = security_protocol, ssl_context = context, sasl_mechanism = sasl_mechanism) from datetime import datetime for message in consumer: print(f'({datetime.now()}) Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
Python consumer with Schema Registry
스키마 레지스트리를 연동하는 경우, Python 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.
- 스키마 레지스트리를 연동할 때, 다음과 같은 라이브러리가 필요할 수 있습니다.
# Kafka 클라이언트 및 Schema Registry 지원
confluent-kafka[avro]==2.3.0
confluent-kafka[schema-registry]==2.3.0
# Avro 직렬화 지원
avro-python3==1.11.3
fastavro==1.9.4
- 다음 예시와 같이 사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 포함하여 Python 코드를 작성한 후 실행해 주십시오.
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
# NCP Connection Settings
brokers = "{TOPIC_접속_엔드포인트}"
topic_name = "{TOPIC_NAME}"
username = "{ncp_iam_SubAccountAccessKey}"
password = "{ncp_iam_SubAccountSecretKey}"
schema_registry_url = "{SCHEMA_REGISTRY_URL}"
# Schema Registry Client
sr_config = {
'url': schema_registry_url,
'basic.auth.user.info': f'{username}:{password}'
}
schema_registry_client = SchemaRegistryClient(sr_config)
# Avro Deserializer
avro_deserializer = AvroDeserializer(schema_registry_client)
# Kafka Consumer
consumer_config = {
'bootstrap.servers': brokers,
'group.id': 'schema-registry-consumer',
'auto.offset.reset': 'earliest',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': username,
'sasl.password': password,
}
consumer = Consumer(consumer_config)
consumer.subscribe([topic_name])
print(f"Consumer started: {topic_name}")
print("Auto schema lookup from Schema Registry")
try:
while True:
msg = consumer.poll(5.0)
if msg is None:
continue
if msg.error():
print(f"Error: {msg.error()}")
continue
try:
# Avro deserialization (auto schema lookup)
context = SerializationContext(topic_name, MessageField.VALUE)
data = avro_deserializer(msg.value(), context)
print(f"Received: {data}")
except Exception as e:
print(f"Deserialization error: {e}")
except KeyboardInterrupt:
print("\nShutting down...")
finally:
consumer.close()