Available in VPC
This article describes how to produce messages (data) in a topic using a TCP access endpoint, and provides example code.
For security reasons, the main account's authentication information cannot be used when transferring data; be sure to use the sub account's authentication information instead.
Preparations (Check authentication information and SASL settings)
To access the Data Stream service using a Kafka client application, SASL settings appropriate to the Kafka client type must be added. To check Sub Account authentication information and configure SASL:
- Check the Sub Account authentication information (Access Key and Secret Key) of the user who will access the Data Stream service.
- Sub Account authentication information is used as your user name and password when setting up SASL authentication.
- For more information about how to issue and check sub account Access Keys, see Create sub account.
- Add SASL settings to the data production/consumption code according to the Kafka client type (Python, Java, or Logstash).
- Python, Java
# username: Enter Sub Account Access Key. # password: Enter Sub Account Secret Key. security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
- Python, Java
Data production example
The Data Stream service supports Kafka Client. You can produce data into a topic by utilizing the Kafka Client library. The following is an example of data production.
Java producer
To produce data using Java example code that applies the Sub Account and SASL authentication information verified during the preparation stage:
package com.ncp.flowlog.collector.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
public class TestingDataStreamProducerSample {

    // Check the topic access endpoint in the console.
    public static final String DATA_STREAM_SERVER = "{TOPIC_access_endpoint}";
    public static final String TOPIC = "{TOPIC_NAME}";

    /**
     * Produces 100 sample messages to {@link #TOPIC} over SASL_SSL/PLAIN using
     * Sub Account credentials, flushing every 10 records so progress is visible.
     */
    public static void main(String[] args) {
        String username = "{ncp_iam_SubAccountAccessKey}";
        String password = "{ncp_iam_SubAccountSecretKey}";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "local_heavy_ip_info_producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

        // SASL settings: username/password are the Sub Account Access Key / Secret Key.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                        "username=\"" + username + "\" " +
                        "password=\"" + password + "\";");

        // try-with-resources guarantees the producer is closed and its buffered
        // records are drained (the original code leaked the producer).
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            String key = "sample-message-key";
            String value = "message-value";
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>(TOPIC, key, value), ((recordMetadata, e) ->
                        System.out.println("> meta: " + recordMetadata.toString() + ", e: " + e)));
                if (i % 10 == 0) {
                    System.out.println("> flush ...");
                    producer.flush();
                    Thread.sleep(100);
                }
            }
            producer.flush();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println(e.getMessage());
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    }
}
Java producer with Schema Registry
To produce data using Java example code when you integrate with schema registry:
- SCHEMA_REGISTRY_URL: Enter the schema registry access information.
- DataStreamProducerWithSchemaRegistry
package com.naverncp.datastream.sample.kafka;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
/**
* A producer that directly uses UserSpecific implementing SpecificRecord
*
* Schemas to use:
* - id: int
* - name: string
* - email: string
* - age: nullable int
*/
public class TestingDataStreamProducerSpecific {

    public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
    public static final String TOPIC = "{TOPIC_NAME}";

    // NCP IAM credentials
    public static final String username = "{NCP_IAM_ACCESS_KEY}";
    public static final String password = "{NCP_IAM_SECRET_KEY}";

    // Schema Registry settings (fixed: the placeholder was missing its closing brace).
    public static final String SCHEMA_REGISTRY_URL = "{SCHEMA_REGISTRY_URL}";

    /**
     * Produces three sample {@code UserSpecific} Avro records synchronously,
     * registering the schema with the Schema Registry via the Avro serializer.
     */
    public static void main(String[] args) {
        Properties props = new Properties();

        // Basic Kafka Producer settings
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "specific-record-producer");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);

        // SASL security settings: IAM Access/Secret Key as username/password.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                        "username=\"" + username + "\" " +
                        "password=\"" + password + "\";");

        // Schema Registry settings
        props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true);
        props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());
        props.put(KafkaAvroSerializerConfig.USE_LATEST_VERSION, true); // use the latest schema

        // Schema Registry credentials (same IAM key pair, sent as HTTP basic auth).
        props.put("schema.registry.basic.auth.user.info", username + ":" + password);
        props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");

        Producer<String, UserSpecific> producer = new KafkaProducer<>(props);
        System.out.println("SpecificRecord Schema Registry Producer started. Topic: " + TOPIC);
        System.out.println("directly use UserSpecific VO:");
        System.out.println(" - Avro direct serialization with SpecificRecord implementation");
        System.out.println(" - the schema is embedded in the class (no conversion process required)");
        System.out.println("------------------------------------------------------------");

        try {
            // Create and transfer sample user data
            String[] sampleNames = {"John Doe", "Jane Smith", "Bob Johnson"};
            String[] sampleEmails = {"john@example.com", "jane@example.com", "bob@example.com"};
            Integer[] sampleAges = {30, 25, null}; // null is an optional field

            for (int i = 0; i < sampleNames.length; i++) {
                try {
                    // Create UserSpecific object (no conversion process!)
                    UserSpecific user = new UserSpecific(
                            i + 1,           // id
                            sampleNames[i],  // name
                            sampleEmails[i], // email
                            sampleAges[i]    // age (nullable)
                    );
                    String messageKey = "user-" + (i + 1);

                    // Directly transfer UserSpecific object
                    ProducerRecord<String, UserSpecific> record = new ProducerRecord<>(
                            TOPIC,
                            messageKey,
                            user // directly use UserSpecific VO
                    );

                    // Synchronous transfer (view results)
                    RecordMetadata metadata = producer.send(record).get();
                    System.out.printf("[%d/%d] message transfer succeeded:%n", i + 1, sampleNames.length);
                    System.out.printf(" UserSpecific (direct transfer): %s%n", user);
                    System.out.printf(" schema: %s%n", user.getSchema().getName());
                    System.out.printf(" Topic: %s%n", metadata.topic());
                    System.out.printf(" Partition: %d%n", metadata.partition());
                    System.out.printf(" Offset: %d%n", metadata.offset());
                    System.out.println("--------------------------------------------------");

                    // Interval between messages
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop producing instead of looping on.
                    Thread.currentThread().interrupt();
                    System.err.println("message transfer error: " + ExceptionUtils.getRootCauseMessage(e));
                    break;
                } catch (ExecutionException e) {
                    System.err.println("message transfer error: " + ExceptionUtils.getRootCauseMessage(e));
                }
            }
            System.out.println("> all messages have been transferred!");
        } catch (Exception e) {
            System.err.println("Producer error: " + ExceptionUtils.getRootCauseMessage(e));
        } finally {
            try {
                producer.flush();
                producer.close();
                System.out.println("producer ended successfully");
            } catch (Exception e) {
                System.err.println("error while ending producer: " + e.getMessage());
            }
        }
    }
}
- UserSpecific.java
package com.naverncp.datastream.sample.kafka;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
/**
* User class that implements SpecificRecord
* User object where Avro direct serialization is available
*/
/**
 * User value object implementing {@link SpecificRecord} so it can be serialized
 * by the Avro serializer directly, without a generated class.
 */
public class UserSpecific extends SpecificRecordBase implements SpecificRecord {

    private int id;
    private String name;
    private String email;
    private Integer age; // nullable

    // Define Avro schema (static; parsed once per class load)
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{"
                    + "\"type\": \"record\","
                    + "\"name\": \"UserSpecific\","
                    + "\"namespace\": \"com.naverncp.datastream.sample.kafka\","
                    + "\"fields\": ["
                    + " {\"name\": \"id\", \"type\": \"int\"},"
                    + " {\"name\": \"name\", \"type\": \"string\"},"
                    + " {\"name\": \"email\", \"type\": \"string\"},"
                    + " {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}"
                    + "]"
                    + "}"
    );

    // Default constructor (required by Avro for instantiation during decoding)
    public UserSpecific() {}

    // Constructor with all fields
    public UserSpecific(int id, String name, String email, Integer age) {
        this.id = id;
        this.name = name;
        this.email = email;
        this.age = age;
    }

    // Methods that implement SpecificRecord
    @Override
    public Schema getSchema() {
        return SCHEMA;
    }

    @Override
    public Object get(int field) {
        switch (field) {
            case 0: return id;
            case 1: return name;
            case 2: return email;
            case 3: return age;
            default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
        }
    }

    @Override
    public void put(int field, Object value) {
        switch (field) {
            case 0: this.id = (Integer) value; break;
            // Avro decodes "string" fields as org.apache.avro.util.Utf8, not
            // java.lang.String, so a direct (String) cast would throw
            // ClassCastException on deserialization — convert via toString().
            case 1: this.name = value == null ? null : value.toString(); break;
            case 2: this.email = value == null ? null : value.toString(); break;
            case 3: this.age = (Integer) value; break;
            default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
        }
    }

    // Getter/Setter
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "UserSpecific{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", email='" + email + '\'' +
                ", age=" + age +
                '}';
    }
}
Data consumption example
The Data Stream service supports Kafka Client. You can consume data from a topic by utilizing the Kafka Client library. The following is an example of data consumption.
Java consumer
To consume data using Java example code that applies the Sub Account and SASL authentication information verified during the preparation stage:
package com.ncp.flowlog.collector.kafka;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class TestingDataStreamConsumerSample {

    public static final String DATA_STREAM_SERVER = "{TOPIC_access_endpoint}";
    public static final String TOPIC = "{TOPIC_NAME}";
    public static final String CONSUMER_GROUP_ID = "consumer-for-test";
    public static final String username = "{ncp_iam_SubAccountAccessKey}";
    public static final String password = "{ncp_iam_SubAccountSecretKey}";

    /**
     * Polls {@link #TOPIC} forever, printing each record's key/value and
     * committing offsets synchronously after every poll.
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // SASL settings: username/password are the Sub Account Access Key / Secret Key.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                        "username=\"" + username + "\" " +
                        "password=\"" + password + "\";");

        // try-with-resources ensures the consumer (and its group membership) is
        // cleaned up when the loop exits via an exception (the original leaked it).
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(10));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    try {
                        System.out.printf("key=%s, value=%s\n", record.key(), record.value());
                    } catch (Exception e) {
                        System.err.println(ExceptionUtils.getRootCauseMessage(e));
                    }
                }
                consumer.commitSync();
                try {
                    System.out.println("consume.....sleep");
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag before propagating.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
Java consumer with Schema Registry
To consume data using Java example code when you integrate with schema registry:
- SCHEMA_REGISTRY_URL: Enter the schema registry access information.
package com.naverncp.datastream.sample.kafka;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
/**
* Schema Registry integration Avro Consumer
*
* Expected schema:
* - id: int
* - name: string
* - email: string
* - age: nullable int
*/
public class TestingDataStreamConsumerSchemaRegistry {

    public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
    public static final String TOPIC = "sample-schematopic";
    public static final String CONSUMER_GROUP_ID = "schema-registry-consumer";

    // NCP IAM credentials
    public static final String username = "{NCP_IAM_ACCESS_KEY}";
    public static final String password = "{NCP_IAM_SECRET_KEY}";

    // NCP Schema Registry URL
    public static final String SCHEMA_REGISTRY_URL = "https://schema-registry.datastream.naverncp.com";

    /**
     * Polls {@link #TOPIC} up to 10 times, deserializing Avro values through the
     * Schema Registry (as GenericRecord, since specific-reader mode is off) and
     * committing offsets after each non-empty batch.
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // SASL security settings: IAM Access/Secret Key as username/password.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                        "username=\"" + username + "\" " +
                        "password=\"" + password + "\";");

        // Schema Registry settings; false => values come back as GenericRecord.
        props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
        props.put("schema.registry.basic.auth.user.info", username + ":" + password);
        props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");

        // try-with-resources replaces the manual finally/close of the original.
        try (Consumer<String, Object> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(TOPIC));
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, Object> record : records) {
                    System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
                }
                // Only commit when something was actually consumed.
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } catch (Exception e) {
            // Avoid printStackTrace(); report the failure on stderr instead.
            System.err.println("Consumer error: " + e);
        }
    }
}