Logstash

Prev Next

VPC 환경에서 이용 가능합니다.

TCP 접속 엔드포인트를 사용하여 메시지(데이터)를 토픽에 연동하는 방법과 예제 코드를 설명합니다.

주의

보안을 위해 데이터 전송 시 메인 계정의 인증 정보를 사용할 수 없으므로 반드시 서브 계정의 인증 정보를 사용해 주십시오.

사전 준비(인증 정보 확인 및 SASL 설정)

Kafka 클라이언트 애플리케이션을 사용하여 Data Stream 서비스에 접속하려면 Kafka 클라이언트 종류에 맞게 SASL 설정을 추가해야 합니다. Sub Account 인증 정보 확인 및 SASL 설정 방법은 다음과 같습니다.

  1. Data Stream 서비스에 접속할 사용자의 Sub Account 인증 정보(Access key와 Secret key)를 확인해 주십시오.
    • Sub Account 인증 정보는 SASL 인증 설정 시 사용자명(username)과 비밀번호(password)로 사용됩니다.
    • 서브 계정의 Access Key 발급 및 확인: 서브 계정 생성 참조
  2. Kafka 클라이언트 종류(Python, Java, Logstash)에 맞게 데이터 송·수신 코드에 SASL 설정을 추가해 주십시오.
    • Logstash의 경우, Logstash에서 지원하는 kafka plugin을 통해 Data Stream과 연동할 수 있습니다.
    • Logstash
      sasl.jaas.config 파일에 다음 예시와 같이 설정해 주십시오.
      # username: Sub Account Access Key 입력
      # password: Sub Account Secret Key 입력
      
      security_protocol => "SASL_SSL"
      sasl_mechanism => "PLAIN"
      sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
      

데이터 송신 예제

Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 송신할 수 있습니다. 데이터 수신 예제를 소개합니다.

Logstash producer

Logstash 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.

input {
	...
}

output {

  kafka { # stream 토픽에 데이터를 저장(쓰기)하고 싶을 때, output 설정

    bootstrap_servers => "{TOPIC_접속_엔드포인트}"  -- TOPIC 접속 엔드포인트
    topic_id => "{TOPIC_NAME}" -- 데이터를 전송할 목적지 토픽 이름

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}

데이터 수신 예제

Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 수신할 수 있습니다. 데이터 수신 예제를 소개합니다.

Logstash consumer

Logstash 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.

input {
  kafka { # stream 토픽 데이터를 읽고(consume) 싶을 때, input 설정
    codec => plain

    bootstrap_servers => "{TOPIC_접속_엔드포인트}" -- 토픽 접속 엔드포인트
    topics => ["{TOPIC_NAME}"]  -- 데이터를 가져올 토픽 이름
    group_id => "logstash-kafka-consumer" -- 컨슈머 그룹 id (consumer.group.id)

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}

output {
	...
}

Logstash consumer with Schema Registry

스키마 레지스트리를 연동하는 경우, Logstash 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.

  • schema_registry_url : 스키마 레지스트리 접속정보 입력
  • schema_registry_key : Sub Account Access Key 입력
  • schema_registry_secret : Sub Account Secret Key 입력
input {
  kafka {
    bootstrap_servers => "{TOPIC_접속_엔드포인트}"
    topics => ["{TOPIC_NAME}}"]
    group_id => "simple-schema-consumer"
    auto_offset_reset => "earliest"
    
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"

    # Schema Registry 설정 (NCP IAM 자격 증명 사용)
    schema_registry_url => "{SCHEMA_REGISTRY_URL}"
    schema_registry_key => "{ncp_iam_sub_access_key}"
    schema_registry_secret => "{ncp_iam_sub_secret_key}"
    
    # 자동 Avro 역직렬화 (codec 설정 불필요)
    key_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
  }
}

filter {
  mutate {
    add_field => { "saved_at" => "%{@timestamp}" }
  }
}

output {
  file {
    path => "/tmp/schema-data-%{+YYYY.MM.dd}.json"
    codec => json_lines
  }

#  stdout {
#    codec => rubydebug
#  }