Oozie スケジューラーに Spark バッチジョブを登録

Prev Next

VPC環境で利用できます。

Data ForestでSparkバッチ処理をApache Oozieスケジューラーに登録する方法を説明します。このバッチアプリケーションのソースデータはElasticsearchに保存されています。

df-usercase1_01_vpc_ko

Step 1. 事前準備

1. Data Forestアカウントの作成

  • exampleというアカウントを作成します。
  • ジョブを提出するためにKerberos Keytabが必要となるため、予めダウンロードします。(クラスタのアクセス情報 > Kerberos Keytabのダウンロードでダウンロードできる)
    df-usercase1_02_vpc_ja

2. Data Forestアプリの作成

  • Elasticsearch:ソースデータ(ログなど)が溜まる保存場所です。
  • Kibana:インデックスを検索し、可視化するのに使用します。Kibanaアプリを作成するには、まずElasticsearchアプリを作成する必要があります。アプリ連携情報で、予め作成したElasticsearchアプリの名前を選択します。
    df-usercase1_1-1_ja
  • Dev:Oozieサーバにワークフローを提出するためのクライアントとして使用します。
    df-usercase1_03_vpc_ja

3. サンプルデータの注入

  • 別途データ収集モジュールを置かずに、Kibanaアプリで提供するSample eCommerce ordersをソースデータとして使用することを例に挙げて説明します。
  • Kibana UIで [Load a data set and a Kibana dashboard] > [Sample eCommerce orders] をクリックしてサンプルインデックスを作成します。
  • Kibanaのアドレスは、KibanaアプリのQuick linksで確認できます。
    df-usercase1_04_vpc_ko

Step 2. Sparkアプリのビルド

実行するSparkアプリをビルドしてください。この例で使用した Sparkアプリは、Elasticsearchに保存されているkibana_sample_data_ecommerceインデックスをクレンジングして再度インデックスとして使用するジョブを実行します。実行される時点の24時間前のデータのみ取得して処理した後、再度インデックスとして保存するジョブです。既にビルドされたJARファイルをダウンロードして使用しても構いません。

Step 3. Oozieコーディネーターでワークフローを実行

すべてのジョブはDevアプリで実行します。Data Forestを使用するためのすべてのクライアント構成がDevアプリにあります。

Oozieは、Apache HadoopのMapReduceジョブフローを管理するスケジューラーシステムです。ワークフローは、フローノード(start、end、decisionなど)とアクションノード(mr、pig、shellなど)で構成されたDAG(Directed Acyclic Graph)です。複数のバッチアプリケーションとそのアプリケーションの前後に必要なスクリプト実行をワークフローに登録すると、フローを一目で把握してモニタリングできます。

以下の図のように簡単な形のワークフローを作成することを例に挙げて説明します。このワークフローが一日に一回ずつ実行されるように、コーディネーターで実行してみます。

df-use-ex1-workflow_vpc

1. シェルアクションスクリプトの作成

ワークフローにシェルアクションを登録してspark-submitを実行する例です。
実行するshスクリプトを作成します。予め作成したSPARK_JARはElasticsearchノードアドレスを引数で取得するため、Quick linkselasticsearch.hosts.inside-of-clusterを探して使用します。

注意

クラスタの外部からElasticsearchアプリにアクセスする場合には、elasticsearch.hostsに明示されたアドレスを使用してください。

spark_submit_action.sh

#!/bin/sh 
CLASS=$1
SPARK_JAR=$2
QUEUE=$3

export HDP_VERSION=3.1.0.0-78

# キータブでKerberos認証
kinit {user name} -kt ./{enter your keytab}
{user name}@KR.DF.NAVERNCP.COM

# Spark設定パス
export SPARK_HOME=./spark2.tar.gz/spark2
export SPARK_CONF_DIR=./spark_conf.tar.gz/conf

# Hadoop設定パス
export HADOOP_CONF_DIR=./hadoop_conf.tar.gz/conf

# Spark作業提出
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue $QUEUE \
--num-executors 2 \
--executor-memory 1G \
--driver-memory 1G \
--executor-cores 1 \
--name $CLASS \
--class $CLASS \
$SPARK_JAR "{enter your elasticsearch.hosts.inside-of-cluster address}"

上記のスクリプトでSPARK_HOMESPARK_CONF_DIRHADOOP_CONF_DIRのパスを確認します。スクリプトで使用するすべてのファイルは分散キャッシュにアップロードしたファイルパスです。Data ForestではSparkパッケージを提供しないため、パッケージと構成情報を「tar」で圧縮して分散キャッシュにアップロードしてください。アップロードする方法はディレクトリの構成と配布をご参照ください。

2. job.propertiesとworkflow.xml、coordinator.xmlの作成

job.propertiesでは、XMLファイルで使用するパラメータを定義します。このファイルは別に配布する必要がなく、Oozie CLIを実行するデバイスにだけ存在すれば良いものです。

# cluster configuration 
nameNode=hdfs://koya
jobTracker=rm1

# job directory information
homeDir=${nameNode}/user/{user name}/myproject/oozie
workflowDir=${homeDir}/workflow

# oozie configuration
# this is where you deploy coordinator.xml into
oozie.coord.application.path=${homeDir}/coordinator

# user information
user.name= {user name}

# job configuration
class=ecomm.BatchProducerRunnerSpark
queueName=longlived
shellActionScript=spark_submit_action.sh
sparkJar=sample-analyzer-1-assembly-0.1.jar

workflow.xmlは以下のようにシェルアクションを定義できます。job.propertiesに定義したパラメータを使用したことが確認できます。

<workflow-app name="exampleWorkflowJob" xmlns="uri:oozie:workflow:0.5">
	<start to="exampleShellAction"/>

	<action name="exampleShellAction">
		<shell xmlns="uri:oozie:shell-action:0.1">
			<job-tracker>${jobTracker}</job-tracker>
			<name-node>${nameNode}</name-node>

			<exec>./action.sh</exec>
      <argument>${class}</argument>
			<argument>${sparkJar}</argument>
			<argument>${queueName}</argument>

			<file>${workflowDir}/${shellActionScript}#action.sh</file>
			<file>${workflowDir}/${sparkJar}</file>
      		<file>${homeDir}/{enter your keytab}</file>
			<archive>${homeDir}/hadoop_conf.tar.gz</archive>
			<archive>${homeDir}/spark_conf.tar.gz</archive>
			<archive>${homeDir}/spark2.tar.gz</archive>

			<capture-output/>
		</shell>
		<ok to="end"/>
		<error to="fail"/>
	</action>

	<kill name="fail">
		<message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
	</kill>
	<end name="end"/>
</workflow-app>

coordinator.xmlは、ワークフローをスケジューリングするコーディネーターの明細ファイルです。一日に一回実行されるように設定しました。ワークフローと同様にXML形式です。開始時刻はOozie CLIを実行する際にオプションとして与えます。

<coordinator-app name="exampleCoordinatorJob" frequency="${coord:days(1)}" start="${start}" end="9999-12-31T23:59+0900" timezone="Asia/Seoul"
		xmlns="uri:oozie:coordinator:0.4">
	<controls>
		<timeout>10</timeout>
		<concurrency>1</concurrency>
	</controls>
	<action>
		<workflow>
			<app-path>${workflowDir}</app-path>
		</workflow>
	</action>
</coordinator-app>

3. ディレクトリの構成と配布

  1. 以下のようにディレクトリを構成します。(例)

    hdfs://koya/user/example/myproject/oozie # ユーザーのOozie作業ディレクトリ
    ├── workflow
    │   ├── spark_submit_action.sh # workflow shell actionとして使用されるbashスクリプト
    │   ├── sample-analyzer-1-assembly-0.1.jar # spark-submitで実行するJAR
    │   └── workflow.xml
    ├── coordinator
    │   └── coordinator.xml
    ├── spark2.tar.gz # Spark実行JARアーカイブ
    ├── hadoop_conf.tar.gz # Hadoop Configurationアーカイブ
    └── spark_conf.tar.gz # Spark Configurationアーカイブ
    
  2. Oozieプロジェクトホームとして使用するディレクトリを作成します。

    $ hadoop fs -mkdir -p /user/example/myproject/oozie/workflow
    $ hadoop fs -mkdir -p /user/example/myproject/oozie/coordinator
    
  3. Sparkパッケージをアーカイビングします。

    $ tar cvfz spark2.tar.gz -C /usr/hdp/3.1.0.0-78/ spark2
    
  4. Spark、HadoopのConfigurationをアーカイビングします。デフォルト構成を修正してください。

    # ユーザーは、/etc/hadoop/conf、/etc/spark2/confの下位のファイルを直接修正できません。
    # ファイルをコピーして修正します。
    $ mkdir hadoop_conf
    $ cp -R /etc/hadoop/conf ./hadoop_conf
    $ mkdir spark_conf
    $ cp -R /etc/spark2/conf ./spark_conf
    
  5. ./spark_conf/conf/spark-env.shで以下の行をコメントとして付けます。
    HADOOP_CONF_DIRは分散キャッシュパスである必要があるため、以下の情報が入るとファイルを検索できなくなることがあります。

    # export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/usr/hdp/3.1.0.0-78/hadoop/conf}
    
  6. 修正が完了したらConfigurationファイルもアーカイビングします。

    $ tar cvfz hadoop_conf.tar.gz -C ./hadoop_conf conf
    $ tar cvfz spark_conf.tar.gz -C ./spark_conf conf
    
  7. アーカイビングしたパッケージとワークフローを実行するために必要なファイルをすべてHDFSにアップロードします。

    $ hadoop fs -copyFromLocal -f spark2.tar.gz hadoop_conf.tar.gz \
    spark_conf.tar.gz df.test01.keytab /user/example/myproject/oozie
    
    $ hadoop fs -copyFromLocal -f workflow.xml spark_submit_action.sh sample-analyzer-1-assembly-0.1.jar \
     /user/example/myproject/oozie/workflow
    
     $ hadoop fs -copyFromLocal -f  coordinator.xml  /user/example/myproject/oozie/coordinator
    

4. ワークフローの実行

DevアプリにはOOZIE_URLの環境変数が予め設定されています。

$ oozie job -oozie $OOZIE_URL -config job.properties -Dstart=`TZ="Asia/Seoul" date "+%Y-%m-%dT%H:00"`+"0900" -run

ブラウザで、OOZIE_URLにアクセスして先ほど実行したワークフロー、コーディネーターを確認できます。Resource Managerにアクセスすると、ワークフローに定義したspark-submitのシェルアクションからSparkアプリが実行中であることが確認できます。

df-usercase1_06_vpc_ko

df-usercase1_07_vpc_ko

Sparkアプリが終了すると、[History Server] でそのアプリの実行履歴を照会できます。
df-usercase1_08_vpc_ko

Step 4. Kibanaでデータを確認

Kibanaアプリでecomm_data.order.1d.${日付}形式のインデックスを見つけることができます。

df-usercase1_09_vpc_ko