VPC環境で利用できます。
TCP接続エンドポイントを使用してメッセージ(データ)をトピックに送信する方法およびサンプルコードについて説明します。
注意
セキュリティのため、データ送信時にメインアカウントの認証情報を使用することはできませんので、必ずサブアカウントの認証情報を使用してください。
事前準備(認証情報の確認と SASL設定)
Kafkaクライアントアプリケーションを使用して Data Streamサービスにアクセスするには、Kafkaクライアントのタイプに合わせて SASL設定を追加する必要があります。Sub Account認証情報の確認と SASLの設定方法は、次の通りです。
- Data Streamサービスに接続するユーザーの Sub Account認証情報(Access keyと Secret key)を確認します。
- Sub Account認証情報は、SASL認証設定時にユーザー名(username)とパスワード(password)として使用されます。
- サブアカウントの Access Keyの発行と確認: サブアカウント作成を参照
- Kafkaクライアントのタイプ(Python、Java、Logstash)に合わせてデータ送受信コードに SASL設定を追加します。
- Python, Java
# username: Sub Account Access Keyを入力 # password: Sub Account Secret Keyを入力 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
- Python, Java
データ送信のユースケース
Data Streamサービスは Kafka Clientをサポートします。Kafka Clientライブラリを活用してトピックにデータを送信できます。データ送信のユースケースを紹介します。
Java producer
事前準備で確認した Sub Accountと SASL認証情報を適用した Javaサンプルコードを使用してデータを送信する方法は、次の通りです。
package com.ncp.flowlog.collector.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
public class TestingDataStreamProducerSample {
// コンソールで Topic接続エンドポイントを確認
public static final String DATA_STREAM_SERVER = "{TOPIC_アクセス_エンドポイント}";
public static final String TOPIC = "{TOPIC_NAME}";
public static void main(String[] args) {
String username = "{ncp_iam_SubAccountAccessKey}";
String password = "{ncp_iam_SubAccountSecretKey}";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "local_heavy_ip_info_producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";");
try {
Producer<String, String> producer = new KafkaProducer<>(props);
String key ="sample-message-key";
String value = "message-value";
for(int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>(TOPIC, key, value), ((recordMetadata, e) ->
System.out.println("> meta: " + recordMetadata.toString() + ", e: " + e)));
if (i%10==0){
System.out.println("> flush ...");
producer.flush();
Thread.sleep(100);
}
}
producer.flush();
} catch (Exception e) {
System.err.println(e.getMessage());
}
}
}
Java producer with Schema Registry
スキーマレジストリを連携する場合、Javaサンプルコードを使用してデータを送信する方法は次の通りです。
- SCHEMA_REGISTRY_URL: スキーマレジストリアクセス情報を入力
- DataStreamConsumerWithSchemaRegistry
package com.naverncp.datastream.sample.kafka;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
/**
* SpecificRecordを実装した UserSpecificを直接使用する Producer
*
* 使用スキーマ:
* - id: int
* - name: string
* - email: string
* - age: nullable int
*/
public class TestingDataStreamProducerSpecific {
public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
public static final String TOPIC = "{TOPIC_NAME}";
// NCP IAM認証情報
public static final String username = "{NCP_IAM_ACCESS_KEY}";
public static final String password = "{NCP_IAM_SECRET_KEY}";
// Schema Registry設定
public static final String SCHEMA_REGISTRY_URL = "{SCHEMA_REGISTRY_URL";
public static void main(String[] args) {
Properties props = new Properties();
// Kafka Producer基本設定
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "specific-record-producer");
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
// SASLセキュリティ設定
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";");
// Schema Registry設定
props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true);
props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());
props.put(KafkaAvroSerializerConfig.USE_LATEST_VERSION, true); // 最新スキーマ使用
// Schema Registry認証
props.put("schema.registry.basic.auth.user.info", username + ":" + password);
props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");
Producer<String, UserSpecific> producer = new KafkaProducer<>(props);
System.out.println("SpecificRecord Schema Registry Producerが開始される。Topic: " + TOPIC);
System.out.println(" UserSpecific VO直接使用:");
System.out.println(" - SpecificRecord実装で Avroを直接直列化");
System.out.println(" - スキーマクラスに組み込まれる(変換プロセス不必要)");
System.out.println("------------------------------------------------------------");
try {
// サンプルユーザーデータの作成と送信
String[] sampleNames = {"John Doe", "Jane Smith", "Bob Johnson"};
String[] sampleEmails = {"john@example.com", "jane@example.com", "bob@example.com"};
Integer[] sampleAges = {30, 25, null}; // nullは選択的フィールド
for (int i = 0; i < sampleNames.length; i++) {
try {
// UserSpecificオブジェクト作成(変換プロセスなし!)
UserSpecific user = new UserSpecific(
i + 1, // id
sampleNames[i], // name
sampleEmails[i], // email
sampleAges[i] // age (nullable)
);
String messageKey = "user-" + (i + 1);
// UserSpecificオブジェクトを直接送信
ProducerRecord<String, UserSpecific> record = new ProducerRecord<>(
TOPIC,
messageKey,
user // UserSpecific VO直接使用
);
// 同期転送(結果確認)
RecordMetadata metadata = producer.send(record).get();
System.out.printf("[%d/%d]メッセージ送信成功:%n", i + 1, sampleNames.length);
System.out.printf(" UserSpecific (直接送信): %s%n", user);
System.out.printf(" スキーマ: %s%n", user.getSchema().getName());
System.out.printf(" Topic: %s%n", metadata.topic());
System.out.printf(" Partition: %d%n", metadata.partition());
System.out.printf(" Offset: %d%n", metadata.offset());
System.out.println("--------------------------------------------------");
// メッセージ間の間隔
Thread.sleep(100);
} catch (ExecutionException | InterruptedException e) {
System.err.println("メッセージ送信エラー: " + ExceptionUtils.getRootCauseMessage(e));
}
}
System.out.println("> すべてのメッセージを送信完了!");
} catch (Exception e) {
System.err.println("Producerエラー: " + ExceptionUtils.getRootCauseMessage(e));
} finally {
try {
producer.flush();
producer.close();
System.out.println("Producer終了完了");
} catch (Exception e) {
System.err.println("Producer終了中エラー: " + e.getMessage());
}
}
}
}
- User.java
package com.naverncp.datastream.sample.kafka;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
/**
* SpecificRecordを実装する Userクラス
* Avro直接直列化が可能な Userオブジェクト
*/
public class UserSpecific extends SpecificRecordBase implements SpecificRecord {
private int id;
private String name;
private String email;
private Integer age; // nullable
// Avroスキーマ定義(static)
private static final Schema SCHEMA = new Schema.Parser().parse(
"{"
+ "\"type\": \"record\","
+ "\"name\": \"UserSpecific\","
+ "\"namespace\": \"com.naverncp.datastream.sample.kafka\","
+ "\"fields\": ["
+ " {\"name\": \"id\", \"type\": \"int\"},"
+ " {\"name\": \"name\", \"type\": \"string\"},"
+ " {\"name\": \"email\", \"type\": \"string\"},"
+ " {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}"
+ "]"
+ "}"
);
// デフォルト作成者
public UserSpecific() {}
// 全体フィールド作成者
public UserSpecific(int id, String name, String email, Integer age) {
this.id = id;
this.name = name;
this.email = email;
this.age = age;
}
// SpecificRecord実装メソッド
@Override
public Schema getSchema() {
return SCHEMA;
}
@Override
public Object get(int field) {
switch (field) {
case 0: return id;
case 1: return name;
case 2: return email;
case 3: return age;
default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
}
}
@Override
public void put(int field, Object value) {
switch (field) {
case 0: this.id = (Integer) value; break;
case 1: this.name = (String) value; break;
case 2: this.email = (String) value; break;
case 3: this.age = (Integer) value; break;
default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
}
}
// Getter/Setter
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "UserSpecific{" +
"id=" + id +
", name='" + name + '\'' +
", email='" + email + '\'' +
", age=" + age +
'}';
}
}
データ受信のユースケース
Data Streamサービスは Kafka Clientをサポートします。Kafka Clientライブラリを活用してトピックにデータを受信できます。データ受信のユースケースを紹介します。
Java consumer
事前準備で確認した Sub Accountと SASL認証情報を適用した Javaサンプルコードを使用してデータを受信する方法は、次の通りです。
package com.ncp.flowlog.collector.kafka;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class TestingDataStreamConsumerSample {
public static final String DATA_STREAM_SERVER = "{TOPIC_アクセス_エンドポイント}";
public static final String TOPIC = "{TOPIC_NAME}";
public static final String CONSUMER_GROUP_ID = "consumer-for-test";
public static final String username = "{ncp_iam_SubAccountAccessKey}";
public static final String password = "{ncp_iam_SubAccountSecretKey}";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(10));
for(ConsumerRecord<String, String> record : consumerRecords) {
try {
System.out.printf("key=%s, value=%s\n", record.key(), record.value());
} catch (Exception e) {
System.err.println(ExceptionUtils.getRootCauseMessage(e));
}
}
consumer.commitSync();
try {
System.out.println("consume.....sleep");
Thread.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
Java consumer with Schema Registry
スキーマレジストリを連携する場合、Javaサンプルコードを使用してデータを送信する方法は次の通りです。
- SCHEMA_REGISTRY_URL: スキーマレジストリアクセス情報を入力
package com.naverncp.datastream.sample.kafka;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
/**
* Schema Registry連携 Avro Consumer
*
* 予想スキーマ:
* - id: int
* - name: string
* - email: string
* - age: nullable int
*/
public class TestingDataStreamConsumerSchemaRegistry {
public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
public static final String TOPIC = "sample-schematopic";
public static final String CONSUMER_GROUP_ID = "schema-registry-consumer";
// NCP IAM認証情報
public static final String username = "{NCP_IAM_ACCESS_KEY}";
public static final String password = "{NCP_IAM_SECRET_KEY}";
// NCP Schema Registry URL
public static final String SCHEMA_REGISTRY_URL = "https://schema-registry.datastream.naverncp.com";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";");
props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
props.put("schema.registry.basic.auth.user.info", username + ":" + password);
props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");
Consumer<String, Object> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));
try {
for (int i = 0; i < 10; i++) {
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, Object> record : records) {
System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
}
if (!records.isEmpty()) {
consumer.commitSync();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}