Available in VPC
This article describes how to produce messages (data) to a topic and consume them from it through the topic's TCP access endpoint, and provides example code.
For security reasons, you cannot use your main account's authentication information when transferring data, so be sure to use the sub account's authentication information.
Preparations (Check authentication information and SASL settings)
To access the Data Stream service using a Kafka client application, SASL settings appropriate to the Kafka client type must be added. To check Sub Account authentication information and configure SASL:
- Check the Sub Account authentication information (Access Key and Secret Key) of the user who will access the Data Stream service.
- Sub Account authentication information is used as your user name and password when setting up SASL authentication.
- For more information about how to issue and check sub account Access Keys, see Create sub account.
- Add SASL settings to the data production/consumption code according to the Kafka client type (Python, Java, or Logstash).
- For Logstash, you can integrate with Data Stream through the Kafka plugin supported by Logstash.
- Logstash
Configure sasl_jaas_config as shown in the following example:
# username: Enter the Sub Account Access Key.
# password: Enter the Sub Account Secret Key.
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
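As a minimal sketch, the JAAS string in the settings above could be assembled from the two Sub Account keys in code rather than typed by hand. The helper name `build_sasl_jaas_config` and the environment variable names `NCP_SUB_ACCESS_KEY` and `NCP_SUB_SECRET_KEY` are hypothetical and not part of the Data Stream service; only the string format is taken from the example.

```python
import os

# Format copied from the Logstash example: Access Key -> username, Secret Key -> password.
JAAS_TEMPLATE = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    "username='{username}' password='{password}';"
)


def build_sasl_jaas_config(access_key: str, secret_key: str) -> str:
    """Return a sasl_jaas_config value for SASL/PLAIN authentication."""
    for name, value in (("access_key", access_key), ("secret_key", secret_key)):
        if not value:
            raise ValueError(f"{name} must not be empty")
        if "'" in value:
            # A single quote would terminate the quoted JAAS value early.
            raise ValueError(f"{name} must not contain a single quote")
    return JAAS_TEMPLATE.format(username=access_key, password=secret_key)


if __name__ == "__main__":
    # Hypothetical variable names; fall back to obvious dummy values.
    print(build_sasl_jaas_config(
        os.environ.get("NCP_SUB_ACCESS_KEY", "example-access-key"),
        os.environ.get("NCP_SUB_SECRET_KEY", "example-secret-key"),
    ))
```

Reading the keys from the environment keeps them out of version-controlled configuration files, which fits the guidance to use Sub Account credentials only.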
Data production example
The Data Stream service supports Kafka clients. You can produce data to a topic by using a Kafka client library. The following is an example of data production.
Logstash producer
The following Logstash example code produces data to a topic:
input {
...
}
output {
kafka { # set output to save (produce) data to the stream topic
bootstrap_servers => "{TOPIC_access_endpoint}" # topic access endpoint
topic_id => "{TOPIC_NAME}" # name of the destination topic to which data is transferred
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
}
}
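For a Python client, the SASL settings used in the Logstash output might map onto keyword arguments like the following. This is a sketch that assumes the third-party kafka-python library, which this guide does not itself name; `build_producer_config` and `send_message` are hypothetical helpers, and the endpoint, topic, and keys are placeholders you must supply.

```python
def build_producer_config(endpoint: str, access_key: str, secret_key: str) -> dict:
    """Map the Logstash output settings onto kafka-python keyword arguments."""
    return {
        "bootstrap_servers": endpoint,      # topic access endpoint
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": access_key,  # Sub Account Access Key
        "sasl_plain_password": secret_key,  # Sub Account Secret Key
    }


def send_message(config: dict, topic: str, payload: bytes) -> None:
    """Send one message and wait until it has been handed to the broker."""
    # Imported lazily so build_producer_config works without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(**config)
    try:
        producer.send(topic, payload)
        producer.flush()  # block until buffered messages are sent
    finally:
        producer.close()
```

Keeping the configuration in a plain dictionary makes it easy to inspect or log (with the password masked) before a connection is attempted.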
Data consumption example
The Data Stream service supports Kafka clients. You can consume data from a topic by using a Kafka client library. The following is an example of data consumption.
Logstash consumer
The following Logstash example code consumes data from a topic:
input {
kafka { # set input to read (consume) data from the stream topic
codec => plain
bootstrap_servers => "{TOPIC_access_endpoint}" # topic access endpoint
topics => ["{TOPIC_NAME}"] # name of the topic from which to retrieve data
group_id => "logstash-kafka-consumer" # consumer group ID (consumer.group.id)
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
}
}
output {
...
}
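The consumer settings above, including the consumer group ID, could be expressed for a Python client roughly as follows. The sketch assumes the third-party kafka-python library, which this guide does not itself name; `build_consumer_config` and `consume` are hypothetical helpers, and the default group ID simply reuses the value from the Logstash example.

```python
def build_consumer_config(endpoint: str, access_key: str, secret_key: str,
                          group_id: str = "logstash-kafka-consumer") -> dict:
    """Map the Logstash input settings onto kafka-python keyword arguments."""
    return {
        "bootstrap_servers": endpoint,      # topic access endpoint
        "group_id": group_id,               # consumer group ID
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": access_key,  # Sub Account Access Key
        "sasl_plain_password": secret_key,  # Sub Account Secret Key
    }


def consume(config: dict, topic: str, max_messages: int = 10) -> None:
    """Print up to max_messages message values, then stop."""
    # Imported lazily so build_consumer_config works without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **config)
    try:
        # Iteration blocks while waiting for new messages.
        for count, record in enumerate(consumer, start=1):
            print(record.value)
            if count >= max_messages:
                break
    finally:
        consumer.close()
```

Consumers that share a group ID split the topic's partitions among themselves, so use a distinct group ID if a client should read the full stream independently of Logstash.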
Logstash consumer with Schema Registry
When you integrate with Schema Registry, set the following options in the Logstash example code to consume data:
- schema_registry_url: Enter the schema registry access information.
- schema_registry_key: Enter Sub Account Access Key.
- schema_registry_secret: Enter Sub Account Secret Key.
input {
kafka {
bootstrap_servers => "{TOPIC_access_endpoint}"
topics => ["{TOPIC_NAME}"]
group_id => "simple-schema-consumer"
auto_offset_reset => "earliest"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
# Schema Registry settings (using NCP IAM credentials)
schema_registry_url => "{SCHEMA_REGISTRY_URL}"
schema_registry_key => "{ncp_iam_sub_access_key}"
schema_registry_secret => "{ncp_iam_sub_secret_key}"
# Automatic Avro deserialization (no codec settings required)
key_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
}
}
filter {
mutate {
add_field => { "saved_at" => "%{@timestamp}" }
}
}
output {
file {
path => "/tmp/schema-data-%{+YYYY.MM.dd}.json"
codec => json_lines
}
# stdout {
# codec => rubydebug
# }
}
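In the Logstash Kafka input plugin, schema_registry_key and schema_registry_secret serve as the user name and password for HTTP basic authorization against the registry. The sketch below shows how the same header could be built by hand for a connectivity check. It assumes the registry accepts HTTP Basic authentication and exposes a Confluent-style `/subjects` path, neither of which is confirmed by this guide; the helper names and the example URL are hypothetical.

```python
import base64
import urllib.request


def basic_auth_header(access_key: str, secret_key: str) -> str:
    """Return an HTTP Basic Authorization header value for the given keys."""
    token = base64.b64encode(f"{access_key}:{secret_key}".encode("utf-8"))
    return "Basic " + token.decode("ascii")


def build_registry_request(registry_url: str, access_key: str, secret_key: str,
                           path: str = "/subjects") -> urllib.request.Request:
    """Build (but do not send) a GET request to the schema registry.

    The default path is an assumption based on Confluent-compatible registries.
    """
    request = urllib.request.Request(registry_url.rstrip("/") + path)
    request.add_header("Authorization", basic_auth_header(access_key, secret_key))
    return request
```

Passing the returned request to `urllib.request.urlopen` would send it; a successful response would suggest that the URL and Sub Account keys are usable before they are placed in the Logstash pipeline.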