Available in VPC
This article describes how to integrate messages (data) with a topic using a TCP access endpoint, and provides example code.
For security reasons, you cannot use your main account's authentication information when transferring data, so be sure to use the sub account's authentication information.
Preparations (Check authentication information and SASL settings)
To access the Data Stream service using a Kafka client application, SASL settings appropriate to the Kafka client type must be added. To check Sub Account authentication information and configure SASL:
- Check the Sub Account authentication information (Access Key and Secret Key) of the user who will access the Data Stream service.
- Sub Account authentication information is used as your user name and password when setting up SASL authentication.
- Issuance and verification of the sub account Access Key: See Create Sub Account
- For Logstash, you can integrate with Data Stream through the Kafka plugin supported by Logstash. Configure the
sasl.jaas.configfile as shown in the following example:
# username: Enter Sub Account Access Key.
# password: Enter Sub Account Secret Key.
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
Data production examples
The Data Stream service supports Kafka Client. You can produce data into a topic by utilizing the Kafka Client library.
Logstash producer
To produce data using Logstash example code:
input {
...
}
output {
kafka {
bootstrap_servers => "{TOPIC_access_endpoint}"
topic_id => "{TOPIC_NAME}"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
}
}
Data consumption examples
The Data Stream service supports Kafka Client. You can consume data into a topic by utilizing the Kafka Client library.
Logstash consumer
To consume data using Logstash example code:
input {
kafka {
codec => plain
bootstrap_servers => "{TOPIC_access_endpoint}"
topics => ["{TOPIC_NAME}"]
group_id => "logstash-kafka-consumer"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
}
}
output {
...
}
Logstash consumer with Schema Registry
To consume data using Logstash example code when you integrate with schema registry:
schema_registry_url: Enter the schema registry to access informationschema_registry_key: Enter Sub Account Access Keyschema_registry_secret: Enter Sub Account Secret Key
input {
kafka {
bootstrap_servers => "{TOPIC_access_endpoint}"
topics => ["{TOPIC_NAME}"]
group_id => "simple-schema-consumer"
auto_offset_reset => "earliest"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='{ncp_iam_sub_access_key}' password='{ncp_iam_sub_secret_key}';"
schema_registry_url => "{SCHEMA_REGISTRY_URL}"
schema_registry_key => "{ncp_iam_sub_access_key}"
schema_registry_secret => "{ncp_iam_sub_secret_key}"
key_deserializer_class => "org.apache.kafka.common.serialization.StringDeserializer"
value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
}
}
filter {
mutate {
add_field => { "saved_at" => "%{@timestamp}" }
}
}
output {
file {
path => "/tmp/schema-data-%{+YYYY.MM.dd}.json"
codec => json_lines
}
}