Cloud Data Streaming Service 활용
    • PDF

    Cloud Data Streaming Service 활용

    • PDF

    Article Summary

    VPC 환경에서 이용 가능합니다.

    CMAK에 접속하여 Topic을 생성하고 Topic에 데이터를 저장하는 방법, Topic에 저장된 데이터를 Cloud Data Streaming Service Cluster에 전송하는 방법과 Apache Kafka 클라이언트와 Apache Kafka의 Broker 노드 간 통신 구간을 암호화하는 방법을 설명합니다.

    CMAK 접속

    CMAK는 Apache Kafka 클러스터를 관리하는 플랫폼입니다. CMAK에 접속하여 Topic을 생성하고 Topic의 Partition 개수를 변경하거나 Topic의 데이터 보관 주기를 변경할 수 있습니다.

    참고

    CMAK에 접속하기 전에 Public 도메인을 활성화해야 합니다.

    CMAK에 접속하는 방법은 다음과 같습니다.

    1. 네이버 클라우드 플랫폼 콘솔에서 Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster 메뉴를 차례대로 클릭해 주십시오.
    2. 클러스터의 체크 박스를 클릭한 후 클러스터 관리 > CMAK 접속 도메인 설정 변경 버튼을 클릭해 주십시오.
    3. 팝업 창이 나타나면 정보를 확인한 후 [확인] 버튼을 클릭해 주십시오.
      • Public 도메인이 활성화됩니다.
    4. 클러스터의 체크 박스를 클릭한 후 클러스터 관리 > CMAK 접속을 클릭해 주십시오.
      • CMAK 접속 메뉴가 활성화되지 않으면 Public 도메인이 활성화되지 않은 것입니다. 1~3의 절차를 다시 수행해 주십시오.
    5. CMAK 접속을 위한 사전 작업 창이 나타나면 CMAK 바로가기 버튼을 클릭해 주십시오.
      cdss-cmak_cmak01_ko
    6. 로그인 팝업 창이 나타나면 클러스터 생성 시 설정한 사용자 ID와 Password를 입력해 주십시오.
      • ID를 잊어버린 경우, 클러스터 관리 > CMAK 접속 비밀번호 초기화를 클릭하면 ID를 확인할 수 있습니다.
    7. 클러스터 계정 이름을 클릭해 주십시오.
      cdss-cmak_cmak02_ko
    8. Cluster Summary에서 Topic과 Broker 노드에 대한 정보를 확인해 주십시오.
      cdss-cmak_cmak03_ko
    • Topic에 대한 정보
      cmak-04_ko

    • Broker 노드에 대한 정보
      cmak-05_ko

    Topic 생성

    1. CMAK 페이지에 접속해 주십시오.
    2. CMAK 페이지에서 Topic > Create 메뉴를 클릭해 주십시오.
      cdss-cmak_topicadd01_ko
    3. 토픽 정보를 입력해 주십시오.
      cdss-cmak_topicadd02_ko
      • Topic: Topic의 이름 입력
      • Partitions: Topic의 Partitions 개수 입력
      • Replication Factor: Topic의 Replication 개수 입력
    4. [Create] 버튼을 클릭해 주십시오.
    참고

    상세 설정 값에 대한 설명은 Apach Kafka Documentation을 참조해 주십시오.

    주의
    • Topic을 생성한 이후에는 Partitions 개수는 늘리는 것만 가능하며 이미 추가된 Partitions 개수를 줄이는 것을 불가능합니다.
    • 데이터 Consume 시 순서 보장이 필요한 경우, Partitions 개수는 1로 설정해야 합니다. 단, Partitions이 1개인 경우 하나의 브로커 노드에 모든 데이터가 저장되므로 브로커 노드의 Disk 사용량 관리에 주의해야 합니다.
    • Topic을 생성한 이후에는 Replication 개수를 변경할 수 없습니다. 안정적인 서비스를 운영하기 위해 최소 2개 이상의 Replication를 적용하는 것을 권장합니다.

    Topic 정보 확인

    cdss-cmak_topicinfo

    영역설명
    Topic SummaryTopic의 정보 확인
    OperationsTopic 삭제, Partition 추가, 재분배, 설정 변경
    Partitions by Broker각 브로커 노드에 대한 생성된 Partition 정보 확인
    Partition Information각 Partition의 Leader와 Replication이 어떤 브로커 노드에 위치하는지 확인

    Topic의 데이터 보관 주기 변경

    Topic의 데이터 보관 주기를 조정하는 방법은 다음과 같습니다.

    1. CMAK 접속을 참조하여 CMAK에 접속해 주십시오.
    2. 클러스터 이름을 클릭해 주십시오.
    3. Topic > List를 클릭해 주십시오.
      cdss-cmak_topiclist_ko
    4. Topic 목록에서 데이터 보관주기를 변경할 Topic의 이름을 클릭해 주십시오.
    5. Operations > [Update Config] 버튼을 클릭해 주십시오.
      cdss-cmak_topicconfig01_ko
    6. retention.ms 값을 수정해 주십시오.
      cdss-cmak_topicconfig02_ko
      • 1시간으로 설정하고 싶은 경우, 3600000 값을 입력합니다.
      • n시간으로 설정하고 싶은 경우, 3600000 * n의 결과값을 입력합니다.
    7. [Update Config] 버튼을 클릭해 주십시오.

    Topic의 Partition 개수 변경

    CMAK에서 생성된 Topic의 Partition 개수를 늘릴 수 있습니다. Partition은 추가하는 것만 가능하며 기존에 할당된 Partition은 삭제할 수 없습니다.

    Topic의 Partition 개수를 늘리는 방법은 다음과 같습니다.

    1. CMAK 접속을 참조하여 CMAK에 접속해 주십시오.
    2. 클러스터 이름을 클릭해 주십시오.
    3. Topic > List를 클릭해 주십시오.
      cdss-cmak_topiclist_ko
    4. Topic 목록에서 데이터 보관주기를 변경할 Topic의 이름을 클릭해 주십시오.
    5. Operations > [Add Partitions] 버튼을 클릭해 주십시오.
      cdss-cmak_parition01_ko
    6. Partitions 항목에 Partition 개수를 입력해 주십시오.
      • 입력된 수보다 큰 수만 입력할 수 있습니다.
        cdss-cmak_parition02_ko
    7. [Add Paritions] 버튼을 클릭해 주십시오.

    Producer/Consumer 구현

    다음과 같은 구조로 Producer VM, Consumer VM을 생성하여 Topic에 데이터를 저장하고 전송하는 방법을 설명합니다.

    Apache Kafka, Java, Python을 활용하여 데이터를 전송하고 저장하는 방법을 설명합니다.

    cdss-2-23

    참고

    이 가이드는 CentOS 7.3을 기준으로 설명합니다.

    Producer VM, Consumer VM 생성

    Producer VM과 Consumer VM을 생성하는 방법은 다음과 같습니다.

    1. Server 시작을 참조하여 서버를 생성해 주십시오.
    2. 서버 생성 시 Cloud Data Streaming Service 클러스터를 생성한 VPC와 동일한 VPC를 선택해 주십시오.
      cdss-cmak_server_ko
    3. Subnet은 Public Subnet을 선택해 주십시오.
      • 해당 VM에 공인 IP를 할당해야 접속할 수 있습니다.

    Java 설치

    다음 명령어를 입력하여 Java를 설치해 주십시오.

    yum install java -y
    

    Apache Kafka 설치

    다음 명령어를 입력하여 Apache Kafka를 설치해 주십시오.

    wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz
    # 압축을 풀어줍니다.
    tar -zxvf kafka_2.12-2.4.0.tgz
    

    Broker 노드 정보 확인

    Broker 노드 정보를 확인하는 방법은 다음과 같습니다.

    1. 네이버 클라우드 플랫폼 콘솔에서 Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster 메뉴를 차례대로 클릭해 주십시오.
    2. 클러스터 상세 정보에서 Broker 노드 정보의 [상세 보기] 버튼을 클릭해 주십시오.
      cdss-cmak_broker01_ko
    3. Broker 노드 정보 창이 나타나면 정보를 확인해 주십시오.
      cdss-cmak_broker02_ko
      • PlainText: Broker 노드와 암호화 없이 통신하기 위한 정보
      • TLS: Broker 노드와 암호화 통신하기 위한 정보
      • hosts 파일 정보: 암호화 통신에 이용되는 hosts 파일 수정 시 필요한 정보

    Broker 노드의 ACG 설정

    Broker 노드의 ACG 규칙을 설정하는 방법은 다음과 같습니다.

    1. 네이버 클라우드 플랫폼 콘솔에서 Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster 메뉴를 차례대로 클릭해 주십시오.
    2. 클러스터 상세 정보에서 Broker 노드 ACG의 cdss-cluster-popupicon을 클릭해 주십시오.
    3. ACG 목록에서 Broker 노드의 ACG를 선택한 후 [설정] 버튼을 클릭해 주십시오.
    4. ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
      cdss-cmak_acg01_ko
      • 프로토콜: TCP
      • 접근 소스: Producer VM, Consumer VM의 비공인 IP 입력
      • 허용 포트: 9092-9093 입력
      • ACG에 대한 메모 입력
    5. 규칙이 추가되었는지 확인한 후 [적용] 버튼을 클릭해 주십시오.
      • 해당 규칙이 ACG에 적용됩니다.

    Apache Kafka 활용하여 데이터 전송

    1. 이전 단계에서 생성한 Producer VM에 접속한 후 다음 명령어를 입력해 주십시오.

      cd kafka_2.12-2.4.0
      ./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic]
      # [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
      # [topic]에 CMAK에서 생성한 Topic을 입력합니다.
      # 예시) ./bin/kafka-console-producer.sh --broker-list 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test
      

      cdss-2-29_ko

    2. 전송하고 싶은 메시지를 입력해 주십시오.

      • Broker 노드에 해당 메시지가 저장됩니다.
      • 종료를 원할 경우, [Ctrl + C] 키를 눌러 주십시오.
        cdss-2-30_ko
    3. 이전 단계에서 생성한 Consumer VM에 접속한 후 아래 명령어를 실행해 주십시오.

      • --from-beginning 명령어를 사용할 경우, 해당 Topic에 대한 데이터를 처음부터 모두 조회합니다.
      • --from-beginning 명령어를 사용하지 않을 경우, 데이터를 조회한 순간부터 입력되는 데이터만 조회합니다.
      cd kafka_2.12-2.4.0
      ./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --from-beginning
      # [bootstrap.server]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
      # [topic]에 앞 단계 Producer VM에서 입력한 Topic을 입력합니다.
      # 예시) ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092 --topic test --from-beginning
      

    Java 활용하여 데이터 전송

    Java를 활용하여 데이터를 전송하는 방법을 설명합니다.

    참고

    이 가이드는 IntelliJ IDEA를 사용하는 것을 기준으로 설명합니다.

    프로젝트 생성

    프로젝트를 생성하는 방법은 다음과 같습니다.

    1. IntelliJ IDEA를 실행한 후, File > New > Project를 클릭해 주십시오.
    2. Maven Archetype을 선택한 후 Project 정보를 입력하고 [Create] 버튼을 클릭하여 프로젝트를 생성해 주십시오.
      java-1_ko

    pom.xml 파일 수정

    프로젝트 종속성, Java 버전, 패키징 메서드를 정의하는 pom.xml 파일을 수정해 주십시오.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <packaging>jar</packaging>
        <groupId>org.example</groupId>
        <artifactId>maventest</artifactId>
        <version>1.0-SNAPSHOT</version>
        <url>http://maven.apache.org</url>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <!-- Apache Kafka version in Cloud Data Streaming Service -->
                <version>2.4.0</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.21</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.3</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>KafkaMain</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
    
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    
    참고

    사용자 환경에 따라 pom.xml 파일이 상이할 수 있습니다.

    KafkaMain.java 작성

    Java Application 실행 시 argument로 produce/consume 여부, Topic, Broker lists를 전달합니다.

    public class KafkaMain {
        public static void main(String[] args) throws IOException {
            String topicName = args[1];
            String brokers = args[2];
    
            switch(args[0]){
                case "produce":
                    Producer.produce(brokers, topicName);
                    break;
                case "consume":
                    Consumer.consume(brokers, topicName);
                    break;
                default:
                    System.out.println("Wrong arguments");
                    break;
            }
            System.exit(0);
        }
    }
    

    Producer.java 작성

    Producer.java 파일을 작성해 주십시오. 0~99의 숫자를 전송하는 예시는 다음과 같습니다.

    public class Producer {
        public static void produce(String brokers, String topicName) throws IOException {
            // Create Producer
            KafkaProducer<String, String> producer;
            // Configure
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", brokers);
            properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    
            producer = new KafkaProducer<>(properties);
    
            for(int i=0;i<100;i++){
                ProducerRecord record = new ProducerRecord<String, String>(topicName, Integer.toString(i));
                producer.send(record);
            }
            producer.close();
        }
    }
    

    Consumer.java 작성

    Consumer.java 파일을 작성해 주십시오.

    public class Consumer {
        public static int consume(String brokers, String topicName) {
            // Create Consumer
            KafkaConsumer<String, String> consumer;
            // Configure
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", brokers);
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.setProperty("group.id", "consumer_group");
            properties.setProperty("auto.offset.reset", "earliest");
    
            consumer = new KafkaConsumer<>(properties);
    
            consumer.subscribe(Arrays.asList(topicName));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(500); // wait for 500ms
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                    System.out.println(record.value());
                }
            }
        }
    }
    

    jar 파일 빌드 및 실행

    1. 작성한 Java code를 git에 저장한 후, VM에서 git clone 명령어를 입력하여 해당 code를 다운로드해 주십시오.
      git clone 자신의 git repository
      
    2. 해당 Java Application을 빌드하려면 Maven을 설치해 주십시오.
      yum install maven -y
      
    3. 다운로드한 Java code가 있는 폴더로 이동한 후, jar 파일을 빌드해 주십시오.
      jar 파일을 빌드하면 target 폴더와 target 폴더 내에 jar 파일이 생성됩니다.
      cd kafkatest
      mvn clean package
      
    4. target 폴더로 이동한 후, jar 파일을 실행해 주십시오.
      cd target
      
      # 데이터 전송하기
      java -jar kafkatest-1.0-SNAPSHOT.jar produce [topic] [broker.list]
      # [topic]에 CMAK에서 만든 Topic을 입력합니다.
      # [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
      예시) java -jar kafkatest-1.0-SNAPSHOT.jar produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
      
      
      # 데이터 조회하기
      java -jar kafkatest-1.0-SNAPSHOT.jar consume [topic] [broker.list]
      # [topic]에 CMAK에서 만든 Topic을 입력합니다.
      # [broker.list]에 앞에서 확인했던 브로커 노드 정보의 PlainText 복사본을 입력합니다.
      예시) java -jar kafkatest-1.0-SNAPSHOT.jar consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
      

    Python 활용하여 데이터 전송

    이 가이드에서는 Python 2.7.5 버전을 기준으로 설명합니다.

    Python에서 Kafka를 활용하려면 kafka-python package를 설치해 주십시오.

    # pip 설치
    curl -LO https://bootstrap.pypa.io/pip/2.7/get-pip.py
    python get-pip.py
    
    # kafka-python package 설치
    pip install kafka-python
    

    KafkaMain.py 작성

    KafkaMain.py 파일을 작성해 주십시오

    import sys
    from kafka import KafkaProducer, KafkaConsumer
    from json import dumps
    import time
    
    
    def produce(topicName, brokerLists):
            producer = KafkaProducer(bootstrap_servers=brokerLists,
                             value_serializer=lambda x:
                             dumps(x).encode('utf-8'))
            for i in range(100):
                    producer.send(topicName, i)
    
    def consume(topicName, brokerLists):
            consumer = KafkaConsumer(topicName, bootstrap_servers=brokerLists,
                            group_id="test")
    
            for msg in consumer:
                   print(msg)
    
    
    action=sys.argv[1]
    topicName=sys.argv[2]
    brokerLists=sys.argv[3].split(',')
    
    if action == 'produce':
        produce(topicName, brokerLists)
    elif action == 'consume':
        consume(topicName, brokerLists)
    else:
        print('wrong arguments')
    

    KafkaMain.py 파일 실행하여 produce 수행

    KafkaMain.py 파일을 실행해 주십시오. 0~99의 숫자를 전송하는 예시입니다.

    python KafkaMain.py produce [topic] [broker.list]
    # [topic]에 CMAK에서 생성한 Topic을 입력합니다.
    # [broker.list]에 앞에서 확인했던 Broker 노드 정보의 PlainText 복사본을 입력합니다.
    # 예시) python KafkaMain.py produce test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
    

    KafkaMain.py 파일을 실행하여 consume 수행

    KafkaMain.py파일을 실행하여

    python KafkaMain.py consume [topic] [broker.list]
    # [topic]에 CMAK에서 생성한 Topic을 입력합니다.
    # [broker.list]에 앞에서 확인했던 Broker 노드 정보의 PlainText 복사본을 입력합니다.
    # 예시) python KafkaMain.py consume test 192.168.2.24:9092,192.168.2.25:9092,192.168.2.26:9092
    

    통신 구간 암호화

    Apache Kafka 클라이언트와 Apache Kafka의 Broker 노드 간 통신 구간을 암호화하는 방법을 설명합니다.

    전체 프로세스는 다음과 같습니다. 1~3의 과정은 클러스터 생성 시 자동으로 실행됩니다.

    1. 매니저 노드에서 자체 인증서를 생성합니다.
    2. 각 Broker 노드에서 인증서를 생성한 후 인증서 서명 요청을 생성합니다.
    3. 인증서 서명 요청에 대해 매니저 노드가 서명합니다.
    4. 클라이언트에서는 인증서를 다운로드한 후 인증서에 대한 정보를 갖고 있는 TrustStore를 생성합니다.
    5. 암호화 통신에 대한 설정 파일을 작성합니다.
    6. hosts 파일을 수정합니다.

    인증서 다운로드

    인증서를 다운로드하는 방법은 다음과 같습니다.

    1. 네이버 클라우드 플랫폼 콘솔에서 Services > Big Data & Analytics > Cloud Data Streaming Service > Cluster 메뉴를 차례대로 클릭해 주십시오.
    2. 클러스터 상세 정보에서 인증서 관리 항목의 [다운로드] 버튼을 클릭해 주십시오.
      cdss-2-31_ko
    3. 팝업 창이 나타나면 [확인] 버튼을 클릭해 주십시오.
      • 인증서가 다운로드됩니다.
    4. 다운로드한 인증서 파일을 Producer, Consumer VM으로 복사해 주십시오.
      • Producer, Consumer VM의 /root 경로에 ca-cert라는 이름으로 저장합니다.

    Truststore 생성

    인증서 정보를 저장하는 TrustStore을 생성하는 방법은 다음과 같습니다.

    1. Truststore를 생성하려면 다음 명령어를 입력해 주십시오.
      keytool -keystore kafka.client.truststore.jks -alias mytruststore -import -file ca-cert
      
    2. keystore 비밀번호를 입력해 주십시오.
      cdss-2-36_ko
      • Enter keystore password: Password 입력
      • Re-enter new password: Password 다시 입력
    3. Trust this certificate? [no]: 문구가 나타나면 'yes'를 입력해 주십시오.
    4. ls 명령어를 입력하여 kafka.client.truststore.jks 파일이 생성되었는지 확인해 주십시오.

    암호화 설정 파일 작성

    client-auth.properties 파일을 다음과 같이 작성해 주십시오.
    [password]에는kafka.client.truststore.jks 파일 생성 시 설정한 비밀번호를 입력합니다.

    # /root/kafka_2.12-2.4.0 폴더에 암호화 설정 파일을 생성합니다.
    cd kafka_2.12-2.4.0
    vi client-auth.properties
    
    security.protocol=SSL
    ssl.truststore.location=/root/kafka.client.truststore.jks
    ssl.truststore.password=[password]
    

    cdss-2-37_ko

    hosts 파일 수정

    host 파일은 리눅스에서 DNS 서버보다 먼저 호스트명을 IP로 변환해 주는 파일입니다. 암호화 통신을 하려면 hosts 파일(/etc/hosts)을 수정해야 합니다.

    /etc/hosts 파일을 실행해 주십시오.

    vi /etc/hosts
    

    Broker 노드 정보의 hosts 파일 정보 복사본을 추가해 주십시오.

    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    192.168.2.24 yeakafka2-d-2am
    192.168.2.25 yeakafka2-d-2an
    192.168.2.26 yeakafka2-d-2ao
    

    cdss-2-38_ko

    통신 구간 암호화하여 데이터 전송

    Producer VM에서 다음과 같은 명령어를 실행해 주십시오.
    전송하고 싶은 메시지를 입력하면 브로커 노드에 해당 메시지가 저장됩니다. 종료하려면 [Ctrl] + [C] 키를 눌러 주십시오.

    ./bin/kafka-console-producer.sh --broker-list [broker.list] --topic [topic] --producer.config client-auth.properties
    # [broker.list]에 앞에서 확인했던 브로커 노드 정보의 TLS 복사본을 입력합니다.
    # [topic]에 CMAK에서 생성한 Topic을 입력합니다.
    # 예시) ./bin/kafka-console-producer.sh --broker-list yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --producer.config client-auth.properties
    

    cdss-2-40_ko

    통신 구간 암호화하여 저장된 데이터 조회

    Consumer VM에서 다음과 같은 명령어를 실행해 주십시오.

    cd kafka_2.12-2.4.0
    ./bin/kafka-console-consumer.sh --bootstrap-server [bootstrap.server] --topic [topic] --consumer.config client-auth.properties
    # [bootstrap.server]에 앞에서 확인했던 브로커 노드 정보의 TLS 복사본을 입력합니다.
    # [topic]에 앞 단계 Producer VM에서 입력한 Topic을 입력합니다.
    # 예시) ./bin/kafka-console-consumer.sh --bootstrap-server yeakafka2-d-2am:9093,yeakafka2-d-2an:9093,yeakafka2-d-2ao:9093 --topic test --consumer.config client-auth.properties
    

    cdss-2-41_ko

    Kafka Connect로 데이터 파이프라인 구축

    Kafka Connect로 데이터 파이프라인을 구축하는 방법은 Kafka Connect로 데이터 파이프라인 구축을 참조해 주십시오.


    이 문서가 도움이 되었습니까?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.