VPC 환경에서 이용 가능합니다.
TCP 접속 엔드포인트를 사용하여 메시지(데이터)를 토픽과 연동하는 방법과 예제 코드를 설명합니다.
주의
보안을 위해 데이터 전송 시 메인 계정의 인증 정보를 사용할 수 없으므로 반드시 서브 계정의 인증 정보를 사용해 주십시오.
사전 준비(인증 정보 확인 및 SASL 설정)
Kafka 클라이언트 애플리케이션을 사용하여 Data Stream 서비스에 접속하려면 Kafka 클라이언트 종류에 맞게 SASL 설정을 추가해야 합니다. Sub Account 인증 정보 확인 및 SASL 설정 방법은 다음과 같습니다.
- Data Stream 서비스에 접속할 사용자의 Sub Account 인증 정보(Access key와 Secret key)를 확인해 주십시오.
- Sub Account 인증 정보는 SASL 인증 설정 시 사용자명(username)과 비밀번호(password)로 사용됩니다.
- 서브 계정의 Access Key 발급 및 확인: 서브 계정 생성 참조
- Logstash의 경우, Logstash에서 지원하는 kafka plugin을 통해 Data Stream과 연동할 수 있습니다.
sasl.jaas.config파일에 다음 예시와 같이 설정해 주십시오.
# username: Sub Account Access Key 입력
# password: Sub Account Secret Key 입력
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
데이터 송신 예제
Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 송신할 수 있습니다.
Logstash producer
Logstash 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.
input {
...
}
output {
kafka {
bootstrap_servers => "{TOPIC_접속_엔드포인트}"
topic_id => "{TOPIC_NAME}"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
}
}
데이터 수신 예제
Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 수신할 수 있습니다.
Logstash consumer
Logstash 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.
input {
kafka {
codec => plain
bootstrap_servers => "{TOPIC_접속_엔드포인트}"
topics => ["{TOPIC_NAME}"]
group_id => "logstash-kafka-consumer"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
}
}
output {
...
}
Logstash consumer with Schema Registry
스키마 레지스트리를 연동하는 경우, Logstash 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.
schema_registry_url: 스키마 레지스트리 접속 정보 입력schema_registry_key: Sub Account Access Key 입력schema_registry_secret: Sub Account Secret Key 입력
input {
kafka {
bootstrap_servers => "{TOPIC_접속_엔드포인트}"
topics => ["{TOPIC_NAME}"]
group_id => "simple-schema-consumer"
auto_offset_reset => "earliest"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
schema_registry_url => "{SCHEMA_REGISTRY_URL}"
schema_registry_key => "{ncp_iam_sub_access_key}"
schema_registry_secret => "{ncp_iam_sub_secret_key}"
key_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
}
}
filter {
mutate {
add_field => { "saved_at" => "%{@timestamp}" }
}
}
output {
file {
path => "/tmp/schema-data-%{+YYYY.MM.dd}.json"
codec => json_lines
}
}