Kafka Connect로 데이터 파이프라인 구축
    • PDF

    Kafka Connect로 데이터 파이프라인 구축

    • PDF

    Article Summary

    VPC 환경에서 이용 가능합니다.

    이 가이드는 서버에 Kafka Connect를 설치한 후 Cloud Data Streaming Service를 활용하여 MySQL의 변경 사항을 Elasticsearch에 적용하는 방법을 설명합니다.

    참고

    이 가이드는 사용자의 이해를 돕기 위해 작성된 예제입니다. 이 가이드에서 사용된 connector 및 confluent-hub에 대한 라이선스는 confluent 사에 있으며 confluent 사의 라이선스 정책을 따릅니다. 네이버클라우드는 해당 connector 및 confluent-hub를 직접 제공하는 사업자가 아니며, 사용자는 connector 및 confluent-hub 사용 여부를 직접 결정할 수 있습니다.

    사전 작업

    이 가이드를 수행하기 전에 이용 신청을 완료해야 하는 작업은 다음과 같습니다.

    • 서버 및 VPC 생성
    • Cloud DB for MySQL 서버 생성
    • Cloud Data Streaming Service 클러스터 생성
    • Search Engine Service 클러스터 생성

    네트워크 설정

    Step 1. Cloud DB for MySQL 설정

    Cloud DB for MySQL의 DB 계정을 추가하는 방법은 다음과 같습니다.

    1. 네이버 클라우드 플랫폼 콘솔에서 Services > Database > Cloud DB for MySQL > DB Server 메뉴를 차례대로 클릭해 주십시오.
    2. DB Server를 선택한 후 DB 관리 > DB User 관리를 클릭해 주십시오.
    3. 필요한 정보를 입력한 후 [DB User 추가] 버튼을 클릭해 주십시오.
    4. DB 계정이 추가되면 [저장] 버튼을 클릭해 주십시오
      cdss-5-2_ko
    참고

    Cloud DB for MySQL에 대한 자세한 설명은 Cloud DB for MySQL 사용 가이드를 참조해 주십시오.

    Step 2. ACG 설정

    Cloud Data Streaming Service 브로커 노드의 9092번 포트로 접근할 수 있도록 ACG를 설정하는 방법은 다음과 같습니다.

    1. 네이버 클라우드 플랫폼 콘솔에서 Services > Compute > Server > ACG 메뉴를 차례대로 클릭해 주십시오.
    2. ACG 목록에서 'cdss-b-xxxxx'을 선택한 후 [ACG 설정] 버튼을 클릭해 주십시오.
    3. ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
      cdss-5-4_ko
      • 프로토콜: TCP
      • 접근 소스: Kafka Connect가 실행될 서버의 IP
      • 허용 포트: 9092
    4. [적용] 버튼을 클릭해 주십시오.

    Search Engine Service 매니저 노드의 9200번 포트로 접근할 수 있도록 ACG를 설정하는 방법은 다음과 같습니다.

    1. 네이버 클라우드 플랫폼 콘솔에서 Services > Compute > Server > ACG 메뉴를 차례대로 클릭해 주십시오.
    2. ACG 목록에서 'searchengine-m-xxxxx'을 선택한 후 [ACG 설정] 버튼을 클릭해 주십시오.
    3. ACG 규칙을 입력한 후 [추가] 버튼을 클릭해 주십시오.
      cdss-5-6_ko
      • 프로토콜: TCP
      • 접근 소스: Kafka Connect가 실행될 서버의 IP
      • 허용 포트: 9200

    서버에 Kafka Connect 설치

    Step 1. Java 설치

    Java를 설치하는 방법은 다음과 같습니다.

    1. 다음 명령어를 입력하여 yum 업데이트를 진행해 주십시오.

      yum update
      
    2. 다음 명령어를 입력하여 java-1.8.0-openjdk-devel.x86_64를 설치해 주십시오.

      yum install java-1.8.0-openjdk-devel.x86_64
      
    3. 다음 명령어를 입력하여 정상적으로 설치가 되었는지 확인해 주십시오.

      java -version
      javac -version
      

    Step 2. Kafka Connect 설치

    Kafka Connect를 설치하는 방법은 다음과 같습니다.

    1. 다음 명령어를 입력하여 서버의 /root 경로에 Kafka Connect를 다운로드해 주십시오.

      curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
      
    2. 다음 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.

      tar -zxvf confluent-community-7.0.1.tar.gz
      
    3. 다음 명령어를 입력하여 구동 전 properties 파일을 수정해 주십시오.

      • Cloud Data Streaming Service의 브로커 노드 정보를 참고하여 properties 파일의 ‘bootstrap.servers’에 ip 목록을 추가합니다.
      vi /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      
      bootstrap.servers=10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092
      

    Step 3. Confluent Hub 설치

    Confluent Hub는 Kafka Connect에서 사용되는 다양한 플러그인을 간편하게 다운로드할 수 있는 저장소입니다. 지원되는 플러그인의 전체 목록은 Confluent에서 제공하는 Confluent Connector Portfolio를 확인해 주십시오.

    1. 다음 명령어를 입력하여 /root 경로에 새로운 폴더를 만든 후 해당 폴더로 이동해 주십시오.

      • 예제에서는 'confluent-hub'라는 이름의 폴더를 생성합니다.
      mkdir confluent-hub
      cd confluent-hub
      
    2. 다음 명령어를 입력하여 현재 경로(/root/confluent-hub)에 Confluent Hub를 다운로드해 주십시오.

      curl -O http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
      
    3. 다음 명령어를 입력하여 다운로드한 파일의 압축을 해제해 주십시오.

      tar -zxvf confluent-hub-client-latest.tar.gz
      
    4. 다음 명령어를 입력하여 현재 경로(/root/confluent-hub)에 추후 플러그인이 저장될 폴더를 생성해 주십시오.

      • 예제에서는 'plugins'라는 이름의 폴더를 생성합니다.
      mkdir plugins
      
    5. 다음 명령어를 차례대로 입력하여 PATH 환경변수에 압축 해제한 bin 폴더의 경로를 추가해 주십시오.

      vi ~/.bashrc
      
      export CONFLUENT_HOME='~/confluent-hub'
      export PATH=$PATH:$CONFLUENT_HOME/bin
      
      source ~/.bashrc
      

    Step 4. MySQL Connector 설치

    다음 명령어를 입력하여 debezium-connector-mysql을 설치해 주십시오.

    • --component-dir은 실제 플러그인이 설치되는 폴더 경로입니다. STEP 3에서 생성한 /root/confluent-hub/plugins로 설정해 주십시오.
    • --worker-configs는 플러그인 설치 후 적용된 properties 파일의 경로입니다. Step 2에서 수정했던 /root/confluent-7.0.1/etc/kafka/connect-distributed.properties로 설정해 주십시오.
    confluent-hub install debezium/debezium-connector-mysql:1.7.0 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    

    Step 5. Elasticsearch Connector 설치

    다음 명령어를 입력하여 kafka-connect-elasticsearch을 설치해 주십시오.

    • --component-dir--worker-configs는 STEP 4와 동일하게 적용해 주십시오.
    confluent-hub install confluentinc/kafka-connect-elasticsearch:11.1.3 --component-dir /root/confluent-hub/plugins --worker-configs /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
    

    Step 6. Kafka Connect 프로세스 실행

    Kafka Connect 프로세스를 실행하는 방법은 다음과 같습니다.

    1. 다음 명령어를 입력하여 Kafka Connect 프로세스를 백그라운드로 실행해 주십시오.

      /root/confluent-7.0.1/bin/connect-distributed -daemon /root/confluent-7.0.1/etc/kafka/connect-distributed.properties
      
    2. 다음 명령어를 입력하여 프로세스가 정상적으로 작동하는지 확인해 주십시오.

      curl localhost:8083
      
      {"version":"7.0.1-ccs","commit":"b7e52413e7cb3e8b","kafka_cluster_id":"m1hLK0L6Qra5TLVy7b_A4A"}
      
    3. 다음 명령어를 입력하여 앞에서 설치한 Connector가 모두 정상적으로 노출되는지 확인해 주십시오.

      curl localhost:8083/connector-plugins
      
      [
         {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","type":"sink","version":"11.1.3"}, 
         {"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.7.0.Final"},
         {"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
         {"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
         {"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}
      ]
      

    Kafka Connect 사용

    Step 1. MySQL 테이블 생성 및 데이터 추가

    MySQL 테이블을 생성하고 데이터를 추가하는 방법은 다음과 같습니다.

    1. 사용 중인 Cloud DB for MySQL 서버에 접속해 주십시오.
    2. 다음 명령어를 입력하여 테이블을 생성한 후 데이터를 추가해 주십시오.
      • 예제에서는 member라는 이름의 테이블을 생성합니다.
      CREATE TABLE IF NOT EXISTS member (
       id int NOT NULL PRIMARY KEY,
       name varchar(100),
       email varchar(200),
       department varchar(200)
      );
      
      INSERT INTO member(id, name, email, department) values (1, 'messi', 'messi@gmail.com', 'A');
      INSERT INTO member(id, name, email, department) values (2, 'ronaldo', 'ronaldo@naver.com', 'B');
      INSERT INTO member(id, name, email, department) values (3, 'son', 'son@ncloud.com', 'B');
      INSERT INTO member(id, name, email, department) values (4, 'park', 'park@yahoo.com', 'B');
      

    Step 2. MySQL Connector 등록

    MySQL Connector를 등록하는 방법은 다음과 같습니다.

    1. Kafka Connect가 설치된 서버로 접속해 주십시오.
    2. 요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.
      {
         "name": "mysql-connector", // connector의 이름
         "config": {
           "connector.class": "io.debezium.connector.mysql.MySqlConnector", // 사용할 플러그인의 종류
           "database.hostname": "db-9c242.vpc-cdb.ntruss.com", // MySQL 서버의 엔드포인트
           "database.port": "3306", // MySQL 서버의 포트
           "database.user": "kimdong", // MySQL 서버의 user
           "database.password": "1234", // MySQL 서버 user의 password
           "database.server.id": "184054", // kafka connect에서 사용되는 MySQL 서버의 uuid
           "database.server.name": "NCP_MYSQL", // kafka connect에서 사용할 MySQL 서버에 부여하는 이름, 추후 kafka topic의 접두사로 사용됨 
           "database.whitelist": "test", // kafka connect에서 접근할 MySQL 서버의 데이터베이스 지정
           "database.history.kafka.bootstrap.servers": "10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092", // Broker 노드 정보
           "database.history.kafka.topic": "this_is_topic", // MySQL 히스토리 변경 내역을 저장할 topic 이름 
           "snapshot.locking.mode": "none",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter"
         }
      }
      
    3. 다음 명령어를 입력하여 MySQL Connector를 등록해 주십시오.
      curl -X POST localhost:8083/connectors \
         -H "Content-Type: application/json" \
         -d '{"name":"mysql-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"db-9c242.vpc-cdb.ntruss.com","database.port":"3306","database.user":"kimdong","database.password":"1234","database.server.id":"184054","database.server.name":"NCP_MYSQL","database.whitelist":"test","database.history.kafka.bootstrap.servers":"10.0.200.14:9092,10.0.200.15:9092,10.0.200.16:9092","database.history.kafka.topic":"this_is_topic","snapshot.locking.mode":"none","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter"}}' 
      
      
      

    Step 3. Topic 확인

    Topic이 생성되었는지 확인하는 방법은 다음과 같습니다.

    1. Cloud Data Streaming Service에서 제공하는 CMAK에 접속해 주십시오.
    2. STEP 2에서 설정한 body의 정보대로 Topic이 생성된 것을 확인해 주십시오.
      • STEP 1에서 생성한 MySQL의 member 테이블의 변경 정보가 담길 Topic은 'NCP_MYSQL.test.member'입니다.
        cdss-5-7_ko

    Step 4. Elasticsearch Connector 등록

    Elasticsearch Connector를 등록하는 방법은 다음과 같습니다.

    1. Kafka Connect가 설치된 서버로 접속해 주십시오.
    2. 요청 시 보낼 JSON body를 개인 환경에 맞게 아래 형식대로 입력해 주십시오.
      {
         "name": "es-connector", // connector의 이름
         "config": {
           "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", // 사용할 플러그인의 종류
           "connection.url": "http://10.0.100.9:9200", // topic을 가져올 kafka 브로커 노드
           "tasks.max": "1",
           "topics": "NCP_MYSQL.test.member", // 컨슈밍할 topic 이름
           "type.name": "_doc",
           "value.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter": "org.apache.kafka.connect.json.JsonConverter",
           "key.converter.schemas.enable": "true",
           "value.converter.schemas.enable": "true",
           "transforms": "extractKey",
           "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
           "transforms.extractKey.field": "id", // MySQL 테이블에서 사용하는 pk명
           "behavior.on.null.values": "IGNORE"
         }
      }
      
    3. 아래 명령어를 입력하여 Elasticsearch Connector를 등록해 주십시오.
      curl -X POST localhost:8083/connectors \
         -H "Content-Type: application/json" \
         -d '{"name":"es-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://10.0.100.9:9200","tasks.max":"1","topics":"NCP_MYSQL.test.member","type.name":"_doc","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","transforms":"extractKey","transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractKey.field":"id","behavior.on.null.values":"IGNORE"}}'
      

    Step 5. Elasticsearch 데이터 확인

    Elasticsearch 데이터를 확인하는 방법은 다음과 같습니다.

    1. Search Engine Service에서 제공하는 Kibana의 엔드포인트로 접속해 주십시오.
    2. 인덱스 목록을 조회하여 ncp_mysql.test.member라는 이름의 인덱스가 생성된 것을 확인해 주십시오.
      cdss-5-8_ko
    3. ncp_mysql.test.member 인덱스의 내용을 조회해 주십시오.
      cdss-5-9_ko
    4. ncp_mysql.test.member 인덱스의 특정 도큐먼트를 조회해 주십시오.
      • MySQL에서 추가한 데이터는 _source.after에서 확인할 수 있습니다.
        cdss-5-10_ko

    이 문서가 도움이 되었습니까?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.