Java

Prev Next

VPC 환경에서 이용 가능합니다.

TCP 접속 엔드포인트를 사용하여 메시지(데이터)를 토픽에 송신하는 방법과 예제 코드를 설명합니다.

주의

보안을 위해 데이터 전송 시 메인 계정의 인증 정보를 사용할 수 없으므로 반드시 서브 계정의 인증 정보를 사용해 주십시오.

사전 준비(인증 정보 확인 및 SASL 설정)

Kafka 클라이언트 애플리케이션을 사용하여 Data Stream 서비스에 접속하려면 Kafka 클라이언트 종류에 맞게 SASL 설정을 추가해야 합니다. Sub Account 인증 정보 확인 및 SASL 설정 방법은 다음과 같습니다.

  1. Data Stream 서비스에 접속할 사용자의 Sub Account 인증 정보(Access key와 Secret key)를 확인해 주십시오.
    • Sub Account 인증 정보는 SASL 인증 설정 시 사용자명(username)과 비밀번호(password)로 사용됩니다.
    • 서브 계정의 Access Key 발급 및 확인: 서브 계정 생성 참조
  2. Kafka 클라이언트 종류(Python, Java, Logstash)에 맞게 데이터 송·수신 코드에 SASL 설정을 추가해 주십시오.
    • Python, Java
      # username: Sub Account Access Key 입력
      # password: Sub Account Secret Key 입력
      
      security.protocol=SASL_SSL
      sasl.mechanism=PLAIN
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="{ncp_iam_SubAccountAccessKey}" password="{ncp_iam_SubAccountSecretKey}";
      

데이터 송신 예제

Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 송신할 수 있습니다. 데이터 송신 예제를 소개합니다.

Java producer

사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 적용한 Java 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.

package com.ncp.flowlog.collector.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestingDataStreamProducerSample {

    // 콘솔에서 Topic 접속 엔드포인트 확인
	public static final String DATA_STREAM_SERVER = "{TOPIC_접속_엔드포인트}";
	public static final String TOPIC = "{TOPIC_NAME}";

	public static void main(String[] args) {
		String username = "{ncp_iam_SubAccountAccessKey}";
		String password = "{ncp_iam_SubAccountSecretKey}";

		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "local_heavy_ip_info_producer");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");

		try {
			Producer<String, String> producer = new KafkaProducer<>(props);

			String key ="sample-message-key";
			String value = "message-value";

			for(int i = 0; i < 100; i++) {
				producer.send(new ProducerRecord<>(TOPIC, key, value), ((recordMetadata, e) ->
					System.out.println("> meta: " + recordMetadata.toString() + ", e: " + e)));

				if (i%10==0){
					System.out.println("> flush ...");
					producer.flush();
					Thread.sleep(100);
				}
			}
			producer.flush();
		} catch (Exception e) {
			System.err.println(e.getMessage());
		}
	}
}

Java producer with Schema Registry

스키마 레지스트리를 연동하는 경우, Java 예제 코드를 사용하여 데이터를 송신하는 방법은 다음과 같습니다.

  • SCHEMA_REGISTRY_URL : 스키마 레지스트리 접속정보 입력
  • DataStreamConsumerWithSchemaRegistry
package com.naverncp.datastream.sample.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;

/**
 * SpecificRecord를 구현한 UserSpecific을 직접 사용하는 Producer
 * 
 * 사용 스키마:
 * - id: int
 * - name: string  
 * - email: string
 * - age: nullable int
 */
public class TestingDataStreamProducerSpecific {
	public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
	public static final String TOPIC = "{TOPIC_NAME}";
	// NCP IAM 자격 증명
	public static final String username = "{NCP_IAM_ACCESS_KEY}";
	public static final String password = "{NCP_IAM_SECRET_KEY}";
	// Schema Registry 설정
	public static final String SCHEMA_REGISTRY_URL = "{SCHEMA_REGISTRY_URL";

	public static void main(String[] args) {
		Properties props = new Properties();
		
		// Kafka Producer 기본 설정
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "specific-record-producer");
		props.put(ProducerConfig.RETRIES_CONFIG, 5);
		props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
		props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
		props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
		
		// SASL 보안 설정
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");

		// Schema Registry 설정
		props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
		props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true);
		props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());
		props.put(KafkaAvroSerializerConfig.USE_LATEST_VERSION, true); // 최신 스키마 사용		
		// Schema Registry 인증
		props.put("schema.registry.basic.auth.user.info", username + ":" + password);
		props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");

		Producer<String, UserSpecific> producer = new KafkaProducer<>(props);

		System.out.println("SpecificRecord Schema Registry Producer 시작됨. Topic: " + TOPIC);
		System.out.println(" UserSpecific VO 직접 사용:");
		System.out.println("   - SpecificRecord 구현으로 Avro 직접 직렬화");
		System.out.println("   - 스키마가 클래스에 내장됨 (변환 과정 불필요)");
		System.out.println("------------------------------------------------------------");

		try {
			// 샘플 사용자 데이터 생성 및 전송
			String[] sampleNames = {"John Doe", "Jane Smith", "Bob Johnson"};
			String[] sampleEmails = {"john@example.com", "jane@example.com", "bob@example.com"};
			Integer[] sampleAges = {30, 25, null}; // null은 선택적 필드

			for (int i = 0; i < sampleNames.length; i++) {
				try {
					// UserSpecific 객체 생성 (변환 과정 없음!)
					UserSpecific user = new UserSpecific(
						i + 1,                // id
						sampleNames[i],       // name  
						sampleEmails[i],      // email
						sampleAges[i]         // age (nullable)
					);

					String messageKey = "user-" + (i + 1);
					
					// UserSpecific 객체를 직접 전송
					ProducerRecord<String, UserSpecific> record = new ProducerRecord<>(
						TOPIC, 
						messageKey, 
						user  // UserSpecific VO 직접 사용
					);

					// 동기 전송 (결과 확인)
					RecordMetadata metadata = producer.send(record).get();
					
					System.out.printf("[%d/%d] 메시지 전송 성공:%n", i + 1, sampleNames.length);
					System.out.printf("  UserSpecific (직접 전송): %s%n", user);
					System.out.printf("  스키마: %s%n", user.getSchema().getName());
					System.out.printf("  Topic: %s%n", metadata.topic());
					System.out.printf("  Partition: %d%n", metadata.partition());
					System.out.printf("  Offset: %d%n", metadata.offset());
					System.out.println("--------------------------------------------------");
					
					// 메시지 간 간격
					Thread.sleep(100);
					
				} catch (ExecutionException | InterruptedException e) {
					System.err.println("메시지 전송 오류: " + ExceptionUtils.getRootCauseMessage(e));
				}
			}

			System.out.println("> 모든 메시지 전송 완료!");

		} catch (Exception e) {
			System.err.println("Producer 오류: " + ExceptionUtils.getRootCauseMessage(e));
		} finally {
			try {
				producer.flush();
				producer.close();
				System.out.println("Producer 종료 완료");
			} catch (Exception e) {
				System.err.println("Producer 종료 중 오류: " + e.getMessage());
			}
		}
	}
}
  • User.java
package com.naverncp.datastream.sample.kafka;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;

/**
 * SpecificRecord를 구현하는 User 클래스
 * Avro 직접 직렬화가 가능한 User 객체
 */
public class UserSpecific extends SpecificRecordBase implements SpecificRecord {
    
    private int id;
    private String name;
    private String email;
    private Integer age; // nullable
    
    // Avro 스키마 정의 (static)
    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{"
        + "\"type\": \"record\","
        + "\"name\": \"UserSpecific\","
        + "\"namespace\": \"com.naverncp.datastream.sample.kafka\","
        + "\"fields\": ["
        + "  {\"name\": \"id\", \"type\": \"int\"},"
        + "  {\"name\": \"name\", \"type\": \"string\"},"
        + "  {\"name\": \"email\", \"type\": \"string\"},"
        + "  {\"name\": \"age\", \"type\": [\"null\", \"int\"], \"default\": null}"
        + "]"
        + "}"
    );
    
    // 기본 생성자
    public UserSpecific() {}
    
    // 전체 필드 생성자
    public UserSpecific(int id, String name, String email, Integer age) {
        this.id = id;
        this.name = name;
        this.email = email;
        this.age = age;
    }
    
    // SpecificRecord 구현 메서드들
    @Override
    public Schema getSchema() {
        return SCHEMA;
    }
    
    @Override
    public Object get(int field) {
        switch (field) {
            case 0: return id;
            case 1: return name;
            case 2: return email;
            case 3: return age;
            default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
        }
    }
    
    @Override
    public void put(int field, Object value) {
        switch (field) {
            case 0: this.id = (Integer) value; break;
            case 1: this.name = (String) value; break;
            case 2: this.email = (String) value; break;
            case 3: this.age = (Integer) value; break;
            default: throw new IndexOutOfBoundsException("Invalid field index: " + field);
        }
    }
    
    // Getter/Setter
    public int getId() {
        return id;
    }
    
    public void setId(int id) {
        this.id = id;
    }
    
    public String getName() {
        return name;
    }
    
    public void setName(String name) {
        this.name = name;
    }
    
    public String getEmail() {
        return email;
    }
    
    public void setEmail(String email) {
        this.email = email;
    }
    
    public Integer getAge() {
        return age;
    }
    
    public void setAge(Integer age) {
        this.age = age;
    }
    
    @Override
    public String toString() {
        return "UserSpecific{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", email='" + email + '\'' +
                ", age=" + age +
                '}';
    }
}

데이터 수신 예제

Data Stream 서비스는 Kafka Client를 지원합니다. Kafka Client 라이브러리를 활용하여 토픽에 데이터를 수신할 수 있습니다. 데이터 수신 예제를 소개합니다.

Java consumer

사전 준비에서 확인한 Sub Account 및 SASL 인증 정보를 적용한 Java 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.

package com.ncp.flowlog.collector.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestingDataStreamConsumerSample {
	public static final String DATA_STREAM_SERVER = "{TOPIC_접속_엔드포인트}";
	public static final String TOPIC = "{TOPIC_NAME}";
	public static final String CONSUMER_GROUP_ID = "consumer-for-test";

	public static final String username = "{ncp_iam_SubAccountAccessKey}";
	public static final String password = "{ncp_iam_SubAccountSecretKey}";

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");


		Consumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList(TOPIC));

		while (true) {
			ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(10));
			for(ConsumerRecord<String, String> record : consumerRecords) {
				try {
					System.out.printf("key=%s, value=%s\n", record.key(), record.value());
				} catch (Exception e) {
					System.err.println(ExceptionUtils.getRootCauseMessage(e));
				}
			}

			consumer.commitSync();

			try {
				System.out.println("consume.....sleep");
				Thread.sleep(100L);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
		}
	}

}

Java consumer with Schema Registry

스키마 레지스트리를 연동하는 경우, Java 예제 코드를 사용하여 데이터를 수신하는 방법은 다음과 같습니다.

  • SCHEMA_REGISTRY_URL : 스키마 레지스트리 접속정보 입력
package com.naverncp.datastream.sample.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

/**
 * Schema Registry 연동 Avro Consumer
 * 
 * 예상 스키마:
 * - id: int
 * - name: string  
 * - email: string
 * - age: nullable int
 */
public class TestingDataStreamConsumerSchemaRegistry {
	public static final String DATA_STREAM_SERVER = "{KAFKA_CLUSTER_ENDPOINT}";
	public static final String TOPIC = "sample-schematopic";
	public static final String CONSUMER_GROUP_ID = "schema-registry-consumer";
	// NCP IAM 자격 증명
	public static final String username = "{NCP_IAM_ACCESS_KEY}";
	public static final String password = "{NCP_IAM_SECRET_KEY}";
	// NCP Schema Registry URL
	public static final String SCHEMA_REGISTRY_URL = "https://schema-registry.datastream.naverncp.com";

	public static void main(String[] args) {
		Properties props = new Properties();
		
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DATA_STREAM_SERVER);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		
		props.put("security.protocol", "SASL_SSL");
		props.put("sasl.mechanism", "PLAIN");
		props.put("sasl.jaas.config",
			"org.apache.kafka.common.security.plain.PlainLoginModule required " +
				"username=\"" + username + "\" " +
				"password=\"" + password + "\";");

		props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
		props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
		props.put("schema.registry.basic.auth.user.info", username + ":" + password);
		props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");

		Consumer<String, Object> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Arrays.asList(TOPIC));

		try {
			for (int i = 0; i < 10; i++) {
				ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(2));
				
				for (ConsumerRecord<String, Object> record : records) {
					System.out.printf("Key: %s, Value: %s%n", record.key(), record.value());
				}
				
				if (!records.isEmpty()) {
					consumer.commitSync();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			consumer.close();
		}
	}
}