Oozie を使用する
    • PDF

    Oozie を使用する

    • PDF

    Article Summary

    VPC環境で利用できます。

    Oozieは、Apache HadoopのMapReduceジョブフローを管理するワークフロースケジューラーシステムです。頻繁なMapReduceジョブを管理でき、定期的なジョブを便利に行えるようにサポートします。このガイドでは、Oozieを使用してワークフローを作成する例を説明します。

    参考

    Oozieについての詳しい説明はOozieの公式文書をご参照ください。

    OozieCLI

    Oozie CLIはApache Oozieで提供するクライアントです。Oozie CLIは一部機能に対するカスタマイズを適用したクライアントで、ジョブを提出するとすぐに返され、ジョブの終了を待ちません。

    項目Oozie CLI
    実行スクリプト名/bin/oozie
    サポートするオプションjob、jobs、admin、validate…などの様々なオプションをサポート
    コマンドoozie job <OPTIONS>の形でコマンドを使用
    job operationrun、rerun、submit、suspendなどのOperationが可能

    ワークフロー

    ワークフローはXMLベースのhPDL(hadoop Process Definition Language)で定義します。ワークフローの定義は、フロー制御ノードとアクションノードで構成されたDAG(Direct Acyclic Graph)であるため、有向閉路グラフを提供しません。

    ワークフローには、フロー制御ノードとアクションノードが含まれています。

    • フロー制御ノード:ワークフローの開始と終了を定義(start、end、failノード)し、ワークフロー実行方向(decision、fork、joinノード)を制御する方法を提供します。ワークフローは${inputDir}のようにパラメータを使用できます。ジョブを提出する際は、ワークフローのパラメータ値をjob propertiesファイルで作成して提供する必要があります。
    • アクションノード:ワークフローが計算と処理タスクの実行を開始できるようにし、HadoopのMapReduce、ファイルシステム、SSH、メール、サブワークフローのような様々なアクションを提供します。

    ワークフローの作成

    ワークフローはworkflow.xmlに作成します。

    ワークフローで定義したDAGは以下のように動作します。以下の図を参考にしてワークフローを定義します。

    df-eco-oozie_01_vpc_ko

    フロー制御ノードでstart、end、killノードの名前を明示します。

    # [ACTION-NODE-NAME]、[NEXT-NODE-NAME]、[END-NODE-NAME]、[ERROR-NODE-NAME]を希望する作業に合わせて明示。
    
    <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
      <start to="[ACTION-NODE-NAME]"/>
    
      <action name="[ACTION-NODE-NAME]">
        ...
        <ok to="[END-NODE-NAME]">
        <error to="[ERROR-NODE-NAME]">
      </action>
    
      ...
    
      <kill name="[ERROR-NODE-NAME]">
        <message>[MESSAGE-TO-LOG]</message>
      </kill>
    
      <end name="[END-NODE-NAME]"/>
    </workflow-app>
    
    • startノード
      ワークフローが実行される際、最初に開始されるアクションを明示します。アクションは定義された順序ではなく、startノードに明示された順に実行されます。アクションノードの作成方法はアクションノードの作成をご参照ください。
    • okノード
      アクションの名前を明示し、アクションの実行が正常に終了した場合に実行するノードを明示します。上記の例には[END-NODE-NAME]と表記されていますが、二番目に実行されるアクションノード名を明示することもできます。
    • errorノード
      アクションの実行が異常終了した場合に実行するノードを明示します。okノードと同様に、killノードまたは次に実行するアクションノードを作成します。
    • killノード
      ワークフロージョブを自ら終了できます。ジョブはエラーで終了します(KILLED)。messageの内容はワークフロージョブの終了原因ログとして残ります。
    • endノード
      ワークフロージョブの最後で、ジョブに成功したことを意味します(SUCCEEDED)。
    参考

    説明されたノード以外のフロー制御ノードについての詳しい説明は、decisionfork and joinをご参照ください。

    アクションノードの作成

    アクションノードを定義します。
    MapReduceアクションの例は以下のとおりです。

      <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
      ...
        <action name="[ACTION-NODE-NAME]">
            <map-reduce>
                <job-tracker>[ResourceManagerHostAddress:Port]</job-tracker>
                <name-node>[NameNodeHostAddress:Port]</name-node>
    
                <prepare>
                  <delete path="[OUTPUT_DIR]"/>
                </prepare>
    
                <!-- ストリーミングの場合にのみstreaming定義を追加します。-->
                <streaming>
                  <mapper>[MAPPER]</mapper>
                  <reducer>[REDUCER]</reducer>
                </streaming>
    
                <configuration>
                    <property>
                        <name>mapred.mapper.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>mapred.reducer.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>[PROPERTY-NAME]</name>
                        <value>[PROPERTY-VALUE]</value>
                    </property>
                    ...
                </configuration>
                ...
                <!-- Distributed Cacheを使用する場合 -->
                <!-- 相対パスの場合、job propertiesのapplication pathを基準とする -->
                <file>[FILE-PATH]</file>
                <archive>[FILE-PATH]</archive>
                ...
            </map-reduce>
            <ok to="[END-NODE-NAME]"/>
            <error to="[ERROR-NODE-NAME]"/>
        </action>
        ...
        </workflow-app>
    

    MapReduce version2(以下MRv2)で各アクションノードを定義する方法は以下のとおりです。

    • job-tracker
      MRv2でResourceManagerのアドレスに指定します。ResourceManagerHostAddress:Portのように作成します。Data ForestはResourceManagerにIDを付与して使用しているため、rm1rm2のどちらかを使用します。
    • name-node
      NameNodeアドレスを作成します。Data Forestは2つのHDFSネームスペースを提供するため、hdfs://koyaまたはhdfs://tataのうち使用するネームスペースを指定します。
    • prepare
      ジョブが開始する前に実行されるジョブを作成します。出力ディレクトリの削除のようなジョブを作成します。
    • configuration
      ジョブに対する設定はConfiguration以降に作成します。MRv2の場合、mapred.[mapper|reducer].new-apitrueに設定します。他のジョブに必要な設定をすべて作成します。
    • file / archive
      Distributed Cacheでファイルを配布する場合、fileまたはarchive定義を追加します。
    参考

    ワークフローの設定内容は、${NAME}のようにパラメータとして定義できます。ジョブを実行するたびに設定内容を変える必要がある場合、パラメータとして定義しておくとワークフローの修正なしに様々な環境でジョブを実行できます。

    ワークフローの例

    フロー制御ノードとアクションノードを定義し、JavaとPythonで作成されたwordcountプログラムをワークフローで作成します。

    Javaを利用したMapReduceの例

    <workflow-app name="WordCountWithJava" xmlns="uri:oozie:workflow:0.5">
    <start to="wordcount"/>
    <action name="wordcount">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outputDir}" />
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${outputDir}</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>${reduceNum}</value>
                </property>
                <property>
                    <name>mapreduce.task.timeout</name>
                    <value>0</value>
                </property>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>org.apache.hadoop.examples.WordCount$TokenizerMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>org.apache.hadoop.examples.WordCount$IntSumReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.inputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
                </property>
                <property>
                    <name>mapreduce.job.outputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action> <!--}}}-->
    
    <kill name="fail">
        <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
    </workflow-app>
    

    ストリーミングを利用したMapReduceの例

    <workflow-app name="WordCountStreaming" xmlns="uri:oozie:workflow:0.5">
        <start to="wordcount"/>
        <action name="wordcount">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outputDir}" />
            </prepare>
            <streaming>
                    <mapper>python mapper.py</mapper>
                    <reducer>python reducer.py</reducer>
            </streaming>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${outputDir}</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>${reduceNum}</value>
                </property>
                <property>
                    <name>mapreduce.task.timeout</name>
                    <value>0</value>
                </property>
            </configuration>
            <file>mapper.py</file>
            <file>reducer.py</file>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action> <!--}}}-->
    
    <kill name="fail">
        <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
    </workflow-app>
    

    ワークフローの配布

    ワークフローを作成してから実際にジョブを実行するには、HDFS上にワークフローと実行に必要なライブラリを配布する必要があります。
    作成されたワークフローをHDFSのジョブディレクトリにアップロードし、libディレクトリにジョブの実行に必要なライブラリ(.jar、.so)を配置します。

    ├── workflow.xml
    └── lib
        └── hadoop-mapreduce-examples-2.7.1.jar
    
    参考

    libディレクトリに含まれたJARファイルは、自動でclasspathに追加されます。追加で必要なJARファイルやネイティブライブラリをfileエレメントに明示する場合には、libディレクトリに配置する必要はありません。

    ストリーミングジョブ

    ストリーミングジョブの方法は以下のとおりです。

    1. libディレクトリにhadoop-streaming-<VERSION>.jarをアップロードします。
    2. mapperとreducerに明示したスクリプトやプログラムはHDFSパスにアップロードします。
    ├── workflow.xml
    ├── mapper.py
    ├── reducer.py
    └── lib
        └── hadoop-streaming-2.7.1.jar
    

    ワークフローの実行

    HDFS上に配布したワークフローを実行する前に、job propertiesを作成してください。

    job propertiesの作成

    ワークフローのパラメータを設定したjob propertiesファイルを作成します。

    # hadoop configuration
    nameNode=hdfs://koya
    jobTracker=rm1
    
    # oozie configuration
    oozie.wf.application.path=${serviceDir}/workflow
    
    # job directory information
    serviceDir=${nameNode}/user/test01/wordcount
    inputDir=${serviceDir}/input
    outputDir=${serviceDir}/output
    
    # user information
    user.name=test01
    
    # job configuration
    queueName=default
    reduceNum=2
    
    # NameNode、Jobtrackerはジョブを実行するクラスタ情報を記入します。
    # oozie.wf.application.pathは、実行するジョブのワークフローが配布された位置を記入します。
    

    job propertiesの配布

    job properitesファイルは、作成後にHDFSやその他のシステムに配布する必要がありません。Oozieコマンドを実行するデバイスに配置します。

    job propertiesの実行

    ワークフローの配布とjob propertiesファイルの作成が完了した後、Oozieジョブを実行します。

    oozie job -oozie <oozie server> -config <job properties> <option>
    
    $ oozie job -oozie http://oozie.kr.df.naverncp.com:11000/oozie -config job.properties -run -doas test01
    

    この記事は役に立ちましたか?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.