Logstash

Prev Next

VPC環境で利用できます。

TCP接続エンドポイントを使用してメッセージ(データ)をトピックに連携する方法とサンプルコードについて説明します。

注意

セキュリティのため、データ送信時にメインアカウントの認証情報を使用することはできませんので、必ずサブアカウントの認証情報を使用してください。

事前準備(認証情報の確認と SASL設定)

Kafkaクライアントアプリケーションを使用して Data Streamサービスにアクセスするには、Kafkaクライアントのタイプに合わせて SASL設定を追加する必要があります。Sub Account認証情報の確認と SASLの設定方法は、次の通りです。

  1. Data Streamサービスに接続するユーザーの Sub Account認証情報(Access keyと Secret key)を確認します。
    • Sub Account認証情報は、SASL認証設定時にユーザー名(username)とパスワード(password)として使用されます。
    • サブアカウント Access Keyの発行と確認: サブアカウント作成を参照
  2. Logstashの場合、Logstashがサポートする kafka pluginを通じて Data Streamと連携できます。sasl.jaas.configファイルに以下の例のように設定します。
# username: Sub Account Access Keyを入力
# password: Sub Account Secret Keyを入力

security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"

データ送信のユースケース

Data Streamサービスは Kafka Clientをサポートします。Kafka Clientライブラリを活用してトピックにデータを送信できます。

Logstash producer

Logstashサンプルコードを使用してデータを送信する方法は次の通りです。

input {
	...
}

output {
  kafka {
    bootstrap_servers => "{TOPIC_接続_エンドポイント}"
    topic_id => "{TOPIC_NAME}"

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}

データ受信のユースケース

Data Streamサービスは Kafka Clientをサポートします。Kafka Clientライブラリを活用してトピックにデータを受信できます。

Logstash consumer

Logstashサンプルコードを使用してデータを受信する方法は次の通りです。

input {
  kafka {
    codec => plain

    bootstrap_servers => "{TOPIC_接続_エンドポイント}"
    topics => ["{TOPIC_NAME}"]
    group_id => "logstash-kafka-consumer"

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}

output {
	...
}

Logstash consumer with Schema Registry

スキーマレジストリを連携する場合、Logstashサンプルコードを使用してデータを受信する方法は次の通りです。

  • schema_registry_url: スキーマレジストリのアクセス情報を入力
  • schema_registry_key: Sub Account Access Keyを入力
  • schema_registry_secret: Sub Account Secret Keyを入力
input {
  kafka {
    bootstrap_servers => "{TOPIC_接続_エンドポイント}"
    topics => ["{TOPIC_NAME}"]
    group_id => "simple-schema-consumer"
    auto_offset_reset => "earliest"

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"

    schema_registry_url => "{SCHEMA_REGISTRY_URL}"
    schema_registry_key => "{ncp_iam_sub_access_key}"
    schema_registry_secret => "{ncp_iam_sub_secret_key}"

    key_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
    value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  }
}

filter {
  mutate {
    add_field => { "saved_at" => "%{@timestamp}" }
  }
}

output {
  file {
    path => "/tmp/schema-data-%{+YYYY.MM.dd}.json"
    codec => json_lines
  }
}