Logstash

Prev Next

VPC環境で利用できます。

TCP接続エンドポイントを使用してメッセージ(データ)をトピックに連携する方法とサンプルコードについて説明します。

注意

セキュリティのため、データ送信時にメインアカウントの認証情報を使用することはできませんので、必ずサブアカウントの認証情報を使用してください。

事前準備(認証情報の確認と SASL設定)

Kafkaクライアントアプリケーションを使用して Data Streamサービスにアクセスするには、Kafkaクライアントのタイプに合わせて SASL設定を追加する必要があります。Sub Account認証情報の確認と SASLの設定方法は、次の通りです。

  1. Data Streamサービスに接続するユーザーの Sub Account認証情報(Access keyと Secret key)を確認します。
    • Sub Account認証情報は、SASL認証設定時にユーザー名(username)とパスワード(password)として使用されます。
    • サブアカウントの Access Keyの発行と確認: サブアカウント作成を参照
  2. Kafkaクライアントのタイプ(Python、Java、Logstash)に合わせてデータ送受信コードに SASL設定を追加します。
    • Logstashの場合、Logstashがサポートする kafka pluginを通じて Data Streamと連携できます。
    • Logstash
      sasl.jaas.configファイルに以下の例のように設定します。
      # username: Sub Account Access Keyを入力
      # password: Sub Account Secret Keyを入力
      
      security_protocol => "SASL_SSL"
      sasl_mechanism => "PLAIN"
      sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
      

データ送信のユースケース

Data Streamサービスは Kafka Clientをサポートします。Kafka Clientライブラリを活用してトピックにデータを送信できます。データ受信のユースケースを紹介します。

Logstash producer

Logstashサンプルコードを使用してデータを送信する方法は次の通りです。

input {
	...
}

output {

  kafka { # streamトピックにデータを保存(書き込み)したい場合、outputに設定

    bootstrap_servers => "{TOPIC_接続_エンドポイント}"  -- TOPIC接続エンドポイント
    topic_id => "{TOPIC_NAME}" -- データを送信する目的地のトピック名

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}

データ受信のユースケース

Data Streamサービスは Kafka Clientをサポートします。Kafka Clientライブラリを活用してトピックにデータを受信できます。データ受信のユースケースを紹介します。

Logstash consumer

Logstashサンプルコードを使用してデータを受信する方法は次の通りです。

input {
  kafka { # streamトピックデータを読み取り(consume)したい場合、inputに設定
    codec => plain

    bootstrap_servers => "{TOPIC_接続_エンドポイント}" -- トピック接続エンドポイント
    topics => ["{TOPIC_NAME}"]  -- データを取得するトピック名
    group_id => "logstash-kafka-consumer" -- コンシューマグループ id (consumer.group.id)

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}

output {
	...
}

Logstash consumer with Schema Registry

スキーマレジストリを連携する場合、Logstashサンプルコードを使用してデータを受信する方法は次の通りです。

  • schema_registry_url: スキーマレジストリアクセス情報を入力
  • schema_registry_key: Sub Account Access Keyを入力
  • schema_registry_secret: Sub Account Secret Keyを入力
input {
  kafka {
    bootstrap_servers => "{TOPIC_接続_エンドポイント}"
    topics => ["{TOPIC_NAME}}"]
    group_id => "simple-schema-consumer"
    auto_offset_reset => "earliest"
    
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"

    # Schema Registry設定(NCP IAM認証情報を使用)
    schema_registry_url => "{SCHEMA_REGISTRY_URL}"
    schema_registry_key => "{ncp_iam_sub_access_key}"
    schema_registry_secret => "{ncp_iam_sub_secret_key}"
    
    # 自動 Avro逆直列化(codec設定不必要)
    key_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
  }
}

filter {
  mutate {
    add_field => { "saved_at" => "%{@timestamp}" }
  }
}

output {
  file {
    path => "/tmp/schema-data-%{+YYYY.MM.dd}.json"
    codec => json_lines
  }

#  stdout {
#    codec => rubydebug
#  }