VPC 환경에서 이용 가능합니다.
TCP 접속 엔드포인트를 사용하여 메시지(데이터)를 토픽에 연동하는 방법과 예제 코드를 설명합니다.
주의
보안을 위해 데이터 전송 시 메인 계정의 인증 정보를 사용할 수 없으므로 반드시 서브 계정의 인증 정보를 사용해 주십시오.
사전 준비(인증 정보 확인 및 SASL 설정)
Kafka 클라이언트 애플리케이션을 사용하여 Data Stream 서비스에 접속하려면 Kafka 클라이언트 종류에 맞게 SASL 설정을 추가해야 합니다. Sub Account 인증 정보 확인 및 SASL 설정 방법은 다음과 같습니다.
- Data Stream 서비스에 접속할 사용자의 Sub Account 인증 정보(Access key와 Secret key)를 확인해 주십시오.
- Sub Account 인증 정보는 SASL 인증 설정 시 사용자명(username)과 비밀번호(password)로 사용됩니다.
- 서브 계정의 Access Key 발급 및 확인: 서브 계정 생성 참조
- Kafka 클라이언트 종류(Python, Java, Logstash)에 맞게 데이터 송·수신 코드에 SASL 설정을 추가해 주십시오.
- Logstash의 경우, Logstash에서 지원하는 kafka plugin을 통해 Data Stream과 연동할 수 있습니다.
- Logstash
sasl.jaas.config파일에 다음 예시와 같이 설정해 주십시오.# username: Sub Account Access Key 입력 # password: Sub Account Secret Key 입력 security_protocol => "SASL_SSL" sasl_mechanism => "PLAIN" sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
데이터 송신 예제
Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 송신할 수 있습니다. 데이터 수신 예제를 소개합니다.
Logstash producer
Logstash 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.
input {
...
}
output {
kafka { # stream 토픽에 데이터를 저장(쓰기)하고 싶을 때, output 설정
bootstrap_servers => "{TOPIC_접속_엔드포인트}" -- TOPIC 접속 엔드포인트
topic_id => "{TOPIC_NAME}" -- 데이터를 전송할 목적지 토픽 이름
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
}
}
데이터 수신 예제
Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 수신할 수 있습니다. 데이터 수신 예제를 소개합니다.
Logstash consumer
Logstash 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.
input {
kafka { # stream 토픽 데이터를 읽고(consume) 싶을 때, input 설정
codec => plain
bootstrap_servers => "{TOPIC_접속_엔드포인트}" -- 토픽 접속 엔드포인트
topics => ["{TOPIC_NAME}"] -- 데이터를 가져올 토픽 이름
group_id => "logstash-kafka-consumer" -- 컨슈머 그룹 id (consumer.group.id)
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
}
}
output {
...
}
Logstash consumer with Schema Registry
스키마 레지스트리를 연동하는 경우, Logstash 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.
- schema_registry_url : 스키마 레지스트리 접속정보 입력
- schema_registry_key : Sub Account Access Key 입력
- schema_registry_secret : Sub Account Secret Key 입력
input {
kafka {
bootstrap_servers => "{TOPIC_접속_엔드포인트}"
topics => ["{TOPIC_NAME}}"]
group_id => "simple-schema-consumer"
auto_offset_reset => "earliest"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
# Schema Registry 설정 (NCP IAM 자격 증명 사용)
schema_registry_url => "{SCHEMA_REGISTRY_URL}"
schema_registry_key => "{ncp_iam_sub_access_key}"
schema_registry_secret => "{ncp_iam_sub_secret_key}"
# 자동 Avro 역직렬화 (codec 설정 불필요)
key_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
}
}
filter {
mutate {
add_field => { "saved_at" => "%{@timestamp}" }
}
}
output {
file {
path => "/tmp/schema-data-%{+YYYY.MM.dd}.json"
codec => json_lines
}
# stdout {
# codec => rubydebug
# }