Kafka Connectに Cloud Data Streaming Clusterをローテーション

Prev Next

VPC環境で利用できます。

別途の VPC Serverを作成して Confluent Replicatorを利用し、Kafka Cluster間のデータをリアルタイムでレプリケーションして Kafka Clusterをローテーションする方法について説明します。
このプロセスは Cent Linuxで作成された Clusterを Rocky Linuxが適用された Clusterに変更することをサポートします。

事前タスク

このガイドを実行する前に確認する必要があるタスクは、次の通りです。

  • 既存の Cloud Data Streaming Service Clusterの BrokerNodeアクセス情報
  • 既存と同じ Kafkaバージョンを持つ新規 Cloud Data Streaming Service Clusterの BrokerNodeアクセス情報
  • ドキュメントの内容のうち、既存 Clusterは A、新規で作成した Clusterは Bと呼びます。

Cluster移行手順

新規で作成した Clusterに移行する手順は、次の通りです。

  1. 下部のガイドに従って VPC Server作成、Confluent Replicatorインストール/適用、ACG設定を完了します。
  2. 新規で作成した Clusterに既存 Clusterの Topicとデータが正常にレプリケーションされるか確認/検証します。
  3. Consumerを新規で作成した Clusterに BootstrapUrlを変更します。
  4. Producerを新規で作成した Clusterに BootstrapUrlを変更します。
  5. 既存 Clusterの Kafka Topicに Queueが完全に処理されているか確認/検証します。
  6. Confluent Replicatorがインストールされた VPC Serverと既存クラスタは停止後に返却します。

VPC Server作成

STEP 1.サーバ作成

Confluent Replicatorをインストールするサーバを作成します。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Compute > Serverメニューを順にクリックします。
  2. [サーバ作成] ボタンをクリックしてサーバを作成します(サーバ作成ガイド)。
  3. サーバ作成時に Subnetの種類は Clusterの BrokerNodeと同じか同じタイプ(Private)の Subnetで作成します。
  4. 作成されたサーバの SSHでアクセスし、Confluent Replicatorインストールを準備します(サーバアクセスガイド)。
    • private subnetに作成された VPC Serverの場合 NATG/Wが追加必要で、ルーティング設定をする必要があります。

ネットワーク ACG設定

STEP 1.ACG設定

新規で作成された VPC Serverは Cloud Data Streaming Service Clusterの BrokerNodeの 9092Portにアクセスできる必要があります。
このために ACGを追加する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Big Data & Analytics > Cloud Data Streaming Serviceメニューを順にクリックします。
  2. Clusterの選択後、ブローカーノード ACGの横にある新しいウィンドウを開くボタンをクリックします。
  3. ブローカーノード ACGの名前や ACGIDが同じである ACGを選択した後、上部の [ACG設定] ボタンをクリックします。
  4. inboudタブで以下の情報を入力した後、 [追加] ボタンをクリックします。
    • プロトコル: TCP
    • アクセスソース: VPC Serverの IPアドレスや VPC Serverが属している ACGの名前
    • 許可ポート: 9092
  5. 下部の [適用] ボタンをクリックします。
  6. 同じ方法で現在利用中のクラスタと変更するクラスタのいずれも同じ ACGを登録します。

Confluent Replicatorのインストール

Serverに Confluent Replicatorをインストールするユースケースを説明します。インストールプロセスで Kafka Broker Nodeに対するアクセス情報を確認する必要があります。Confluent Replicatorは、ライセンスに従って利用可否を確認する必要があり、NAVERクラウドプラットフォームでは利用方法のみをガイドします。インストールプロセスで Kafka Broker Nodeに対するアクセス情報を確認する必要があります。

STEP 1.Javaのインストール

  1. 以下のコマンドを入力して jdkをインストールします
    最小1.8または java11以上をお勧めします。
    sudo yum install java-devel -y
    

STEP 2.Confluent Replicatorのインストール

Confluent Replicatorは Confluent Platformの一部として提供します。作成した VPC Serverに Confluent Platformをインストールし、Replicatorコネクタをダウンロードします。

Confluent Platformのダウンロード
Confluent Platformは Confluentダウンロードページでダウンロードできます。

curl -O https://packages.confluent.io/archive/7.5/confluent-7.5.0.tar.gz

ダウンロードしたファイルを解凍した後、ディレクトリに移動します。

tar -xzf confluent-7.5.0.tar.gz
cd confluent-7.5.0

Confluent Platformインストールディレクトリパスを環境変数として設定します(~/.bashrcまたは ~/.bash_profile変更)。

export CONFLUENT_HOME=/path/to/confluent-7.5.0
export PATH=$CONFLUENT_HOME/bin:$PATH

変更事項を適用します

source ~/.bashrc
または
source ~/.bash_profile

Confluent Hubを通じた Replicatorのインストール
Replicatorの最新バージョンコネクタをインストールします。

confluent-hub install confluentinc/kafka-connect-replicator:latest

STEP 3.Kafkaクラスタ構成の確認と設定

VPC Serverにインストールされた Replicatorが2つのクラスタのブローカーにアクセスできるか確認します。Replicatorから Kafkaブローカーが実行中のポートにアクセスでき、ACG設定へのアクセスが許可されている必要があります。

  • Kafkaトピック確認コマンドの例
    kafka-topics --bootstrap-server A_CLUSTER_BROKER_1_IP:9092,A_CLUSTER_BROKER_2_IP:9092 --list
    kafka-topics --bootstrap-server B_CLUSTER_BROKER_1_IP:9092,B_CLUSTER_BROKER_2_IP:9092 --list
    

Topicを指定して移動させる場合、B Clusterは A Clusterと同じ Topicを持つ必要があります。以下のユースケースは、A Clusterに「example-topic」という名前を持つ Topicがある場合のユースケースです。

  • 自動で作成して全体を移動させたい場合、以下の STEP 4.Replicator設定ファイル作成をご参照ください。
    kafka-topics --bootstrap-server B_CLUSTER_BROKER_1_IP:9092 --create --topic example-topic --partitions 3 --replication-factor 2
    

STEP 4.Replicator設定ファイル作成

中間サーバで Kafka Replicator設定ファイル(replicator.properties)を作成します。このファイルは2つのクラスタ間のデータのレプリケーションに必要な設定を含みます。/path/to/confluent-7.5.0/etc/kafka-connect-replicator/quickstart-replicator.propertiesファイルを以下の内容に変更します。

# コネクタの名前設定
name=replicator-connector

connector.class=io.confluent.connect.replicator.ReplicatorSourceConnector

key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter

tasks.max=4

# ソースクラスタの接続情報
src.kafka.bootstrap.servers=A_CLUSTER_BROKER_1_IP:9092,A_CLUSTER_BROKER_2_IP:9092

# 対象クラスタの接続情報
dest.kafka.bootstrap.servers=B_CLUSTER_BROKER_1_IP:9092,B_CLUSTER_BROKER_2_IP:9092

# 対象クラスタにトピックが存在しない場合、自動作成の有無
topic.auto.create=true

# レプリケーションするトピック名(複数の場合 topic1|topic2|topic3可能)
topic.whitelist=example-topic

# レプリケーションせず除外するトピック名
#topic.blacklist=

# レプリケーションされるトピックのトピック名の再定義が必要な場合(既存トピック名.replicaに作成される)
topic.rename.format=${topic}.replica

# トピックのレプリケーションに失敗する場合の再試行間隔設定(10秒)
topic.create.backoff.ms=10000


# オフセット管理の設定
offset.storage.topic=replicator-offsets
config.storage.topic=replicator-configs
status.storage.topic=replicator-status

STEP 5.connectorと Replicator設定ファイル作成

設定ファイルを使用して Kafka Connectを standaloneモードで実行するために connect-standalone.propertiesファイルを変更します。

bootstrap.servers=B_CLUSTER_BROKER_1_IP:9092,B_CLUSTER_BROKER_2_IP:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter


key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets

offset.flush.interval.ms=10000

plugin.path=/usr/share/java,/root/confluent-7.5.0/share/confluent-hub-components

STEP 6.Replicator実行

変更された設定ファイルを適用した kafka Connectを用いて Replicatorを standaloneモードで実行します。
実行すると即時レプリケーションが始まります。

connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/etc/kafka-connect-replicator/quickstart-replicator.properties

ログファイルは、$CONFLUENT_HOME/logsディレクトリに保存されます。

STEP 7.トラブルシューティングと追加設定

  • アクセスエラー: VPC Serverと Clusterのブローカーノード ACGに 9092 Portが許可されているか確認します。
  • データ遅延の問題: レプリケーション速度が遅い場合、consumer.threads値を増やすかブローカーの性能を確認します。
  • 複数トピックのレプリケーション: topic.whitelist設定に正規表現式を使用して複数のトピックをレプリケーションしたり、特定のトピックを除外するには topic.blacklistオプションを使用できます
topic.whitelist=topic1|topic2|topic3
topic.blacklist=topic4|topic5|topic6