Logstash


Available in VPC

This article describes how to produce and consume messages (data) to and from a topic using a TCP access endpoint, and provides example code.

Caution

For security reasons, you cannot use your main account's authentication information when transferring data; be sure to use a sub account's authentication information instead.

Preparations (Check authentication information and SASL settings)

To access the Data Stream service with a Kafka client application, you must add the SASL settings appropriate for your Kafka client type. To check sub account authentication information and configure SASL:

  1. Check the Sub Account authentication information (Access Key and Secret Key) of the user who will access the Data Stream service.
    • Sub Account authentication information is used as your user name and password when setting up SASL authentication.
    • For more information about how to issue and check sub account Access Keys, see Create sub account.
  2. Add SASL settings to the data production/consumption code according to the Kafka client type (Python, Java, or Logstash).
    • For Logstash, you can integrate with Data Stream through the Kafka plugin supported by Logstash.
    • Logstash
      Add the SASL settings to the Kafka plugin section of your Logstash pipeline configuration, as shown in the following example:
      # username: Enter Sub Account Access Key.
      # password: Enter Sub Account Secret Key.
      
      security_protocol => "SASL_SSL"
      sasl_mechanism => "PLAIN"
      sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
      

Data production example

The Data Stream service supports Kafka Client. You can produce data to a topic by utilizing the Kafka Client library. The following is an example of data production.

Logstash producer

To produce data with Logstash, use the following example code:

input {
	...
}

output {

  kafka { # to save (produce) data to a stream topic, configure the output

    bootstrap_servers => "{TOPIC_access_endpoint}"  # topic access endpoint
    topic_id => "{TOPIC_NAME}" # name of the destination topic to which data will be transferred

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}
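The input block in the producer example above is elided. As a minimal sketch for local testing (this stdin input is an assumption, not part of the original example), each line typed into the terminal becomes one message produced to the topic:

```
input {
  stdin {
    codec => plain  # each line read from stdin becomes one event
  }
}
```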

Data consumption example

The Data Stream service supports Kafka Client. You can consume data from a topic by utilizing the Kafka Client library. The following is an example of data consumption.

Logstash consumer

To consume data with Logstash, use the following example code:

input {
  kafka { # to read (consume) stream topic data, configure the input
    codec => plain

    bootstrap_servers => "{TOPIC_access_endpoint}" # topic access endpoint
    topics => ["{TOPIC_NAME}"]  # name of the topic from which to retrieve data
    group_id => "logstash-kafka-consumer" # consumer group ID (consumer.group.id)

    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
  }
}

output {
	...
}
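The output block in the consumer example above is elided. A common debugging sketch (an assumption, not part of the original example) prints each consumed event to the console:

```
output {
  stdout {
    codec => rubydebug  # pretty-print each consumed event for inspection
  }
}
```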

Logstash consumer with Schema Registry

To consume data with Logstash when integrating with a schema registry, use the following example code:

  • schema_registry_url: Enter the schema registry access information.
  • schema_registry_key: Enter Sub Account Access Key.
  • schema_registry_secret: Enter Sub Account Secret Key.
input {
  kafka {
    bootstrap_servers => "{TOPIC_access_endpoint}"
    topics => ["{TOPIC_NAME}"]
    group_id => "simple-schema-consumer"
    auto_offset_reset => "earliest"
    
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"

    # Schema Registry settings (using NCP IAM credentials)
    schema_registry_url => "{SCHEMA_REGISTRY_URL}"
    schema_registry_key => "{ncp_iam_sub_access_key}"
    schema_registry_secret => "{ncp_iam_sub_secret_key}"
    
    # Values are deserialized automatically as Avro via the Schema Registry (no codec setting required)
    key_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
  }
}

filter {
  mutate {
    add_field => { "saved_at" => "%{@timestamp}" }
  }
}

output {
  file {
    path => "/tmp/schema-data-%{+YYYY.MM.dd}.json"
    codec => json_lines
  }

#  stdout {
#    codec => rubydebug
#  }
}
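Assuming the pipeline above is saved as consumer.conf (the filename is an assumption), you can validate and run it with the Logstash CLI:

```
# validate the configuration without starting the pipeline
bin/logstash -f consumer.conf --config.test_and_exit

# run the pipeline
bin/logstash -f consumer.conf
```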