Logstash


Available in VPC

This article describes how to use Logstash to produce messages (data) to a topic and consume them from it through a TCP access endpoint, and provides example code.

Caution

For security reasons, the main account's authentication information cannot be used to transfer data. Be sure to use a Sub Account's authentication information instead.

Preparations (Check authentication information and SASL settings)

To access the Data Stream service using a Kafka client application, SASL settings appropriate to the Kafka client type must be added. To check Sub Account authentication information and configure SASL:

  1. Check the Sub Account authentication information (Access Key and Secret Key) of the user who will access the Data Stream service.
    • Sub Account authentication information is used as your user name and password when setting up SASL authentication.
    • To issue or check a Sub Account Access Key, see Create Sub Account.
  2. Logstash integrates with Data Stream through its Kafka plugin. Add the SASL settings, including sasl_jaas_config, to the plugin configuration as shown in the following example:
# username: Enter Sub Account Access Key.
# password: Enter Sub Account Secret Key.

security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
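
As a minimal sketch, the same settings can read the keys from environment variables instead of embedding them in the pipeline file. Logstash replaces ${VAR} references in plugin settings when it loads the pipeline. The variable names NCP_SUB_ACCESS_KEY and NCP_SUB_SECRET_KEY are hypothetical and are assumed to be exported before Logstash starts.

# NCP_SUB_ACCESS_KEY: hypothetical environment variable holding the Sub Account Access Key.
# NCP_SUB_SECRET_KEY: hypothetical environment variable holding the Sub Account Secret Key.

security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${NCP_SUB_ACCESS_KEY}' password='${NCP_SUB_SECRET_KEY}';"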

Data production examples

The Data Stream service supports Kafka clients. You can produce data to a topic using a Kafka client library.

Logstash producer

The following example code produces data with Logstash:

input {
	...
}

output {
  kafka {
    bootstrap_servers => "{TOPIC_access_endpoint}"
    topic_id => "{TOPIC_NAME}"

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}
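
For a quick end-to-end check, the producer pipeline can be completed as in the sketch below. The stdin input and the json codec are assumptions chosen for illustration, not part of the service requirements. Each line typed into the console becomes one event, which is sent to the topic as a JSON document.

input {
  # Assumption: read test events from standard input, one event per line.
  stdin { }
}

output {
  kafka {
    bootstrap_servers => "{TOPIC_access_endpoint}"
    topic_id => "{TOPIC_NAME}"

    # Assumption: serialize the whole event as JSON. Without this setting, the plugin uses the plain codec.
    codec => json

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}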

Data consumption examples

The Data Stream service supports Kafka clients. You can consume data from a topic using a Kafka client library.

Logstash consumer

The following example code consumes data with Logstash:

input {
  kafka {
    codec => plain

    bootstrap_servers => "{TOPIC_access_endpoint}"
    topics => ["{TOPIC_NAME}"]
    group_id => "logstash-kafka-consumer"

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}

output {
	...
}
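
To confirm that messages arrive, the consumer pipeline can be completed as in the sketch below. The stdout output with the rubydebug codec and the auto_offset_reset value are assumptions chosen for illustration. With "earliest", a consumer group that has no committed offset starts from the oldest available message.

input {
  kafka {
    codec => plain

    bootstrap_servers => "{TOPIC_access_endpoint}"
    topics => ["{TOPIC_NAME}"]
    group_id => "logstash-kafka-consumer"

    # Assumption: start from the oldest message when the group has no committed offset.
    auto_offset_reset => "earliest"

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}

output {
  # Assumption: print each consumed event to the console for inspection.
  stdout {
    codec => rubydebug
  }
}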

Logstash consumer with Schema Registry

The following example code consumes data with Logstash when the topic is integrated with a schema registry:

  • schema_registry_url: Enter the schema registry access information (URL)
  • schema_registry_key: Enter Sub Account Access Key
  • schema_registry_secret: Enter Sub Account Secret Key
input {
  kafka {
    bootstrap_servers => "{TOPIC_access_endpoint}"
    topics => ["{TOPIC_NAME}"]
    group_id => "simple-schema-consumer"
    auto_offset_reset => "earliest"

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"

    schema_registry_url => "{SCHEMA_REGISTRY_URL}"
    schema_registry_key => "{ncp_iam_sub_access_key}"
    schema_registry_secret => "{ncp_iam_sub_secret_key}"

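    key_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
    # value_deserializer_class is intentionally not set: the Kafka input plugin does not allow a custom
    # value deserializer together with schema_registry_url and handles Avro decoding itself.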
  }
}

filter {
  mutate {
    add_field => { "saved_at" => "%{@timestamp}" }
  }
}

output {
  file {
    path => "/tmp/schema-data-%{+YYYY.MM.dd}.json"
    codec => json_lines
  }
}