Java

Prev Next

VPC環境で利用できます。

TCP接続エンドポイントを使用してメッセージ(データ)をトピックに送信する方法およびサンプルコードについて説明します。

注意

セキュリティのため、データ送信時にメインアカウントの認証情報を使用することはできませんので、必ずサブアカウントの認証情報を使用してください。

事前準備(認証情報の確認と SASL設定)

Kafkaクライアントアプリケーションを使用して Data Streamサービスにアクセスするには、Kafkaクライアントのタイプに合わせて SASL設定を追加する必要があります。Sub Account認証情報の確認と SASLの設定方法は、次の通りです。

  1. Data Streamサービスに接続するユーザーの Sub Account認証情報(Access keyと Secret key)を確認します。
    • Sub Account認証情報は、SASL認証設定時にユーザー名(username)とパスワード(password)として使用されます。
    • サブアカウントの Access Keyの発行と確認: サブアカウント作成を参照
  2. Kafkaクライアントのタイプ(Python、Java、Logstash)に合わせてデータ送受信コードに SASL設定を追加します。
    • Python, Java
      # username: Sub Account Access Keyを入力
      # password: Sub Account Secret Keyを入力
      
      security.protocol=SASL_SSL
      sasl.mechanism=PLAIN
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
      

データ送信のユースケース

Data Streamサービスは Kafka Clientをサポートします。Kafka Clientライブラリを活用してトピックにデータを送信できます。データ送信のユースケースを紹介します。

Java producer

事前準備で確認した Sub Accountと SASL認証情報を適用した Javaサンプルコードを使用してデータを送信する方法は、次の通りです。

package com.ncp.flowlog.collector.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestingDataStreamProducerSample {

    // コンソールで Topic接続エンドポイントを確認
	public static final String DATA_STREAM_SERVER = "{TOPIC_アクセス_エンドポイント}";
	public static final String TOPIC = "{TOPIC_NAME}";

	public static void main(String[] args) {
		String username = "{ncp_iam_SubAccountAccessKey}";
		String password = "{ncp_iam_SubAccountSecretKey}";

		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "local_heavy_ip_info_producer");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");

		try {
			Producer<String, String> producer = new KafkaProducer<>(props);

			String key ="sample-message-key";
			String value = "message-value";

			for(int i = 0; i < 100; i++) {
				producer.send(new ProducerRecord<>(TOPIC, key, value), ((recordMetadata, e) ->
					System.out.println("> meta: " + recordMetadata.toString() + ", e: " + e)));

				if (i%10==0){
					System.out.println("> flush ...");
					producer.flush();
					Thread.sleep(100);
				}
			}
			producer.flush();
		} catch (Exception e) {
			System.err.println(e.getMessage());
		}
	}
}

Java producer with Schema Registry

スキーマレジストリを連携する場合、Javaサンプルコードを使用してデータを送信する方法は次の通りです。

  • SCHEMA_REGISTRY_URL: スキーマレジストリアクセス情報を入力
  • DataStreamConsumerWithSchemaRegistry
package com.naverncp.datastream.sample.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;

/**
 * SpecificRecordを実装した UserSpecificを直接使用する Producer
 * 
 * 使用スキーマ:
 * - id: int
 * - name: string  
 * - email: string
 * - age: nullable int
 */
public class TestingDataStreamProducerSpecific {
	public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
	public static final String TOPIC = "{TOPIC_NAME}";
	// NCP IAM認証情報
	public static final String username = "{NCP_IAM_ACCESS_KEY}";
	public static final String password = "{NCP_IAM_SECRET_KEY}";
	// Schema Registry設定
	public static final String SCHEMA_REGISTRY_URL = "{SCHEMA_REGISTRY_URL";

	public static void main(String[] args) {
		Properties props = new Properties();
		
		// Kafka Producer基本設定
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "specific-record-producer");
		props.put(ProducerConfig.RETRIES_CONFIG, 5);
		props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
		props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
		props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
		
		// SASLセキュリティ設定
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");

		// Schema Registry設定
		props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
		props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true);
		props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());
		props.put(KafkaAvroSerializerConfig.USE_LATEST_VERSION, true); // 最新スキーマ使用		
		// Schema Registry認証
		props.put("schema.registry.basic.auth.user.info", username + ":" + password);
		props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");

		Producer<String, UserSpecific> producer = new KafkaProducer<>(props);

		System.out.println("SpecificRecord Schema Registry Producerが開始される。Topic: " + TOPIC);
		System.out.println(" UserSpecific VO直接使用:");
		System.out.println("   - SpecificRecord実装で Avroを直接直列化");
		System.out.println("   - スキーマクラスに組み込まれる(変換プロセス不必要)");
		System.out.println("------------------------------------------------------------");

		try {
			// サンプルユーザーデータの作成と送信
			String[] sampleNames = {"John Doe", "Jane Smith", "Bob Johnson"};
			String[] sampleEmails = {"john@example.com", "jane@example.com", "bob@example.com"};
			Integer[] sampleAges = {30, 25, null}; // nullは選択的フィールド

			for (int i = 0; i < sampleNames.length; i++) {
				try {
					// UserSpecificオブジェクト作成(変換プロセスなし!)
					UserSpecific user = new UserSpecific(
						i + 1,                // id
						sampleNames[i],       // name  
						sampleEmails[i],      // email
						sampleAges[i]         // age (nullable)
					);

					String messageKey = "user-" + (i + 1);
					
					// UserSpecificオブジェクトを直接送信
					ProducerRecord<String, UserSpecific> record = new ProducerRecord<>(
						TOPIC, 
						messageKey, 
						user  // UserSpecific VO直接使用
					);

					// 同期転送(結果確認)
					RecordMetadata metadata = producer.send(record).get();
					
					System.out.printf("[%d/%d]メッセージ送信成功:%n", i + 1, sampleNames.length);
					System.out.printf("  UserSpecific (直接送信): %s%n", user);
					System.out.printf("  スキーマ: %s%n", user.getSchema().getName());
					System.out.printf("  Topic: %s%n", metadata.topic());
					System.out.printf("  Partition: %d%n", metadata.partition());
					System.out.printf("  Offset: %d%n", metadata.offset());
					System.out.println("--------------------------------------------------");
					
					// メッセージ間の間隔
					Thread.sleep(100);
					
				} catch (ExecutionException | InterruptedException e) {
					System.err.println("メッセージ送信エラー: " + ExceptionUtils.getRootCauseMessage(e));
				}
			}

			System.out.println("> すべてのメッセージを送信完了!");

		} catch (Exception e) {
			System.err.println("Producerエラー: " + ExceptionUtils.getRootCauseMessage(e));
		} finally {
			try {
				producer.flush();
				producer.close();
				System.out.println("Producer終了完了");
			} catch (Exception e) {
				System.err.println("Producer終了中エラー: " + e.getMessage());
			}
		}
	}
}
  • User.java
package com.naverncp.datastream.sample.kafka;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;

/**
 * SpecificRecordを実装する Userクラス
 * Avro直接直列化が可能な Userオブジェクト
 */
public class UserSpecific extends SpecificRecordBase implements SpecificRecord {
    
    private int id;
    private String name;
    private String email;
    private Integer age; // nullable
    
    // Avroスキーマ定義(static)
    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{"
        + "\"type\": \"record\","
        + "\"name\": \"UserSpecific\","
        + "\"namespace\": \"com.naverncp.datastream.sample.kafka\","
        + "\"fields\": ["
        + "  {\"name\": \"id\", \"type\": \"int\"},"
        + "  {\"name\": \"name\", \"type\": \"string\"},"
        + "  {\"name\": \"email\", \"type\": \"string\"},"
        + "  {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}"
        + "]"
        + "}"
    );
    
    // デフォルト作成者
    public UserSpecific() {}
    
    // 全体フィールド作成者
    public UserSpecific(int id, String name, String email, Integer age) {
        this.id = id;
        this.name = name;
        this.email = email;
        this.age = age;
    }
    
    // SpecificRecord実装メソッド
    @Override
    public Schema getSchema() {
        return SCHEMA;
    }
    
    @Override
    public Object get(int field) {
        switch (field) {
            case 0: return id;
            case 1: return name;
            case 2: return email;
            case 3: return age;
            default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
        }
    }
    
    @Override
    public void put(int field, Object value) {
        switch (field) {
            case 0: this.id = (Integer) value; break;
            case 1: this.name = (String) value; break;
            case 2: this.email = (String) value; break;
            case 3: this.age = (Integer) value; break;
            default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
        }
    }
    
    // Getter/Setter
    public int getId() {
        return id;
    }
    
    public void setId(int id) {
        this.id = id;
    }
    
    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    public String getEmail() {
        return email;
    }
    
    public void setEmail(String email) {
        this.email = email;
    }
    
    public Integer getAge() {
        return age;
    }
    
    public void setAge(Integer age) {
        this.age = age;
    }
    
    @Override
    public String toString() {
        return "UserSpecific{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", email='" + email + '\'' +
                ", age=" + age +
                '}';
    }
}

データ受信のユースケース

Data Streamサービスは Kafka Clientをサポートします。Kafka Clientライブラリを活用してトピックにデータを受信できます。データ受信のユースケースを紹介します。

Java consumer

事前準備で確認した Sub Accountと SASL認証情報を適用した Javaサンプルコードを使用してデータを受信する方法は、次の通りです。

package com.ncp.flowlog.collector.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestingDataStreamConsumerSample {
	public static final String DATA_STREAM_SERVER = "{TOPIC_アクセス_エンドポイント}";
	public static final String TOPIC = "{TOPIC_NAME}";
	public static final String CONSUMER_GROUP_ID = "consumer-for-test";

	public static final String username = "{ncp_iam_SubAccountAccessKey}";
	public static final String password = "{ncp_iam_SubAccountSecretKey}";

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");


		Consumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList(TOPIC));

		while (true) {
			ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(10));
			for(ConsumerRecord<String, String> record : consumerRecords) {
				try {
					System.out.printf("key=%s, value=%s\n", record.key(), record.value());
				} catch (Exception e) {
					System.err.println(ExceptionUtils.getRootCauseMessage(e));
				}
			}

			consumer.commitSync();

			try {
				System.out.println("consume.....sleep");
				Thread.sleep(100L);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
		}
	}

}

Java consumer with Schema Registry

スキーマレジストリを連携する場合、Javaサンプルコードを使用してデータを送信する方法は次の通りです。

  • SCHEMA_REGISTRY_URL: スキーマレジストリアクセス情報を入力
package com.naverncp.datastream.sample.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

/**
 * Schema Registry連携 Avro Consumer
 * 
 * 予想スキーマ:
 * - id: int
 * - name: string  
 * - email: string
 * - age: nullable int
 */
public class TestingDataStreamConsumerSchemaRegistry {
	public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
	public static final String TOPIC = "sample-schematopic";
	public static final String CONSUMER_GROUP_ID = "schema-registry-consumer";
	// NCP IAM認証情報
	public static final String username = "{NCP_IAM_ACCESS_KEY}";
	public static final String password = "{NCP_IAM_SECRET_KEY}";
	// NCP Schema Registry URL
	public static final String SCHEMA_REGISTRY_URL = "https://schema-registry.datastream.naverncp.com";

	public static void main(String[] args) {
		Properties props = new Properties();
		
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");

		props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
		props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
		props.put("schema.registry.basic.auth.user.info", username + ":" + password);
		props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");

		Consumer<String, Object> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList(TOPIC));

		try {
			for (int i = 0; i < 10; i++) {
				ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(2));
				
				for (ConsumerRecord<String, Object> record : records) {
					System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
				}
				
				if (!records.isEmpty()) {
					consumer.commitSync();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			consumer.close();
		}
	}
}