Apache Flume の連携

Prev Next

VPC環境で利用できます。

Apache Flumeは、分散環境で大量のログデータを効果的に収集してデータ保存場所に転送できるサービスです。
詳細は、Flume公式ホームページをご参照ください。

hadoop-chadoop-use-ex8_0-1

  • 性質

    • Distributed : トポロジー(Topology)をどのように構成するかによりますが、複数の Flume Agent同士でパイプを作ることができます。通常、Agent同士に接続する際には Avroタイプの Sink、Source(next hop)を使用します。
    • Reliable : Flume Agent内でイベントは Source、Channel、Sinkというコンポーネントに従って移動します。イベントが Sinkに転送されるまでは Channelでイベントが消えず、これを保証するために Flumeは Transactional approachを導入しました。
    • Available : Channelで Disk-backedシステムを使用する場合、Agentにエラーが発生しても Sourceから Channelに転送されたデータを復旧できます。
  • 用途

    • rom many ~ to a centralized : Flume Agentは、複数のノードからログを読み込み、最終的には中央化された保存場所に保管できます。
    • collecting、aggregating、moving : ログを収集して結合できます。その過程で Selector、Interceptorを活用してイベントの形態を変更できます。
      収集したイベントは次の Agentに転送したり、最終 Sinkに保存することができます。
  • コンポーネント

    • Event : Flume Agentによって移行されるデータの基本単位です。必要に応じてイベントにはヘッダ値を与えることができ、通常、ヘッダはイベント内容を確認して変更するために使用します。
    • Flume Agent : JVMプロセスで Source、Channel、Sinkコンポーネントをホストします。イベントは Agentを通じて外部の Sourceから next-hopの宛先に移ることができます。
      • Source : クライアントから転送されたイベントを消費します。Sourceがイベントを受けると1つ以上のチャンネルに渡します。
      • Channel : イベントの一時保存場所です。Sourceと Sinkをつなぎ、イベントの流れの Durabilityを保証する重要な役割を担います。
      • Sink : チャンネルからイベントを削除して flowの next-hopに転送します。

このガイドでは、Cloud Hadoopの HDFSにサーバのログを保存する Flumeトポロジーの構成方法を説明します。

Flume Agentを使用する

Flume Agentを用いて以下のように各サーバの vmstat結果を収集し、Cloud Hadoop HDFSに保存する Flumeトポロジーを構成できます。

hadoop-chadoop-use-ex8_1-1

Flumeのインストール

Flume Agentをインストールする方法は、次の通りです。

  1. ログを収集するサーバ3台を作成します。(Server作成ガイドを参照)

    • 各サーバは Cloud Hadoopが含まれた ACGに作成される必要があります。
      • log-gen-001 / centos-7.8–64 / 2vCPU, 8G Mem
      • log-gen-002 / centos-7.8–64 / 2vCPU, 8G Mem
      • log-gen-003 / centos-7.8–64 / 2vCPU, 8G Mem
  2. ~/downloads ~/appsパスのディレクトリを作成します。このパスで Flumeパッケージをダウンロードし、圧縮を展開してインストールを完了します。

    mkdir ~/downloads ~/apps
    
    cd ~/downloads
    wget https://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
    tar -xvf apache-flume-1.9.0-bin.tar.gz
    
    mv apache-flume-1.9.0-bin ~/apps/
    cd ~/apps
    ln -s apache-flume-1.9.0-bin flume
    

事前タスク

Cloud Hadoopを HDFS Sinkとして利用するには、以下のような事前タスクが必要です。

1. ウェブサーバと HDFSとの通信のための事前タスク

各ログサーバが HDFSネームノードホストと通信するには、各ホストの Private IPアドレスとホスト名を /etc/hostsに登録する必要があります。

当該情報は Cloud Hadoopのエッジノード(e.g.e-001-xxx)の /etc/hostsで確認できます。

2. HDFS Sinkを使用するための事前タスク

HDFS Sinkを使用するために、このトポロジーでは HDFS Sinkを使用します。Flume Agentが実行されるノードでは、Hadoop common jarライブラリが必要となります。また、NameNode HAのためにネームサービスを使用するには hdfs-site.xml , core-site.xmlのような構成ファイルも必要です。

  • Hadoopバイナリのダウンロード
    以下のコマンドを用いて /homeの下位から必要な Hadoopバイナリと.jarライブラリをダウンロードします。
  1. Cloud Hadoop 1.3バージョンでは、以下のコードを使用します。

    # Hadoopバイナリのダウンロード
    wget https://archive.apache.org/dist/hadoop/common/hadoop-2.6.5/hadoop-2.6.5.tar.gz -P ~/apps
    tar xfz hadoop-2.6.5.tar.gz  
    
    # HDFS Jarのダウンロード
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-auth/2.6.5/hadoop-auth-2.6.5.jar -P ~/apps/hadoop-2.6.5/share/hadoop/common
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs/2.6.5/hadoop-hdfs-2.6.5.jar -P ~/apps/hadoop-2.6.5/share/hadoop/common
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs-client/2.6.5/hadoop-hdfs-client-2.6.5.jar -P ~/apps/hadoop-2.6.5/share/hadoop/common
    
  2. Cloud Hadoop 1.4バージョン以上では、以下のコードを使用します。

    # Hadoopバイナリのダウンロード
    wget https://archive.apache.org/dist/hadoop/common/hadoop-3.1.4/hadoop-3.1.4.tar.gz -P ~/apps
    tar xfz hadoop-3.1.4.tar.gz  
    
    # HDFS Jarのダウンロード
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-auth/3.1.4/hadoop-auth-3.1.4.jar -P ~/apps/hadoop-3.1.4/share/hadoop/common
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs/3.1.4/hadoop-hdfs-3.1.4.jar -P ~/apps/hadoop-3.1.4/share/hadoop/common
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs-client/3.1.4/hadoop-hdfs-client-3.1.4.jar -P ~/apps/hadoop-3.1.4/share/hadoop/common
    
  • Hadoop configの設定
    $FLUME_CLASS_PATH/conf の下の Hadoop configファイルをダウンロードします。
$ cd ~/apps/flume/conf
$ curl -u $AMBARI_ID:$AMBARI_PASS -G 'http://$AMBARI_URI:8080/api/v1/clusters/$CLUSTER_NAME/components?format=client_config_tar' -o client_config.tgz 
$ tar xfz client_config.tgz
$ rm -f client_config.tgz
  • Hadoop環境変数の設定
    以下のコマンドを実行して Hadoop環境変数を設定します。
export HADOOP_HOME=~/apps/hadoop-3.1.4
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=~/apps/flume/conf/HDFS_CLIENT/
export PATH=${JAVA_HOME}/bin:${HADOOP_HOME}/bin:${PATH}

Flume構成の変更

Flume構成を変更する方法は、次の通りです。

  1. Flume Agentで以下のコマンドを実行し、構成値を作成します。

    cd ~/apps/flume/conf
    cp flume-conf.properties.template flume.conf
    cp flume-env.sh.template flume-env.sh
    cp ~/apps/hadoop-3.1.4/share/hadoop/common/*.jar ~/apps/flume/lib/ 
    cp ~/apps/hadoop-3.1.4/share/hadoop/common/lib/woodstox-core-5.0.3.jar ~/apps/flume/lib/
    mv ~/apps/flume/lib/guava-11.0.2.jar ~/apps/flume/lib/guava-11.0.2.jar.bak 
    
  2. 各 Flume Agentで以下のように hadoop-env.shJAVA_HOMEHADOOP_HOME オプションの値を修正します。

    • Javaの設定はインストール方法に応じてオプションの値が異なる場合があります。以下は、yumで Javaパッケージのインストール後に設定したパスです。
# Javaのインストール
yum install -y java-1.8.0-openjdk

# hadoop-evn.shの編集
vi $HADOOP_HOME/etc/hadoop/hadoop-env.sh 
# The java implementation to use.  Required.
export JAVA_HOME=/usr/lib/jvm/jre-openjdk
# Hadoop home directory
export HADOOP_HOME=/root/apps/hadoop-3.1.4
  • flume.conf
    • Agentの名前と各コンポーネントを定義します。(Agentの名前: fooAgent)
    • HDFSの Sinkパスにはネームサービスが入ったパスを使用します。 Cloud Hadoopではクラスタ名がネームサービスとなります。
    • hdfs-site.xmlにノード情報が含まれているため、どのネームノードが Active状態であるかを明示する必要はありません。
fooAgent.sources = Exec
fooAgent.channels = MemChannel
fooAgent.sinks = HDFS

fooAgent.sources.Exec.type = exec
fooAgent.sources.Exec.command = /usr/bin/vmstat 1
fooAgent.sources.Exec.channels = MemChannel 

fooAgent.channels.MemChannel.type = memory
fooAgent.channels.MemChannel.capacity = 10000
fooAgent.channels.MemChannel.transactionCapacity = 1000

fooAgent.sinks.HDFS.channel = MemChannel
fooAgent.sinks.HDFS.type = hdfs
fooAgent.sinks.HDFS.hdfs.path = hdfs://$CLUSTER_NAME/user/hduser/flume/events/
fooAgent.sinks.HDFS.hdfs.fileType = DataStream
fooAgent.sinks.HDFS.hdfs.writeFormat = Text
fooAgent.sinks.HDFS.hdfs.batchSize = 1000
fooAgent.sinks.HDFS.hdfs.rollSize = 0
fooAgent.sinks.HDFS.hdfs.rollCount = 10000
  • flume-env.sh
    事前タスクでインストールした hadoop clientのパスを FLUME_CLASSPATHに追加します。
export JAVA_HOME="/usr/lib/jvm/jre-openjdk"
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
export HADOOP_CONF_DIR="/root/apps/flume/conf/HDFS_CLIENT"
FLUME_CLASSPATH="/root/apps/flume/lib"

プロセスの開始

  1. ディレクトリの作成と所有者権限の設定

    $ sudo su - hdfs
    $ hdfs dfs -mkdir /user/hduser/flume/events/
    $ hdfs dfs -chown -R sshuser: /user/hduser/flume/events/
    $ exit
    
  2. 以下のコマンドを使用して各 Flume Agentを開始します。

    cd ~/apps/flume/
    
    ./bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n  fooAgent
    
    ....
    
    20/09/09 12:32:22 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
    20/09/09 12:32:22 INFO hdfs.BucketWriter: Creating hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622342911.tmp
    20/09/09 12:32:52 INFO hdfs.HDFSEventSink: Writer callback called.
    20/09/09 12:32:52 INFO hdfs.BucketWriter: Closing hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622342911.tmp
    20/09/09 12:32:52 INFO hdfs.BucketWriter: Renaming hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622342911.tmp to hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622342911
    20/09/09 12:32:55 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
    20/09/09 12:32:55 INFO hdfs.BucketWriter: Creating hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622375913.tmp
    20/09/09 12:33:25 INFO hdfs.HDFSEventSink: Writer callback called.
    20/09/09 12:33:25 INFO hdfs.BucketWriter: Closing hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622375913.tmp
    20/09/09 12:33:25 INFO hdfs.BucketWriter: Renaming hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622375913.tmp to hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622375913
    20/09/09 12:33:28 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
    20/09/09 12:33:28 INFO hdfs.BucketWriter: Creating hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622408915.tmp
    20/09/09 12:33:58 INFO hdfs.HDFSEventSink: Writer callback called.
    20/09/09 12:33:58 INFO hdfs.BucketWriter: Closing hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622408915.tmp
    20/09/09 12:33:58 INFO hdfs.BucketWriter: Renaming hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622408915.tmp to hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622408915
    20/09/09 12:34:01 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
    20/09/09 12:34:01 INFO hdfs.BucketWriter: Creating
    
  3. 以下のコマンドを用いると、HDFSで確認できます。

    $ hadoop fs -ls /user/hduser/flume/events/
    
    Found 17 items
    
    -rw-r--r--   2 root hdfs       3089 2020-09-09 12:25 /user/hduser/flume/events/FlumeData.1599621914876
    -rw-r--r--   2 root hdfs       3093 2020-09-09 12:26 /user/hduser/flume/events/FlumeData.1599621946882
    -rw-r--r--   2 root hdfs       2931 2020-09-09 12:26 /user/hduser/flume/events/FlumeData.1599621979885
    -rw-r--r--   2 root hdfs       3091 2020-09-09 12:27 /user/hduser/flume/events/FlumeData.1599622012888
    -rw-r--r--   2 root hdfs       2931 2020-09-09 12:27 /user/hduser/flume/events/FlumeData.1599622045890
    -rw-r--r--   2 root hdfs       3091 2020-09-09 12:28 /user/hduser/flume/events/FlumeData.1599622078893
    -rw-r--r--   2 root hdfs       2930 2020-09-09 12:29 /user/hduser/flume/events/FlumeData.1599622111895
    -rw-r--r--   2 root hdfs       3093 2020-09-09 12:29 /user/hduser/flume/events/FlumeData.1599622144897
    -rw-r--r--   2 root hdfs       3092 2020-09-09 12:30 /user/hduser/flume/events/FlumeData.1599622177899
    -rw-r--r--   2 root hdfs       2931 2020-09-09 12:30 /user/hduser/flume/events/FlumeData.1599622210902
    -rw-r--r--   2 root hdfs       3093 2020-09-09 12:31 /user/hduser/flume/events/FlumeData.1599622243904
    -rw-r--r--   2 root hdfs       2932 2020-09-09 12:31 /user/hduser/flume/events/FlumeData.1599622276906
    
参考

実際の運用環境では、2つ以上の Flume Agentをパイプ(pipe)しながらインターセプター(interceptor)を活用してイベントを変換します。
Source、Channel、Sinkには様々なタイプが存在し、Kafkaを Channel、Sinkとしてよく使用します。