- 印刷する
- PDF
Oozie を使用する
- 印刷する
- PDF
VPC環境で利用できます。
Oozieは、Apache HadoopのMapReduceジョブフローを管理するワークフロースケジューラーシステムです。頻繁なMapReduceジョブを管理でき、定期的なジョブを便利に行えるようにサポートします。このガイドでは、Oozieを使用してワークフローを作成する例を説明します。
Oozieについての詳しい説明はOozieの公式文書をご参照ください。
OozieCLI
Oozie CLIはApache Oozieで提供するクライアントです。Oozie CLIは一部機能に対するカスタマイズを適用したクライアントで、ジョブを提出するとすぐに返され、ジョブの終了を待ちません。
項目 | Oozie CLI |
---|---|
実行スクリプト名 | /bin/oozie |
サポートするオプション | job、jobs、admin、validate…などの様々なオプションをサポート |
コマンド | oozie job <OPTIONS> の形でコマンドを使用 |
job operation | run、rerun、submit、suspendなどのOperationが可能 |
ワークフロー
ワークフローはXMLベースのhPDL(hadoop Process Definition Language)で定義します。ワークフローの定義は、フロー制御ノードとアクションノードで構成されたDAG(Direct Acyclic Graph)であるため、有向閉路グラフを提供しません。
ワークフローには、フロー制御ノードとアクションノードが含まれています。
- フロー制御ノード:ワークフローの開始と終了を定義(start、end、failノード)し、ワークフロー実行方向(decision、fork、joinノード)を制御する方法を提供します。ワークフローは
${inputDir}
のようにパラメータを使用できます。ジョブを提出する際は、ワークフローのパラメータ値をjob propertiesファイルで作成して提供する必要があります。 - アクションノード:ワークフローが計算と処理タスクの実行を開始できるようにし、HadoopのMapReduce、ファイルシステム、SSH、メール、サブワークフローのような様々なアクションを提供します。
ワークフローの作成
ワークフローはworkflow.xmlに作成します。
ワークフローで定義したDAGは以下のように動作します。以下の図を参考にしてワークフローを定義します。
フロー制御ノードでstart、end、killノードの名前を明示します。
# [ACTION-NODE-NAME]、[NEXT-NODE-NAME]、[END-NODE-NAME]、[ERROR-NODE-NAME]を希望する作業に合わせて明示。
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
<start to="[ACTION-NODE-NAME]"/>
<action name="[ACTION-NODE-NAME]">
...
<ok to="[END-NODE-NAME]">
<error to="[ERROR-NODE-NAME]">
</action>
...
<kill name="[ERROR-NODE-NAME]">
<message>[MESSAGE-TO-LOG]</message>
</kill>
<end name="[END-NODE-NAME]"/>
</workflow-app>
- startノード
ワークフローが実行される際、最初に開始されるアクションを明示します。アクションは定義された順序ではなく、startノードに明示された順に実行されます。アクションノードの作成方法はアクションノードの作成をご参照ください。 - okノード
アクションの名前を明示し、アクションの実行が正常に終了した場合に実行するノードを明示します。上記の例には[END-NODE-NAME]
と表記されていますが、二番目に実行されるアクションノード名を明示することもできます。 - errorノード
アクションの実行が異常終了した場合に実行するノードを明示します。okノードと同様に、killノードまたは次に実行するアクションノードを作成します。 - killノード
ワークフロージョブを自ら終了できます。ジョブはエラーで終了します(KILLED)。messageの内容はワークフロージョブの終了原因ログとして残ります。 - endノード
ワークフロージョブの最後で、ジョブに成功したことを意味します(SUCCEEDED)。
説明されたノード以外のフロー制御ノードについての詳しい説明は、decisionとfork and joinをご参照ください。
アクションノードの作成
アクションノードを定義します。
MapReduceアクションの例は以下のとおりです。
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
...
<action name="[ACTION-NODE-NAME]">
<map-reduce>
<job-tracker>[ResourceManagerHostAddress:Port]</job-tracker>
<name-node>[NameNodeHostAddress:Port]</name-node>
<prepare>
<delete path="[OUTPUT_DIR]"/>
</prepare>
<!-- ストリーミングの場合にのみstreaming定義を追加します。-->
<streaming>
<mapper>[MAPPER]</mapper>
<reducer>[REDUCER]</reducer>
</streaming>
<configuration>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
...
<!-- Distributed Cacheを使用する場合 -->
<!-- 相対パスの場合、job propertiesのapplication pathを基準とする -->
<file>[FILE-PATH]</file>
<archive>[FILE-PATH]</archive>
...
</map-reduce>
<ok to="[END-NODE-NAME]"/>
<error to="[ERROR-NODE-NAME]"/>
</action>
...
</workflow-app>
MapReduce version2(以下MRv2)で各アクションノードを定義する方法は以下のとおりです。
job-tracker
MRv2でResourceManagerのアドレスに指定します。ResourceManagerHostAddress:Port
のように作成します。Data ForestはResourceManagerにIDを付与して使用しているため、rm1かrm2のどちらかを使用します。name-node
NameNodeアドレスを作成します。Data Forestは2つのHDFSネームスペースを提供するため、hdfs://koya
またはhdfs://tata
のうち使用するネームスペースを指定します。prepare
ジョブが開始する前に実行されるジョブを作成します。出力ディレクトリの削除のようなジョブを作成します。configuration
ジョブに対する設定はConfiguration以降に作成します。MRv2の場合、mapred.[mapper|reducer].new-api
はtrueに設定します。他のジョブに必要な設定をすべて作成します。file
/archive
Distributed Cacheでファイルを配布する場合、fileまたはarchive定義を追加します。
ワークフローの設定内容は、${NAME}
のようにパラメータとして定義できます。ジョブを実行するたびに設定内容を変える必要がある場合、パラメータとして定義しておくとワークフローの修正なしに様々な環境でジョブを実行できます。
ワークフローの例
フロー制御ノードとアクションノードを定義し、JavaとPythonで作成されたwordcountプログラムをワークフローで作成します。
Javaを利用したMapReduceの例
<workflow-app name="WordCountWithJava" xmlns="uri:oozie:workflow:0.5">
<start to="wordcount"/>
<action name="wordcount">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDir}" />
</prepare>
<configuration>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.inputdir</name>
<value>${inputDir}</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.outputdir</name>
<value>${outputDir}</value>
</property>
<property>
<name>mapreduce.job.reduces</name>
<value>${reduceNum}</value>
</property>
<property>
<name>mapreduce.task.timeout</name>
<value>0</value>
</property>
<property>
<name>mapreduce.job.map.class</name>
<value>org.apache.hadoop.examples.WordCount$TokenizerMapper</value>
</property>
<property>
<name>mapreduce.job.reduce.class</name>
<value>org.apache.hadoop.examples.WordCount$IntSumReducer</value>
</property>
<property>
<name>mapreduce.job.output.key.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapreduce.job.output.value.class</name>
<value>org.apache.hadoop.io.IntWritable</value>
</property>
<property>
<name>mapreduce.job.inputformat.class</name>
<value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
</property>
<property>
<name>mapreduce.job.outputformat.class</name>
<value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
</property>
</configuration>
</map-reduce>
<ok to="end"/>
<error to="fail"/>
</action> <!--}}}-->
<kill name="fail">
<message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
ストリーミングを利用したMapReduceの例
<workflow-app name="WordCountStreaming" xmlns="uri:oozie:workflow:0.5">
<start to="wordcount"/>
<action name="wordcount">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${outputDir}" />
</prepare>
<streaming>
<mapper>python mapper.py</mapper>
<reducer>python reducer.py</reducer>
</streaming>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>mapreduce.input.fileinputformat.inputdir</name>
<value>${inputDir}</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.outputdir</name>
<value>${outputDir}</value>
</property>
<property>
<name>mapreduce.job.reduces</name>
<value>${reduceNum}</value>
</property>
<property>
<name>mapreduce.task.timeout</name>
<value>0</value>
</property>
</configuration>
<file>mapper.py</file>
<file>reducer.py</file>
</map-reduce>
<ok to="end"/>
<error to="fail"/>
</action> <!--}}}-->
<kill name="fail">
<message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
ワークフローの配布
ワークフローを作成してから実際にジョブを実行するには、HDFS上にワークフローと実行に必要なライブラリを配布する必要があります。
作成されたワークフローをHDFSのジョブディレクトリにアップロードし、libディレクトリにジョブの実行に必要なライブラリ(.jar、.so)を配置します。
├── workflow.xml
└── lib
└── hadoop-mapreduce-examples-2.7.1.jar
libディレクトリに含まれたJARファイルは、自動でclasspathに追加されます。追加で必要なJARファイルやネイティブライブラリをfileエレメントに明示する場合には、libディレクトリに配置する必要はありません。
ストリーミングジョブ
ストリーミングジョブの方法は以下のとおりです。
- libディレクトリに
hadoop-streaming-<VERSION>.jar
をアップロードします。 - mapperとreducerに明示したスクリプトやプログラムはHDFSパスにアップロードします。
├── workflow.xml
├── mapper.py
├── reducer.py
└── lib
└── hadoop-streaming-2.7.1.jar
ワークフローの実行
HDFS上に配布したワークフローを実行する前に、job propertiesを作成してください。
job propertiesの作成
ワークフローのパラメータを設定したjob propertiesファイルを作成します。
# hadoop configuration
nameNode=hdfs://koya
jobTracker=rm1
# oozie configuration
oozie.wf.application.path=${serviceDir}/workflow
# job directory information
serviceDir=${nameNode}/user/test01/wordcount
inputDir=${serviceDir}/input
outputDir=${serviceDir}/output
# user information
user.name=test01
# job configuration
queueName=default
reduceNum=2
# NameNode、Jobtrackerはジョブを実行するクラスタ情報を記入します。
# oozie.wf.application.pathは、実行するジョブのワークフローが配布された位置を記入します。
job propertiesの配布
job properitesファイルは、作成後にHDFSやその他のシステムに配布する必要がありません。Oozieコマンドを実行するデバイスに配置します。
job propertiesの実行
ワークフローの配布とjob propertiesファイルの作成が完了した後、Oozieジョブを実行します。
oozie job -oozie <oozie server> -config <job properties> <option>
$ oozie job -oozie http://oozie.kr.df.naverncp.com:11000/oozie -config job.properties -run -doas test01