Kafka を使用する

Prev Next

VPC環境で利用できます。

Apache Kafkaは、高いパフォーマンスと安定性を保証する分散型メッセージングシステムです。Kafkaは大量のデータをリアルタイムで処理する際に使用します。

注意

Kafkaアプリを使用するには ZOOKEEPER-3.4.13タイプの Zookeeperアプリを先に作成してください。

参考

Kafkaアプリの詳しい説明は、Kafka公式ドキュメントをご参照ください。

Kafkaアプリの詳細情報の確認

アプリの作成が完了すると、詳細情報を確認できます。アプリの詳細情報の StatusStableの場合、アプリが正常に起動されたことを意味します。
アプリの詳細情報を確認する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Big Data & Analytics > Data Forestメニューを順にクリックします。
  2. 左側の Data Forest > Appsメニューをクリックします。
  3. アプリを所有するアカウントを選択します。
  4. 詳細情報を確認するアプリをクリックします。
  5. アプリの詳細情報を確認します。
    df-kafka_2-1_updated_ko
    • Quick links
      • kafka-manager: Kafka Managerの URL
    • Connecting String
      • zookeeper.connect: Kafkaアプリを作成する際に指定した Zookeeperアンサンブルのアドレス
    • コンポーネント
      • broker: メッセージを生産するプロデューサと消費するコンシューマの間でメッセージを保存、転送。
      • kafka-manager: brokerのモニタリング、トピック(Topic)の管理、パーティションの再割り当てが可能

Kafkaアプリの動作環境構成

起動された Kafkaアプリを使用したり開発するには、まずアプリの動作環境を構成する必要があります。 Devアプリで Kafkaアプリの動作環境を設定する方法を例に説明します。

  1. Kafkaアプリのパッケージと設定を確認するには、/home/forest/get-app-env.sh {appname} {install directory}のように作成してください。

    [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ mkdir kafka
    [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ /home/forest/get-app-env.sh kafka ./kafka/
    
    [/home/forest/get-app-env.sh] Apptype: KAFKA-2.4.0[/home/forest/get-app-env.sh] Download install-client script for KAFKA-2.4.0
    [/home/forest/get-app-env.sh] Install client on ./kafka/
    current kafka: .yarn/services/kafka/components/v1
    
    --2021-05-12 17:00:14--  http://dist.kr.df.naverncp.com/repos/release/kafka/kafka_2.12-2.4.0.tgz
    Resolving dist.kr.df.naverncp.com (dist.kr.df.naverncp.com)... 10.213.208.69
    Connecting to dist.kr.df.naverncp.com (dist.kr.df.naverncp.com)|10.213.208.69|:80... connected.
    HTTP request sent, awaiting response... 200 OK
    Length: 62283588 (59M) [application/octet-stream]
    Saving to: ‘./kafka//kafka_2.12-2.4.0.tgz’
    
    100%[============================================>] 62,283,588  --.-K/s   in 0.1s
    
    2021-05-12 17:00:14 (459 MB/s) - ‘./kafka//kafka_2.12-2.4.0.tgz’ saved [62283588/62283588]
    
    Kafka Client has been installed on ./kafka//kafka_2.12-2.4.0
    
  2. インストールパスで以下のように producer.propertiesconsumer.propertiesを確認します。

    [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ cat ./kafka/kafka_2.12-2.4.0/config/producer.properties
    #Generated by Apache Slider
    #Wed May 12 16:48:59 KST 2021
    bootstrap.servers=broker-0.kafka.test01.kr.df.naverncp.com\:9092,broker-1.kafka.test01.kr.df.naverncp.com\:9092,broker-2.kafka.test01.kr.df.naverncp.com\:9092
    compression.type=none
    
    [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ cat ./kafka/kafka_2.12-2.4.0/config/consumer.properties
    #Generated by Apache Slider
    #Wed May 12 16:48:59 KST 2021
    bootstrap.servers=broker-0.kafka.test01.kr.df.naverncp.com\:9092,broker-1.kafka.test01.kr.df.naverncp.com\:9092,broker-2.kafka.test01.kr.df.naverncp.com\:9092
    group.id=test-consumer-group
    
  3. kafka-topic.shを利用して「test」というトピックを作成します。
    kafka-topics.sh --bootstrap-server {bootstrap servers} --create --topic {topic name} --partitions {number of partitions}のように作成します。

    [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ kafka/kafka_2.12-2.4.0/bin/kafka-topics.sh --bootstrap-server broker-0.kafka.test01.kr.df.naverncp.com\:9092,broker-1.kafka.test01.kr.df.naverncp.com\:9092,broker-2.kafka.test01.kr.df.naverncp.com\:9092 --create --topic test --partitions 6
    [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ kafka/kafka_2.12-2.4.0/bin/kafka-topics.sh --bootstrap-server broker-0.kafka.test01.kr.df.naverncp.com\:9092,broker-1.kafka.test01.kr.df.naverncp.com\:9092,broker-2.kafka.test01.kr.df.naverncp.com\:9092 --describe --topic test
    Topic: test     PartitionCount: 6       ReplicationFactor: 3    Configs: retention.bytes=50000000000
            Topic: test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
            Topic: test     Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
            Topic: test     Partition: 2    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
            Topic: test     Partition: 3    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
            Topic: test     Partition: 4    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
            Topic: test     Partition: 5    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
    
  4. kafka-console-producer.shkafka-console-consumer.shを使用して produce/consumeする例です。

    [test01@shell-0.dev-1.test01.kr.df.naverncp.com ~][df]$ ./kafka/kafka_2.12-2.4.0/bin/kafka-console-producer.sh --broker-list broker-0.kafka.test01.kr.df.naverncp.com\:9092,broker-1.kafka.test01.kr.df.naverncp.com\:9092,broker-2.kafka.test01.kr.df.naverncp.com\:9092 --topic test
    >test1
    >test2
    >test3
    >test4                   
    >>^C
    [test01@shell-0.dev-1.test01.kr.df.naverncp.com ~][df]$ ./kafka/kafka_2.12-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server broker-0.kafka.test01.kr.df.naverncp.com\:9092,broker-1.kafka.test01.kr.df.naverncp.com\:9092,broker-2.kafka.test01.kr.df.naverncp.com\:9092 --topic test --from-beginning 
    test2
    test4
    test3
    test1
    

Kafka Managerを使用する

Kafka Managerを利用して brokerモニタリング、トピック管理、パーティション再分配などを行うことができます。

Kafka Managerの使い方を説明します。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Big Data & Analytics > Data Forestメニューを順にクリックします。
  2. 左側の Data Forest > Appsメニューをクリックします。
  3. アプリを所有するアカウントを選択してアプリをクリックします。
  4. アプリの詳細情報から Quick linkskafka-manager URLにアクセスします。
  5. ログイン画面が表示されたら、アプリ作成時にユーザー設定段階で入力したユーザー名(KM_USERNAME)とパスワード(KM_PASSWORD)を使用してログインします。
  6. [Add Cluster] ボタンをクリックします。
  7. Kafkaクラスタを追加します。
    • Cluster Name: アプリ名
    • Cluster Zookeeper Hosts: コンソール Kafkaアプリの詳細情報内の zookeeper.connect
  8. クラスタ情報を確認します。
    df-kafka_05_vpc_ko

Kafka broker変更

1つの brokerが少ないデータを維持するように運用することをお勧めします。つまり、1つの Kafkaアプリで複数のトピックを扱うよりも、1つの Kafkaアプリごとに1つのトピックを扱う方が安定しています。また、トピックのデータがどれだけ増えるか予測が難しい場合は、最初からパーティションを細かく分割し、状況に応じて brokerの数を調整する方法で運用することをお勧めします。

broker数を変更する方法は、次の通りです。

  1. NAVERクラウドプラットフォームコンソールの VPC環境で、i_menu > Services > Big Data & Analytics > Data Forest > Data Forest > Appsメニューを順にクリックします。
  2. アカウントを選択してアプリを選択し、 [Flex] ボタンをクリックします。
  3. Flexの変更画面が表示されたら、broker数を修正して [変更] ボタンをクリックします。

df-kafka_brokerC_vpc_updated_ko

参考
  • broker数を増やした後にトピックを追加すると、増加した brokerを含むパーティションが分配されます。ただし、既存のトピックのパーティションを自動的に再割り当てしないので、既存のトピックはユーザーが直接 kafka-managerでパーティションを再割り当てする必要があります。
  • broker数を減らす場合、COMPONENT_IDが大きいものから停止します。brokerが10つの場合、broker-9、broker-8、broker-7…の順に除外されます。
注意

除外される brokerのパーティションをあらかじめ他の brokerに再割り当てすることで、障害発生やデータ損失を防ぐことができます。

Kafkaアプリ使用時の注意事項

ローカルディスクの容量と運用方式

データは Data Forestクラスタの各ノードのローカルディスクに保存されます。ただし、ノードごとのディスク容量は約300GBに制限されています。
可能であれば、1つの brokerが少ないデータを維持するように運用することをお勧めします。つまり、1つの Kafkaアプリで複数のトピックを扱うよりは、1つの Kafkaアプリごとに1つのトピックを扱う方が安定しています。また、どのトピックのデータがどれだけ増えるか予測が難しい場合は、最初からパーティションを細かく分割し、状況に応じて brokerの数を調整する方法で運用することをお勧めします。

注意

大容量が必要な場合は、longlived_localdiskキュー(サービス用)を使用するか、大容量のノードで構成された専用キューを用意して Kafkaアプリを実行してください。

ノード障害

Kafkaアプリは、1つの物理的なノードに1つの brokerのみ起動します。したがって、replication factorが3(デフォルト値)の場合、1台または2台のノードが故障してもデータ損失が発生しません。

データ保存

Kafkaアプリのデータは Data Forestのローカルファイルシステムに保存され、ユーザーがアプリを停止(stop)したり、他の問題でアプリが終了してもデータは削除されません。停止したアプリをユーザーが削除(destroy)する場合のみデータが削除されます。

停止したアプリを再起動(start)すると、既存のノードで起動されるため、既存のデータが復旧されます。しかし、当該ノードのリソースを他のタスクが占有していたり、ノードが障害で除外される場合があります。このような場合、1時間が経過すると、アプリが既存のノードで起動することを放棄して他のノードで起動されます。したがって、その間に起動していない brokerのデータは損失する可能性があります。

Zookeeperノードの整理

アプリを削除(destroy)する場合、Kafkaアプリが使用していた Zookeeperパス zookeeper.connectは削除することをお勧めします。もし削除せずに同じ名前のアプリを再作成(create)する場合、問題が発生する可能性があります。