Apache Flume の連携
    • PDF

    Apache Flume の連携

    • PDF

    Article Summary

    VPC環境で利用できます。

    Apache Flumeは、分散環境で大量のログデータを効果的に収集してデータ保存場所に転送できるサービスです。
    詳細は、Flume公式ホームページをご参照ください。

    hadoop-chadoop-use-ex8_0-1

    • 性質

      • Distributed : トポロジー(Topology)をどのように構成するかによりますが、複数の Flume Agent同士でパイプを作ることができます。通常、Agent同士に接続する際には Avroタイプの Sink、Source(next hop)を使用します。
      • Reliable : Flume Agent内でイベントは Source、Channel、Sinkというコンポーネントに従って移動します。イベントが Sinkに転送されるまでは Channelでイベントが消えず、これを保証するために Flumeは Transactional approachを導入しました。
      • Available : Channelで Disk-backedシステムを使用する場合、Agentにエラーが発生しても Sourceから Channelに転送されたデータを復旧できます。
    • 用途

      • rom many ~ to a centralized : Flume Agentは、複数のノードからログを読み込み、最終的には中央化された保存場所に保管できます。
      • collecting、aggregating、moving : ログを収集して結合できます。その過程で Selector、Interceptorを活用してイベントの形態を変更できます。
        収集したイベントは次の Agentに転送したり、最終 Sinkに保存することができます。
    • コンポーネント

      • Event : Flume Agentによって移行されるデータの基本単位です。必要に応じてイベントにはヘッダ値を与えることができ、通常、ヘッダはイベント内容を確認して変更するために使用します。
      • Flume Agent : JVMプロセスで Source、Channel、Sinkコンポーネントをホストします。イベントは Agentを通じて外部の Sourceから next-hopの宛先に移ることができます。
        • Source : クライアントから転送されたイベントを消費します。Sourceがイベントを受けると1つ以上のチャンネルに渡します。
        • Channel : イベントの一時保存場所です。Sourceと Sinkをつなぎ、イベントの流れの Durabilityを保証する重要な役割を担います。
        • Sink : チャンネルからイベントを削除して flowの next-hopに転送します。

    このガイドでは、Cloud Hadoopの HDFSにサーバのログを保存する Flumeトポロジーの構成方法を説明します。

    Flume Agentを使用する

    Flume Agentを用いて以下のように各サーバの vmstat結果を収集し、Cloud Hadoop HDFSに保存する Flumeトポロジーを構成できます。

    hadoop-chadoop-use-ex8_1-1

    Flumeのインストール

    Flume Agentをインストールする方法は、次の通りです。

    1. ログを収集するサーバ3台を作成します。(Server作成ガイドを参照)

      • 各サーバは Cloud Hadoopが含まれた ACGに作成される必要があります。
        • log-gen-001 / centos-7.8–64 / 2vCPU, 8G Mem
        • log-gen-002 / centos-7.8–64 / 2vCPU, 8G Mem
        • log-gen-003 / centos-7.8–64 / 2vCPU, 8G Mem
    2. ~/downloads ~/appsパスのディレクトリを作成します。このパスで Flumeパッケージをダウンロードし、圧縮を展開してインストールを完了します。

      mkdir ~/downloads ~/apps
      
      cd ~/downloads
      wget https://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
      tar -xvf apache-flume-1.9.0-bin.tar.gz
      
      mv apache-flume-1.9.0-bin ~/apps/
      cd ~/apps
      ln -s apache-flume-1.9.0-bin flume
      

    事前タスク

    Cloud Hadoopを HDFS Sinkとして利用するには、以下のような事前タスクが必要です。

    1. ウェブサーバと HDFSとの通信のための事前タスク

    各ログサーバが HDFSネームノードホストと通信するには、各ホストの Private IPアドレスとホスト名を /etc/hostsに登録する必要があります。

    当該情報は Cloud Hadoopのエッジノード(e.g.e-001-xxx)の /etc/hostsで確認できます。

    2. HDFS Sinkを使用するための事前タスク

    HDFS Sinkを使用するために、このトポロジーでは HDFS Sinkを使用します。Flume Agentが実行されるノードでは、Hadoop common jarライブラリが必要となります。また、NameNode HAのためにネームサービスを使用するには hdfs-site.xml , core-site.xmlのような構成ファイルも必要です。

    • Hadoopバイナリのダウンロード
      以下のコマンドを用いて /homeの下位から必要な Hadoopバイナリと.jarライブラリをダウンロードします。
    1. Cloud Hadoop 1.3バージョンでは、以下のコードを使用します。

      # Hadoopバイナリのダウンロード
      wget https://archive.apache.org/dist/hadoop/common/hadoop-2.6.5/hadoop-2.6.5.tar.gz -P ~/apps
      tar xfz hadoop-2.6.5.tar.gz  
      
      # HDFS Jarのダウンロード
      wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-auth/2.6.5/hadoop-auth-2.6.5.jar -P ~/apps/hadoop-2.6.5/share/hadoop/common
      wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs/2.6.5/hadoop-hdfs-2.6.5.jar -P ~/apps/hadoop-2.6.5/share/hadoop/common
      wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs-client/2.6.5/hadoop-hdfs-client-2.6.5.jar -P ~/apps/hadoop-2.6.5/share/hadoop/common
      
    2. Cloud Hadoop 1.4バージョン以上では、以下のコードを使用します。

      # Hadoopバイナリのダウンロード
      wget https://archive.apache.org/dist/hadoop/common/hadoop-3.1.4/hadoop-3.1.4.tar.gz -P ~/apps
      tar xfz hadoop-3.1.4.tar.gz  
      
      # HDFS Jarのダウンロード
      wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-auth/3.1.4/hadoop-auth-3.1.4.jar -P ~/apps/hadoop-3.1.4/share/hadoop/common
      wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs/3.1.4/hadoop-hdfs-3.1.4.jar -P ~/apps/hadoop-3.1.4/share/hadoop/common
      wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs-client/3.1.4/hadoop-hdfs-client-3.1.4.jar -P ~/apps/hadoop-3.1.4/share/hadoop/common
      
    • Hadoop configの設定
      $FLUME_CLASS_PATH/conf の下の Hadoop configファイルをダウンロードします。
    $ cd ~/apps/flume/conf
    $ curl -u $AMBARI_ID:$AMBARI_PASS -G 'http://$AMBARI_URI:8080/api/v1/clusters/$CLUSTER_NAME/components?format=client_config_tar' -o client_config.tgz 
    $ tar xfz client_config.tgz
    $ rm -f client_config.tgz
    
    • Hadoop環境変数の設定
      以下のコマンドを実行して Hadoop環境変数を設定します。
    export HADOOP_HOME=~/apps/hadoop-3.1.4
    export HADOOP_HDFS_HOME=$HADOOP_HOME
    export HADOOP_CONF_DIR=~/apps/flume/conf/HDFS_CLIENT/
    export PATH=${JAVA_HOME}/bin:${HADOOP_HOME}/bin:${PATH}
    

    Flume構成の変更

    Flume構成を変更する方法は、次の通りです。

    1. Flume Agentで以下のコマンドを実行し、構成値を作成します。

      cd ~/apps/flume/conf
      cp flume-conf.properties.template flume.conf
      cp flume-env.sh.template flume-env.sh
      cp ~/apps/hadoop-3.1.4/share/hadoop/common/*.jar ~/apps/flume/lib/ 
      cp ~/apps/hadoop-3.1.4/share/hadoop/common/lib/woodstox-core-5.0.3.jar ~/apps/flume/lib/
      mv ~/apps/flume/lib/guava-11.0.2.jar ~/apps/flume/lib/guava-11.0.2.jar.bak 
      
    2. 各 Flume Agentで以下のように hadoop-env.shJAVA_HOMEHADOOP_HOME オプションの値を修正します。

      • Javaの設定はインストール方法に応じてオプションの値が異なる場合があります。以下は、yumで Javaパッケージのインストール後に設定したパスです。
    # Javaのインストール
    yum install -y java-1.8.0-openjdk
    
    # hadoop-evn.shの編集
    vi $HADOOP_HOME/etc/hadoop/hadoop-env.sh 
    
    # The java implementation to use.  Required.
    export JAVA_HOME=/usr/lib/jvm/jre-openjdk
    # Hadoop home directory
    export HADOOP_HOME=/root/apps/hadoop-3.1.4
    
    • flume.conf
      • Agentの名前と各コンポーネントを定義します。(Agentの名前: fooAgent)
      • HDFSの Sinkパスにはネームサービスが入ったパスを使用します。 Cloud Hadoopではクラスタ名がネームサービスとなります。
      • hdfs-site.xmlにノード情報が含まれているため、どのネームノードが Active状態であるかを明示する必要はありません。
    fooAgent.sources = Exec
    fooAgent.channels = MemChannel
    fooAgent.sinks = HDFS
    
    fooAgent.sources.Exec.type = exec
    fooAgent.sources.Exec.command = /usr/bin/vmstat 1
    fooAgent.sources.Exec.channels = MemChannel 
    
    fooAgent.channels.MemChannel.type = memory
    fooAgent.channels.MemChannel.capacity = 10000
    fooAgent.channels.MemChannel.transactionCapacity = 1000
    
    fooAgent.sinks.HDFS.channel = MemChannel
    fooAgent.sinks.HDFS.type = hdfs
    fooAgent.sinks.HDFS.hdfs.path = hdfs://$CLUSTER_NAME/user/hduser/flume/events/
    fooAgent.sinks.HDFS.hdfs.fileType = DataStream
    fooAgent.sinks.HDFS.hdfs.writeFormat = Text
    fooAgent.sinks.HDFS.hdfs.batchSize = 1000
    fooAgent.sinks.HDFS.hdfs.rollSize = 0
    fooAgent.sinks.HDFS.hdfs.rollCount = 10000
    
    • flume-env.sh
      事前タスクでインストールした hadoop clientのパスを FLUME_CLASSPATHに追加します。
    export JAVA_HOME="/usr/lib/jvm/jre-openjdk"
    export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
    export HADOOP_CONF_DIR="/root/apps/flume/conf/HDFS_CLIENT"
    FLUME_CLASSPATH="/root/apps/flume/lib"
    

    プロセスの開始

    1. ディレクトリの作成と所有者権限の設定

      $ sudo su - hdfs
      $ hdfs dfs -mkdir /user/hduser/flume/events/
      $ hdfs dfs -chown -R sshuser: /user/hduser/flume/events/
      $ exit
      
    2. 以下のコマンドを使用して各 Flume Agentを開始します。

      cd ~/apps/flume/
      
      ./bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n  fooAgent
      
      ....
      
      20/09/09 12:32:22 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
      20/09/09 12:32:22 INFO hdfs.BucketWriter: Creating hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622342911.tmp
      20/09/09 12:32:52 INFO hdfs.HDFSEventSink: Writer callback called.
      20/09/09 12:32:52 INFO hdfs.BucketWriter: Closing hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622342911.tmp
      20/09/09 12:32:52 INFO hdfs.BucketWriter: Renaming hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622342911.tmp to hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622342911
      20/09/09 12:32:55 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
      20/09/09 12:32:55 INFO hdfs.BucketWriter: Creating hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622375913.tmp
      20/09/09 12:33:25 INFO hdfs.HDFSEventSink: Writer callback called.
      20/09/09 12:33:25 INFO hdfs.BucketWriter: Closing hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622375913.tmp
      20/09/09 12:33:25 INFO hdfs.BucketWriter: Renaming hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622375913.tmp to hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622375913
      20/09/09 12:33:28 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
      20/09/09 12:33:28 INFO hdfs.BucketWriter: Creating hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622408915.tmp
      20/09/09 12:33:58 INFO hdfs.HDFSEventSink: Writer callback called.
      20/09/09 12:33:58 INFO hdfs.BucketWriter: Closing hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622408915.tmp
      20/09/09 12:33:58 INFO hdfs.BucketWriter: Renaming hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622408915.tmp to hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622408915
      20/09/09 12:34:01 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
      20/09/09 12:34:01 INFO hdfs.BucketWriter: Creating
      
    3. 以下のコマンドを用いると、HDFSで確認できます。

      $ hadoop fs -ls /user/hduser/flume/events/
      
      Found 17 items
      
      -rw-r--r--   2 root hdfs       3089 2020-09-09 12:25 /user/hduser/flume/events/FlumeData.1599621914876
      -rw-r--r--   2 root hdfs       3093 2020-09-09 12:26 /user/hduser/flume/events/FlumeData.1599621946882
      -rw-r--r--   2 root hdfs       2931 2020-09-09 12:26 /user/hduser/flume/events/FlumeData.1599621979885
      -rw-r--r--   2 root hdfs       3091 2020-09-09 12:27 /user/hduser/flume/events/FlumeData.1599622012888
      -rw-r--r--   2 root hdfs       2931 2020-09-09 12:27 /user/hduser/flume/events/FlumeData.1599622045890
      -rw-r--r--   2 root hdfs       3091 2020-09-09 12:28 /user/hduser/flume/events/FlumeData.1599622078893
      -rw-r--r--   2 root hdfs       2930 2020-09-09 12:29 /user/hduser/flume/events/FlumeData.1599622111895
      -rw-r--r--   2 root hdfs       3093 2020-09-09 12:29 /user/hduser/flume/events/FlumeData.1599622144897
      -rw-r--r--   2 root hdfs       3092 2020-09-09 12:30 /user/hduser/flume/events/FlumeData.1599622177899
      -rw-r--r--   2 root hdfs       2931 2020-09-09 12:30 /user/hduser/flume/events/FlumeData.1599622210902
      -rw-r--r--   2 root hdfs       3093 2020-09-09 12:31 /user/hduser/flume/events/FlumeData.1599622243904
      -rw-r--r--   2 root hdfs       2932 2020-09-09 12:31 /user/hduser/flume/events/FlumeData.1599622276906
      
    参考

    実際の運用環境では、2つ以上の Flume Agentをパイプ(pipe)しながらインターセプター(interceptor)を活用してイベントを変換します。
    Source、Channel、Sinkには様々なタイプが存在し、Kafkaを Channel、Sinkとしてよく使用します。


    この記事は役に立ちましたか?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.