Oozie を使用する

Prev Next

VPC環境で利用できます。

Oozieは、Apache HadoopのMapReduceジョブフローを管理するワークフロースケジューラーシステムです。頻繁なMapReduceジョブを管理でき、定期的なジョブを便利に行えるようにサポートします。このガイドでは、Oozieを使用してワークフローを作成する例を説明します。

参考

Oozieについての詳しい説明はOozieの公式文書をご参照ください。

OozieCLI

Oozie CLIはApache Oozieで提供するクライアントです。Oozie CLIは一部機能に対するカスタマイズを適用したクライアントで、ジョブを提出するとすぐに返され、ジョブの終了を待ちません。

項目 Oozie CLI
実行スクリプト名 /bin/oozie
サポートするオプション job、jobs、admin、validate…などの様々なオプションをサポート
コマンド oozie job <OPTIONS>の形でコマンドを使用
job operation run、rerun、submit、suspendなどのOperationが可能

ワークフロー

ワークフローはXMLベースのhPDL(hadoop Process Definition Language)で定義します。ワークフローの定義は、フロー制御ノードとアクションノードで構成されたDAG(Direct Acyclic Graph)であるため、有向閉路グラフを提供しません。

ワークフローには、フロー制御ノードとアクションノードが含まれています。

  • フロー制御ノード:ワークフローの開始と終了を定義(start、end、failノード)し、ワークフロー実行方向(decision、fork、joinノード)を制御する方法を提供します。ワークフローは${inputDir}のようにパラメータを使用できます。ジョブを提出する際は、ワークフローのパラメータ値をjob propertiesファイルで作成して提供する必要があります。
  • アクションノード:ワークフローが計算と処理タスクの実行を開始できるようにし、HadoopのMapReduce、ファイルシステム、SSH、メール、サブワークフローのような様々なアクションを提供します。

ワークフローの作成

ワークフローはworkflow.xmlに作成します。

ワークフローで定義したDAGは以下のように動作します。以下の図を参考にしてワークフローを定義します。

df-eco-oozie_01_vpc_ko

フロー制御ノードでstart、end、killノードの名前を明示します。

# [ACTION-NODE-NAME]、[NEXT-NODE-NAME]、[END-NODE-NAME]、[ERROR-NODE-NAME]を希望する作業に合わせて明示。

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
  <start to="[ACTION-NODE-NAME]"/>

  <action name="[ACTION-NODE-NAME]">
    ...
    <ok to="[END-NODE-NAME]">
    <error to="[ERROR-NODE-NAME]">
  </action>

  ...

  <kill name="[ERROR-NODE-NAME]">
    <message>[MESSAGE-TO-LOG]</message>
  </kill>

  <end name="[END-NODE-NAME]"/>
</workflow-app>
  • startノード
    ワークフローが実行される際、最初に開始されるアクションを明示します。アクションは定義された順序ではなく、startノードに明示された順に実行されます。アクションノードの作成方法はアクションノードの作成をご参照ください。
  • okノード
    アクションの名前を明示し、アクションの実行が正常に終了した場合に実行するノードを明示します。上記の例には[END-NODE-NAME]と表記されていますが、二番目に実行されるアクションノード名を明示することもできます。
  • errorノード
    アクションの実行が異常終了した場合に実行するノードを明示します。okノードと同様に、killノードまたは次に実行するアクションノードを作成します。
  • killノード
    ワークフロージョブを自ら終了できます。ジョブはエラーで終了します(KILLED)。messageの内容はワークフロージョブの終了原因ログとして残ります。
  • endノード
    ワークフロージョブの最後で、ジョブに成功したことを意味します(SUCCEEDED)。
参考

説明されたノード以外のフロー制御ノードについての詳しい説明は、decisionfork and joinをご参照ください。

アクションノードの作成

アクションノードを定義します。
MapReduceアクションの例は以下のとおりです。

  <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
  ...
    <action name="[ACTION-NODE-NAME]">
        <map-reduce>
            <job-tracker>[ResourceManagerHostAddress:Port]</job-tracker>
            <name-node>[NameNodeHostAddress:Port]</name-node>

            <prepare>
              <delete path="[OUTPUT_DIR]"/>
            </prepare>

            <!-- ストリーミングの場合にのみstreaming定義を追加します。-->
            <streaming>
              <mapper>[MAPPER]</mapper>
              <reducer>[REDUCER]</reducer>
            </streaming>

            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            ...
            <!-- Distributed Cacheを使用する場合 -->
            <!-- 相対パスの場合、job propertiesのapplication pathを基準とする -->
            <file>[FILE-PATH]</file>
            <archive>[FILE-PATH]</archive>
            ...
        </map-reduce>
        <ok to="[END-NODE-NAME]"/>
        <error to="[ERROR-NODE-NAME]"/>
    </action>
    ...
    </workflow-app>

MapReduce version2(以下MRv2)で各アクションノードを定義する方法は以下のとおりです。

  • job-tracker
    MRv2でResourceManagerのアドレスに指定します。ResourceManagerHostAddress:Portのように作成します。Data ForestはResourceManagerにIDを付与して使用しているため、rm1rm2のどちらかを使用します。
  • name-node
    NameNodeアドレスを作成します。Data Forestは2つのHDFSネームスペースを提供するため、hdfs://koyaまたはhdfs://tataのうち使用するネームスペースを指定します。
  • prepare
    ジョブが開始する前に実行されるジョブを作成します。出力ディレクトリの削除のようなジョブを作成します。
  • configuration
    ジョブに対する設定はConfiguration以降に作成します。MRv2の場合、mapred.[mapper|reducer].new-apitrueに設定します。他のジョブに必要な設定をすべて作成します。
  • file / archive
    Distributed Cacheでファイルを配布する場合、fileまたはarchive定義を追加します。
参考

ワークフローの設定内容は、${NAME}のようにパラメータとして定義できます。ジョブを実行するたびに設定内容を変える必要がある場合、パラメータとして定義しておくとワークフローの修正なしに様々な環境でジョブを実行できます。

ワークフローの例

フロー制御ノードとアクションノードを定義し、JavaとPythonで作成されたwordcountプログラムをワークフローで作成します。

Javaを利用したMapReduceの例

<workflow-app name="WordCountWithJava" xmlns="uri:oozie:workflow:0.5">
<start to="wordcount"/>
<action name="wordcount">
    <map-reduce>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
            <delete path="${outputDir}" />
        </prepare>
        <configuration>
            <property>
                <name>mapred.mapper.new-api</name>
                <value>true</value>
            </property>
            <property>
                <name>mapred.reducer.new-api</name>
                <value>true</value>
            </property>
            <property>
                <name>mapreduce.job.queuename</name>
                <value>${queueName}</value>
            </property>
            <property>
                <name>mapreduce.input.fileinputformat.inputdir</name>
                <value>${inputDir}</value>
            </property>
            <property>
                <name>mapreduce.output.fileoutputformat.outputdir</name>
                <value>${outputDir}</value>
            </property>
            <property>
                <name>mapreduce.job.reduces</name>
                <value>${reduceNum}</value>
            </property>
            <property>
                <name>mapreduce.task.timeout</name>
                <value>0</value>
            </property>
            <property>
                <name>mapreduce.job.map.class</name>
                <value>org.apache.hadoop.examples.WordCount$TokenizerMapper</value>
            </property>
            <property>
                <name>mapreduce.job.reduce.class</name>
                <value>org.apache.hadoop.examples.WordCount$IntSumReducer</value>
            </property>
            <property>
                <name>mapreduce.job.output.key.class</name>
                <value>org.apache.hadoop.io.Text</value>
            </property>
            <property>
                <name>mapreduce.job.output.value.class</name>
                <value>org.apache.hadoop.io.IntWritable</value>
            </property>
            <property>
                <name>mapreduce.job.inputformat.class</name>
                <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
            </property>
            <property>
                <name>mapreduce.job.outputformat.class</name>
                <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
            </property>
        </configuration>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
</action> <!--}}}-->

<kill name="fail">
    <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>

ストリーミングを利用したMapReduceの例

<workflow-app name="WordCountStreaming" xmlns="uri:oozie:workflow:0.5">
    <start to="wordcount"/>
    <action name="wordcount">
    <map-reduce>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
            <delete path="${outputDir}" />
        </prepare>
        <streaming>
                <mapper>python mapper.py</mapper>
                <reducer>python reducer.py</reducer>
        </streaming>
        <configuration>
            <property>
                <name>mapreduce.job.queuename</name>
                <value>${queueName}</value>
            </property>
            <property>
                <name>mapreduce.input.fileinputformat.inputdir</name>
                <value>${inputDir}</value>
            </property>
            <property>
                <name>mapreduce.output.fileoutputformat.outputdir</name>
                <value>${outputDir}</value>
            </property>
            <property>
                <name>mapreduce.job.reduces</name>
                <value>${reduceNum}</value>
            </property>
            <property>
                <name>mapreduce.task.timeout</name>
                <value>0</value>
            </property>
        </configuration>
        <file>mapper.py</file>
        <file>reducer.py</file>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
</action> <!--}}}-->

<kill name="fail">
    <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>

ワークフローの配布

ワークフローを作成してから実際にジョブを実行するには、HDFS上にワークフローと実行に必要なライブラリを配布する必要があります。
作成されたワークフローをHDFSのジョブディレクトリにアップロードし、libディレクトリにジョブの実行に必要なライブラリ(.jar、.so)を配置します。

├── workflow.xml
└── lib
    └── hadoop-mapreduce-examples-2.7.1.jar
参考

libディレクトリに含まれたJARファイルは、自動でclasspathに追加されます。追加で必要なJARファイルやネイティブライブラリをfileエレメントに明示する場合には、libディレクトリに配置する必要はありません。

ストリーミングジョブ

ストリーミングジョブの方法は以下のとおりです。

  1. libディレクトリにhadoop-streaming-<VERSION>.jarをアップロードします。
  2. mapperとreducerに明示したスクリプトやプログラムはHDFSパスにアップロードします。
├── workflow.xml
├── mapper.py
├── reducer.py
└── lib
    └── hadoop-streaming-2.7.1.jar

ワークフローの実行

HDFS上に配布したワークフローを実行する前に、job propertiesを作成してください。

job propertiesの作成

ワークフローのパラメータを設定したjob propertiesファイルを作成します。

# hadoop configuration
nameNode=hdfs://koya
jobTracker=rm1

# oozie configuration
oozie.wf.application.path=${serviceDir}/workflow

# job directory information
serviceDir=${nameNode}/user/test01/wordcount
inputDir=${serviceDir}/input
outputDir=${serviceDir}/output

# user information
user.name=test01

# job configuration
queueName=default
reduceNum=2

# NameNode、Jobtrackerはジョブを実行するクラスタ情報を記入します。
# oozie.wf.application.pathは、実行するジョブのワークフローが配布された位置を記入します。

job propertiesの配布

job properitesファイルは、作成後にHDFSやその他のシステムに配布する必要がありません。Oozieコマンドを実行するデバイスに配置します。

job propertiesの実行

ワークフローの配布とjob propertiesファイルの作成が完了した後、Oozieジョブを実行します。

oozie job -oozie <oozie server> -config <job properties> <option>

$ oozie job -oozie http://oozie.kr.df.naverncp.com:11000/oozie -config job.properties -run -doas test01