VPC 환경에서 이용 가능합니다.
TCP 접속 엔드포인트를 사용하여 메시지(데이터)를 토픽에 송신하는 방법과 예제 코드를 설명합니다.
보안을 위해 데이터 전송 시 메인 계정의 인증 정보를 사용할 수 없으므로 반드시 서브 계정의 인증 정보를 사용해 주십시오.
사전 준비(인증 정보 확인 및 SASL 설정)
Kafka 클라이언트 애플리케이션을 사용하여 Data Stream 서비스에 접속하려면 Kafka 클라이언트 종류에 맞게 SASL 설정을 추가해야 합니다. Sub Account 인증 정보 확인 및 SASL 설정 방법은 다음과 같습니다.
- Data Stream 서비스에 접속할 사용자의 Sub Account 인증 정보(Access key와 Secret key)를 확인해 주십시오.
- Sub Account 인증 정보는 SASL 인증 설정 시 사용자명(username)과 비밀번호(password)로 사용됩니다.
- 서브 계정의 Access Key 발급 및 확인: 서브 계정 생성 참조
- Kafka 클라이언트 종류(Python, Java, Logstash)에 맞게 데이터 송·수신 코드에 SASL 설정을 추가해 주십시오.
- Python, Java
# username: Sub Account Access Key 입력 # password: Sub Account Secret Key 입력 security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
- Python, Java
데이터 송신 예제
Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 송신할 수 있습니다. 데이터 송신 예제를 소개합니다.
Java producer
사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 적용한 Java 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.
package com.ncp.flowlog.collector.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
public class TestingDataStreamProducerSample {
// 콘솔에서 Topic 접속 엔드포인트 확인
public static final String DATA_STREAM_SERVER = "{TOPIC_접속_엔드포인트}";
public static final String TOPIC = "{TOPIC_NAME}";
public static void main(String[] args) {
String username = "{ncp_iam_SubAccountAccessKey}";
String password = "{ncp_iam_SubAccountSecretKey}";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "local_heavy_ip_info_producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";");
try {
Producer<String, String> producer = new KafkaProducer<>(props);
String key ="sample-message-key";
String value = "message-value";
for(int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>(TOPIC, key, value), ((recordMetadata, e) ->
System.out.println("> meta: " + recordMetadata.toString() + ", e: " + e)));
if (i%10==0){
System.out.println("> flush ...");
producer.flush();
Thread.sleep(100);
}
}
producer.flush();
} catch (Exception e) {
System.err.println(e.getMessage());
}
}
}
Java producer with Schema Registry
스키마 레지스트리를 연동하는 경우, Java 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.
- SCHEMA_REGISTRY_URL : 스키마 레지스트리 접속정보 입력
- DataStreamConsumerWithSchemaRegistry
package com.naverncp.datastream.sample.kafka;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
/**
* SpecificRecord를 구현한 UserSpecific을 직접 사용하는 Producer
*
* 사용 스키마:
* - id: int
* - name: string
* - email: string
* - age: nullable int
*/
public class TestingDataStreamProducerSpecific {
public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
public static final String TOPIC = "{TOPIC_NAME}";
// NCP IAM 자격 증명
public static final String username = "{NCP_IAM_ACCESS_KEY}";
public static final String password = "{NCP_IAM_SECRET_KEY}";
// Schema Registry 설정
public static final String SCHEMA_REGISTRY_URL = "{SCHEMA_REGISTRY_URL";
public static void main(String[] args) {
Properties props = new Properties();
// Kafka Producer 기본 설정
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "specific-record-producer");
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
// SASL 보안 설정
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";");
// Schema Registry 설정
props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true);
props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());
props.put(KafkaAvroSerializerConfig.USE_LATEST_VERSION, true); // 최신 스키마 사용
// Schema Registry 인증
props.put("schema.registry.basic.auth.user.info", username + ":" + password);
props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");
Producer<String, UserSpecific> producer = new KafkaProducer<>(props);
System.out.println("SpecificRecord Schema Registry Producer 시작됨. Topic: " + TOPIC);
System.out.println(" UserSpecific VO 직접 사용:");
System.out.println(" - SpecificRecord 구현으로 Avro 직접 직렬화");
System.out.println(" - 스키마가 클래스에 내장됨 (변환 과정 불필요)");
System.out.println("------------------------------------------------------------");
try {
// 샘플 사용자 데이터 생성 및 전송
String[] sampleNames = {"John Doe", "Jane Smith", "Bob Johnson"};
String[] sampleEmails = {"john@example.com", "jane@example.com", "bob@example.com"};
Integer[] sampleAges = {30, 25, null}; // null은 선택적 필드
for (int i = 0; i < sampleNames.length; i++) {
try {
// UserSpecific 객체 생성 (변환 과정 없음!)
UserSpecific user = new UserSpecific(
i + 1, // id
sampleNames[i], // name
sampleEmails[i], // email
sampleAges[i] // age (nullable)
);
String messageKey = "user-" + (i + 1);
// UserSpecific 객체를 직접 전송
ProducerRecord<String, UserSpecific> record = new ProducerRecord<>(
TOPIC,
messageKey,
user // UserSpecific VO 직접 사용
);
// 동기 전송 (결과 확인)
RecordMetadata metadata = producer.send(record).get();
System.out.printf("[%d/%d] 메시지 전송 성공:%n", i + 1, sampleNames.length);
System.out.printf(" UserSpecific (직접 전송): %s%n", user);
System.out.printf(" 스키마: %s%n", user.getSchema().getName());
System.out.printf(" Topic: %s%n", metadata.topic());
System.out.printf(" Partition: %d%n", metadata.partition());
System.out.printf(" Offset: %d%n", metadata.offset());
System.out.println("--------------------------------------------------");
// 메시지 간 간격
Thread.sleep(100);
} catch (ExecutionException | InterruptedException e) {
System.err.println("메시지 전송 오류: " + ExceptionUtils.getRootCauseMessage(e));
}
}
System.out.println("> 모든 메시지 전송 완료!");
} catch (Exception e) {
System.err.println("Producer 오류: " + ExceptionUtils.getRootCauseMessage(e));
} finally {
try {
producer.flush();
producer.close();
System.out.println("Producer 종료 완료");
} catch (Exception e) {
System.err.println("Producer 종료 중 오류: " + e.getMessage());
}
}
}
}
- User.java
package com.naverncp.datastream.sample.kafka;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
/**
* SpecificRecord를 구현하는 User 클래스
* Avro 직접 직렬화가 가능한 User 객체
*/
public class UserSpecific extends SpecificRecordBase implements SpecificRecord {
private int id;
private String name;
private String email;
private Integer age; // nullable
// Avro 스키마 정의 (static)
private static final Schema SCHEMA = new Schema.Parser().parse(
"{"
+ "\"type\": \"record\","
+ "\"name\": \"UserSpecific\","
+ "\"namespace\": \"com.naverncp.datastream.sample.kafka\","
+ "\"fields\": ["
+ " {\"name\": \"id\", \"type\": \"int\"},"
+ " {\"name\": \"name\", \"type\": \"string\"},"
+ " {\"name\": \"email\", \"type\": \"string\"},"
+ " {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}"
+ "]"
+ "}"
);
// 기본 생성자
public UserSpecific() {}
// 전체 필드 생성자
public UserSpecific(int id, String name, String email, Integer age) {
this.id = id;
this.name = name;
this.email = email;
this.age = age;
}
// SpecificRecord 구현 메서드들
@Override
public Schema getSchema() {
return SCHEMA;
}
@Override
public Object get(int field) {
switch (field) {
case 0: return id;
case 1: return name;
case 2: return email;
case 3: return age;
default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
}
}
@Override
public void put(int field, Object value) {
switch (field) {
case 0: this.id = (Integer) value; break;
case 1: this.name = (String) value; break;
case 2: this.email = (String) value; break;
case 3: this.age = (Integer) value; break;
default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
}
}
// Getter/Setter
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "UserSpecific{" +
"id=" + id +
", name='" + name + '\'' +
", email='" + email + '\'' +
", age=" + age +
'}';
}
}
데이터 수신 예제
Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 수신할 수 있습니다. 데이터 수신 예제를 소개합니다.
Java consumer
사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 적용한 Java 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.
package com.ncp.flowlog.collector.kafka;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class TestingDataStreamConsumerSample {
public static final String DATA_STREAM_SERVER = "{TOPIC_접속_엔드포인트}";
public static final String TOPIC = "{TOPIC_NAME}";
public static final String CONSUMER_GROUP_ID = "consumer-for-test";
public static final String username = "{ncp_iam_SubAccountAccessKey}";
public static final String password = "{ncp_iam_SubAccountSecretKey}";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(10));
for(ConsumerRecord<String, String> record : consumerRecords) {
try {
System.out.printf("key=%s, value=%s\n", record.key(), record.value());
} catch (Exception e) {
System.err.println(ExceptionUtils.getRootCauseMessage(e));
}
}
consumer.commitSync();
try {
System.out.println("consume.....sleep");
Thread.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
Java consumer with Schema Registry
스키마 레지스트리를 연동하는 경우, Java 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.
- SCHEMA_REGISTRY_URL : 스키마 레지스트리 접속정보 입력
package com.naverncp.datastream.sample.kafka;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
/**
* Schema Registry 연동 Avro Consumer
*
* 예상 스키마:
* - id: int
* - name: string
* - email: string
* - age: nullable int
*/
public class TestingDataStreamConsumerSchemaRegistry {
public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
public static final String TOPIC = "sample-schematopic";
public static final String CONSUMER_GROUP_ID = "schema-registry-consumer";
// NCP IAM 자격 증명
public static final String username = "{NCP_IAM_ACCESS_KEY}";
public static final String password = "{NCP_IAM_SECRET_KEY}";
// NCP Schema Registry URL
public static final String SCHEMA_REGISTRY_URL = "https://schema-registry.datastream.naverncp.com";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + username + "\" " +
"password=\"" + password + "\";");
props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
props.put("schema.registry.basic.auth.user.info", username + ":" + password);
props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");
Consumer<String, Object> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));
try {
for (int i = 0; i < 10; i++) {
ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(2));
for (ConsumerRecord<String, Object> record : records) {
System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
}
if (!records.isEmpty()) {
consumer.commitSync();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}