Documentation Index

Fetch the complete documentation index at: https://guide.ncloud-docs.com/llms.txt

Use this file to discover all available pages before exploring further.

Java

Prev Next

VPC 환경에서 이용 가능합니다.

TCP 접속 엔드포인트를 사용하여 메시지(데이터)를 토픽에 송신하는 방법과 예제 코드를 설명합니다.

주의

보안을 위해 데이터 전송 시 메인 계정의 인증 정보를 사용할 수 없으므로 반드시 서브 계정의 인증 정보를 사용해 주십시오.

사전 준비(인증 정보 확인 및 SASL 설정)

Kafka 클라이언트 애플리케이션을 사용하여 Data Stream 서비스에 접속하려면 Kafka 클라이언트 종류에 맞게 SASL 설정을 추가해야 합니다. Sub Account 인증 정보 확인 및 SASL 설정 방법은 다음과 같습니다.

  1. Data Stream 서비스에 접속할 사용자의 Sub Account 인증 정보(Access key와 Secret key)를 확인해 주십시오.
    • Sub Account 인증 정보는 SASL 인증 설정 시 사용자명(username)과 비밀번호(password)로 사용됩니다.
    • 서브 계정의 Access Key 발급 및 확인: 서브 계정 생성 참조
  2. Kafka 클라이언트 종류(Python, Java, Logstash)에 맞게 데이터 송·수신 코드에 SASL 설정을 추가해 주십시오.
    • Python, Java
      # username: Sub Account Access Key 입력
      # password: Sub Account Secret Key 입력
      
      security.protocol=SASL_SSL
      sasl.mechanism=PLAIN
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
      

데이터 송신 예제

Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 송신할 수 있습니다. 데이터 송신 예제는 다음과 같습니다.

Java producer

사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 적용한 Java 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.

package com.ncp.flowlog.collector.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestingDataStreamProducerSample {

    // 콘솔에서 Topic 접속 엔드포인트 확인
	public static final String DATA_STREAM_SERVER = "{TOPIC_접속_엔드포인트}";
	public static final String TOPIC = "{TOPIC_NAME}";

	public static void main(String[] args) {
		String username = "{ncp_iam_SubAccountAccessKey}";
		String password = "{ncp_iam_SubAccountSecretKey}";

		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "local_heavy_ip_info_producer");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");

		try {
			Producer<String, String> producer = new KafkaProducer<>(props);

			String key ="sample-message-key";
			String value = "message-value";

			for(int i = 0; i < 100; i++) {
				producer.send(new ProducerRecord<>(TOPIC, key, value), ((recordMetadata, e) ->
					System.out.println("> meta: " + recordMetadata.toString() + ", e: " + e)));

				if (i%10==0){
					System.out.println("> flush ...");
					producer.flush();
					Thread.sleep(100);
				}
			}
			producer.flush();
		} catch (Exception e) {
			System.err.println(e.getMessage());
		}
	}
}

Java producer with Schema Registry

스키마 레지스트리를 연동하는 경우, Java 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.

  • SCHEMA_REGISTRY_URL: 스키마 레지스트리 접속 정보 입력
  • DataStreamConsumerWithSchemaRegistry
package com.naverncp.datastream.sample.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;

/**
 * SpecificRecord를 구현한 UserSpecific을 직접 사용하는 Producer
 * 
 * 사용 스키마:
 * - id: int
 * - name: string  
 * - email: string
 * - age: nullable int
 */
public class TestingDataStreamProducerSpecific {
	public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
	public static final String TOPIC = "{TOPIC_NAME}";
	// NCP IAM 자격 증명
	public static final String username = "{NCP_IAM_ACCESS_KEY}";
	public static final String password = "{NCP_IAM_SECRET_KEY}";
	// Schema Registry 설정
	public static final String SCHEMA_REGISTRY_URL = "{SCHEMA_REGISTRY_URL}";

	public static void main(String[] args) {
		Properties props = new Properties();
		
		// Kafka Producer 기본 설정
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "specific-record-producer");
		props.put(ProducerConfig.RETRIES_CONFIG, 5);
		props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
		props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
		props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
		
		// SASL 보안 설정
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");

		// Schema Registry 설정
		props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
		props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true);
		props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());
		props.put(KafkaAvroSerializerConfig.USE_LATEST_VERSION, true); // 최신 스키마 사용		
		// Schema Registry 인증
		props.put("schema.registry.basic.auth.user.info", username + ":" + password);
		props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");

		Producer<String, UserSpecific> producer = new KafkaProducer<>(props);

		System.out.println("SpecificRecord Schema Registry Producer 시작됨. Topic: " + TOPIC);
		System.out.println(" UserSpecific VO 직접 사용:");
		System.out.println("   - SpecificRecord 구현으로 Avro 직접 직렬화");
		System.out.println("   - 스키마가 클래스에 내장됨 (변환 과정 불필요)");
		System.out.println("------------------------------------------------------------");

		try {
			// 샘플 사용자 데이터 생성 및 전송
			String[] sampleNames = {"John Doe", "Jane Smith", "Bob Johnson"};
			String[] sampleEmails = {"john@example.com", "jane@example.com", "bob@example.com"};
			Integer[] sampleAges = {30, 25, null}; // null은 선택적 필드

			for (int i = 0; i < sampleNames.length; i++) {
				try {
					// UserSpecific 객체 생성 (변환 과정 없음!)
					UserSpecific user = new UserSpecific(
						i + 1,                // id
						sampleNames[i],       // name  
						sampleEmails[i],      // email
						sampleAges[i]         // age (nullable)
					);

					String messageKey = "user-" + (i + 1);
					
					// UserSpecific 객체를 직접 전송
					ProducerRecord<String, UserSpecific> record = new ProducerRecord<>(
						TOPIC, 
						messageKey, 
						user  // UserSpecific VO 직접 사용
					);

					// 동기 전송 (결과 확인)
					RecordMetadata metadata = producer.send(record).get();
					
					System.out.printf("[%d/%d] 메시지 전송 성공:%n", i + 1, sampleNames.length);
					System.out.printf("  UserSpecific (직접 전송): %s%n", user);
					System.out.printf("  스키마: %s%n", user.getSchema().getName());
					System.out.printf("  Topic: %s%n", metadata.topic());
					System.out.printf("  Partition: %d%n", metadata.partition());
					System.out.printf("  Offset: %d%n", metadata.offset());
					System.out.println("--------------------------------------------------");
					
					// 메시지 간 간격
					Thread.sleep(100);
					
				} catch (ExecutionException | InterruptedException e) {
					System.err.println("메시지 전송 오류: " + ExceptionUtils.getRootCauseMessage(e));
				}
			}

			System.out.println("> 모든 메시지 전송 완료!");

		} catch (Exception e) {
			System.err.println("Producer 오류: " + ExceptionUtils.getRootCauseMessage(e));
		} finally {
			try {
				producer.flush();
				producer.close();
				System.out.println("Producer 종료 완료");
			} catch (Exception e) {
				System.err.println("Producer 종료 중 오류: " + e.getMessage());
			}
		}
	}
}
  • User.java
package com.naverncp.datastream.sample.kafka;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;

/**
 * SpecificRecord를 구현하는 User 클래스
 * Avro 직접 직렬화가 가능한 User 객체
 */
public class UserSpecific extends SpecificRecordBase implements SpecificRecord {
    
    private int id;
    private String name;
    private String email;
    private Integer age; // nullable
    
    // Avro 스키마 정의 (static)
    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{"
        + "\"type\": \"record\","
        + "\"name\": \"UserSpecific\","
        + "\"namespace\": \"com.naverncp.datastream.sample.kafka\","
        + "\"fields\": ["
        + "  {\"name\": \"id\", \"type\": \"int\"},"
        + "  {\"name\": \"name\", \"type\": \"string\"},"
        + "  {\"name\": \"email\", \"type\": \"string\"},"
        + "  {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}"
        + "]"
        + "}"
    );
    
    // 기본 생성자
    public UserSpecific() {}
    
    // 전체 필드 생성자
    public UserSpecific(int id, String name, String email, Integer age) {
        this.id = id;
        this.name = name;
        this.email = email;
        this.age = age;
    }
    
    // SpecificRecord 구현 메서드들
    @Override
    public Schema getSchema() {
        return SCHEMA;
    }
    
    @Override
    public Object get(int field) {
        switch (field) {
            case 0: return id;
            case 1: return name;
            case 2: return email;
            case 3: return age;
            default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
        }
    }
    
    @Override
    public void put(int field, Object value) {
        switch (field) {
            case 0: this.id = (Integer) value; break;
            case 1: this.name = (String) value; break;
            case 2: this.email = (String) value; break;
            case 3: this.age = (Integer) value; break;
            default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
        }
    }
    
    // Getter/Setter
    public int getId() {
        return id;
    }
    
    public void setId(int id) {
        this.id = id;
    }
    
    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    public String getEmail() {
        return email;
    }
    
    public void setEmail(String email) {
        this.email = email;
    }
    
    public Integer getAge() {
        return age;
    }
    
    public void setAge(Integer age) {
        this.age = age;
    }
    
    @Override
    public String toString() {
        return "UserSpecific{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", email='" + email + '\'' +
                ", age=" + age +
                '}';
    }
}

데이터 수신 예제

Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 수신할 수 있습니다. 데이터 수신 예제는 다음과 같습니다.

Java consumer

사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 적용한 Java 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.

package com.ncp.flowlog.collector.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestingDataStreamConsumerSample {
	public static final String DATA_STREAM_SERVER = "{TOPIC_접속_엔드포인트}";
	public static final String TOPIC = "{TOPIC_NAME}";
	public static final String CONSUMER_GROUP_ID = "consumer-for-test";

	public static final String username = "{ncp_iam_SubAccountAccessKey}";
	public static final String password = "{ncp_iam_SubAccountSecretKey}";

	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");


		Consumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList(TOPIC));

		while (true) {
			ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(10));
			for(ConsumerRecord<String, String> record : consumerRecords) {
				try {
					System.out.printf("key=%s, value=%s\n", record.key(), record.value());
				} catch (Exception e) {
					System.err.println(ExceptionUtils.getRootCauseMessage(e));
				}
			}

			consumer.commitSync();

			try {
				System.out.println("consume.....sleep");
				Thread.sleep(100L);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
		}
	}

}

Java consumer with Schema Registry

스키마 레지스트리를 연동하는 경우, Java 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.

  • SCHEMA_REGISTRY_URL: 스키마 레지스트리 접속 정보 입력
package com.naverncp.datastream.sample.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

/**
 * Schema Registry 연동 Avro Consumer
 * 
 * 예상 스키마:
 * - id: int
 * - name: string  
 * - email: string
 * - age: nullable int
 */
public class TestingDataStreamConsumerSchemaRegistry {
	public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
	public static final String TOPIC = "sample-schematopic";
	public static final String CONSUMER_GROUP_ID = "schema-registry-consumer";
	// NCP IAM 자격 증명
	public static final String username = "{NCP_IAM_ACCESS_KEY}";
	public static final String password = "{NCP_IAM_SECRET_KEY}";
	// NCP Schema Registry URL
	public static final String SCHEMA_REGISTRY_URL = "https://schema-registry.datastream.naverncp.com";

	public static void main(String[] args) {
		Properties props = new Properties();
		
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");

		props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
		props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
		props.put("schema.registry.basic.auth.user.info", username + ":" + password);
		props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");

		Consumer<String, Object> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList(TOPIC));

		try {
			for (int i = 0; i < 10; i++) {
				ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(2));
				
				for (ConsumerRecord<String, Object> record : records) {
					System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
				}
				
				if (!records.isEmpty()) {
					consumer.commitSync();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			consumer.close();
		}
	}
}