Using Oozie

    Available in VPC

    Oozie is a workflow scheduler system that manages Apache Hadoop MapReduce job workflows. It makes it easier to manage frequently run MapReduce jobs and to run periodic jobs. This guide explains how to write a workflow with Oozie, using examples.

    Note

    Refer to the official Oozie documentation for more information about Oozie.

    Oozie CLI

    Oozie CLI is a command-line client provided by Apache Oozie with some customized features. It returns immediately after a job is submitted and does not wait for the job to finish.

    • Execution script name: /bin/oozie
    • Supported options: various options are available, such as job, jobs, admin, validate, etc.
    • Command format: commands are used in the form oozie job <OPTIONS>.
    • job operations: operations such as run, rerun, submit, suspend, etc., are available.
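
    The following is a brief sketch of a few common Oozie CLI calls; the server URL and job ID below are placeholders.

    # Check the status of the Oozie server
    oozie admin -oozie http://<oozie-server>:11000/oozie -status

    # Submit and run a workflow job defined by job.properties
    oozie job -oozie http://<oozie-server>:11000/oozie -config job.properties -run

    # Check the information of a submitted job using the returned job ID
    oozie job -oozie http://<oozie-server>:11000/oozie -info <job-id>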

    Workflow

    Workflows are defined in the XML-based Hadoop Process Definition Language (hPDL). A workflow definition is a Directed Acyclic Graph (DAG) made up of flow control nodes and action nodes, so directed cyclic graphs are not supported.

    A workflow includes flow control nodes and action nodes.

    • Flow control node: It defines the beginning and the end of a workflow (start, end, and fail nodes), and provides a mechanism to control the workflow's execution path (decision, fork, and join nodes). A workflow can use parameters such as ${inputDir}. When a job is submitted, the workflow's parameter values must be provided in a job properties file.
    • Action node: It triggers the execution of computation and processing tasks, and supports a variety of actions such as Hadoop MapReduce, file system, SSH, email, and sub-workflow.

    Write workflow

    Workflows are written in workflow.xml.

    The DAG defined by a workflow works as follows. Define a workflow by referring to the following diagram.

    [Image: example workflow DAG]

    Specify the names of the start, end, and kill nodes, which are flow control nodes.

    <!-- Specify [ACTION-NODE-NAME], [NEXT-NODE-NAME], [END-NODE-NAME], and [ERROR-NODE-NAME] according to the task you want. -->

    <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
      <start to="[ACTION-NODE-NAME]"/>

      <action name="[ACTION-NODE-NAME]">
        ...
        <ok to="[END-NODE-NAME]"/>
        <error to="[ERROR-NODE-NAME]"/>
      </action>
    
      ...
    
      <kill name="[ERROR-NODE-NAME]">
        <message>[MESSAGE-TO-LOG]</message>
      </kill>
    
      <end name="[END-NODE-NAME]"/>
    </workflow-app>
    
    • start node
      It specifies the first action to run when the workflow starts. Actions run in the order they reference each other starting from the start node, not in the order they are defined. Refer to Write action node for how to write action nodes.
    • ok node
      It specifies the node to run when the action finishes successfully. The example above points to [END-NODE-NAME], but you can also specify the name of another action node to run next.
    • error node
      It specifies the node to run when the action fails. Write the kill node or the next action node to run, in the same manner as the ok node.
    • kill node
      It allows the workflow job to kill itself; the job ends with an error (KILLED). The content of message is logged as the reason the workflow job was killed.
    • end node
      It is the end for a workflow job, and indicates that the job has been completed successfully (SUCCEEDED).
    Note

    For flow control nodes not explained above, refer to Decision and Fork and join.
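
    The following is a minimal sketch of a decision node and a fork/join pair; the node names and the ${fs:exists(inputDir)} condition are illustrative and not part of the examples below.

    <!-- Illustrative decision node: branch depending on whether the input directory exists -->
    <decision name="check-input">
        <switch>
            <case to="wordcount">${fs:exists(inputDir)}</case>
            <default to="fail"/>
        </switch>
    </decision>

    <!-- Illustrative fork/join pair: run two actions in parallel, then continue -->
    <fork name="parallel-start">
        <path start="action-a"/>
        <path start="action-b"/>
    </fork>
    <join name="parallel-end" to="next-action"/>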

    Write action node

    Define an action node.
    The following is an example of a MapReduce action.

      <workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
      ...
        <action name="[ACTION-NODE-NAME]">
            <map-reduce>
                <job-tracker>[ResourceManagerHostAddress:Port]</job-tracker>
                <name-node>[NameNodeHostAddress:Port]</name-node>
    
                <prepare>
                  <delete path="[OUTPUT_DIR]"/>
                </prepare>
    
                <!-- Specify a streaming definition only when it is a streaming job. -->
                <streaming>
                  <mapper>[MAPPER]</mapper>
                  <reducer>[REDUCER]</reducer>
                </streaming>
    
                <configuration>
                    <property>
                        <name>mapred.mapper.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>mapred.reducer.new-api</name>
                        <value>true</value>
                    </property>
                    <property>
                        <name>[PROPERTY-NAME]</name>
                        <value>[PROPERTY-VALUE]</value>
                    </property>
                    ...
                </configuration>
                ...
                <!-- When using the distributed cache -->
                <!-- A relative path is resolved against the application path in job.properties -->
                <file>[FILE-PATH]</file>
                <archive>[FILE-PATH]</archive>
                ...
            </map-reduce>
            <ok to="[END-NODE-NAME]"/>
            <error to="[ERROR-NODE-NAME]"/>
        </action>
        ...
        </workflow-app>
    

    The following describes how to define each element of the action node for MapReduce version 2 (hereinafter referred to as MRv2).

    • job-tracker
      In MRv2, specify the ResourceManager's address in the form ResourceManagerHostAddress:Port. Since Data Forest assigns IDs to the ResourceManagers, use either rm1 or rm2.
    • name-node
      Write the NameNode address. Since Data Forest provides two HDFS namespaces, specify the namespace to use: hdfs://koya or hdfs://tata.
    • prepare
      Write tasks that should run before the job starts, such as deleting the output directory.
    • configuration
      Write the job settings inside the configuration element. For MRv2, mapred.mapper.new-api and mapred.reducer.new-api must be set to true. Add any other settings the job requires.
    • file / archive
      To distribute files with the distributed cache, add file or archive elements.
    Note

    Workflow settings can be defined with parameters such as ${NAME}. If settings need to change for each job execution, defining them as parameters allows the workflow to run in multiple environments without editing it.
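
    For example (the values are illustrative), a setting written as a parameter in workflow.xml is supplied through job.properties.

    <!-- In workflow.xml -->
    <property>
        <name>mapreduce.job.queuename</name>
        <value>${queueName}</value>
    </property>

    # In job.properties
    queueName=default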

    Workflow example

    After defining the flow control nodes and action nodes, the following examples run a wordcount program, written in Java and in Python, as a workflow.

    MapReduce example using Java

    <workflow-app name="WordCountWithJava" xmlns="uri:oozie:workflow:0.5">
    <start to="wordcount"/>
    <action name="wordcount">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outputDir}" />
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${outputDir}</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>${reduceNum}</value>
                </property>
                <property>
                    <name>mapreduce.task.timeout</name>
                    <value>0</value>
                </property>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>org.apache.hadoop.examples.WordCount$TokenizerMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>org.apache.hadoop.examples.WordCount$IntSumReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.inputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
                </property>
                <property>
                    <name>mapreduce.job.outputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    
    <kill name="fail">
        <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
    </workflow-app>
    

    MapReduce example using streaming

    <workflow-app name="WordCountStreaming" xmlns="uri:oozie:workflow:0.5">
        <start to="wordcount"/>
        <action name="wordcount">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${outputDir}" />
            </prepare>
            <streaming>
                    <mapper>python mapper.py</mapper>
                    <reducer>python reducer.py</reducer>
            </streaming>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${outputDir}</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>${reduceNum}</value>
                </property>
                <property>
                    <name>mapreduce.task.timeout</name>
                    <value>0</value>
                </property>
            </configuration>
            <file>mapper.py</file>
            <file>reducer.py</file>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    
    <kill name="fail">
        <message>Streaming job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
    </workflow-app>
    

    Deploy workflow

    To actually execute jobs after writing a workflow, the workflow and the libraries required to run it must be deployed on HDFS.
    Upload the written workflow to the job directory on HDFS, and place the libraries required to run the job (*.jar, *.so) in the lib directory.

    ├── workflow.xml
    └── lib
        └── hadoop-mapreduce-examples-2.7.1.jar
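
    For example, the deployment could be done with standard HDFS commands as shown below; the /user/test01/wordcount/workflow path is an assumption that matches the job properties example later in this guide.

    # Create the job directory and upload the workflow and library (the HDFS path is an assumption)
    hdfs dfs -mkdir -p /user/test01/wordcount/workflow/lib
    hdfs dfs -put workflow.xml /user/test01/wordcount/workflow/
    hdfs dfs -put hadoop-mapreduce-examples-2.7.1.jar /user/test01/wordcount/workflow/lib/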
    
    Note

    JAR files in the lib directory are automatically added to the classpath. Additional JAR files or native libraries specified in the file element do not have to be placed in the lib directory.
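
    For example, a library stored outside the lib directory could be referenced from the action with a file element as shown below; the HDFS path is illustrative.

    <!-- Illustrative: distribute a native library kept outside the lib directory -->
    <file>hdfs://koya/user/test01/shared/libcustom.so</file>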

    Streaming job

    The following describes how to run a streaming job.

    1. Upload hadoop-streaming-<VERSION>.jar to the lib directory.
    2. Upload the scripts or programs specified in the mapper and reducer elements to the workflow's application directory on HDFS.
    ├── workflow.xml
    ├── mapper.py
    ├── reducer.py
    └── lib
        └── hadoop-streaming-2.7.1.jar
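
    Before submitting, the streaming scripts can be checked locally with a simple pipe; input.txt below is a placeholder for a small sample input file.

    # Quick local check of the streaming scripts (input.txt is a placeholder)
    cat input.txt | python mapper.py | sort | python reducer.py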
    

    Run workflow

    Job properties must be written before running workflows deployed on HDFS.

    Write job properties

    Write a job properties file where the workflow's parameters are set.

    # Hadoop configuration
    nameNode=hdfs://koya
    jobTracker=rm1
    
    # Oozie configuration
    oozie.wf.application.path=${serviceDir}/workflow
    
    # job directory information
    serviceDir=${nameNode}/user/test01/wordcount
    inputDir=${serviceDir}/input
    outputDir=${serviceDir}/output
    
    # user information
    user.name=test01
    
    # job configuration
    queueName=default
    reduceNum=2
    
    # For nameNode and jobTracker, enter the information of the cluster that will run the job.
    # For oozie.wf.application.path, write the HDFS location where the workflow to run is deployed.
    

    Deploy job properties

    The completed job properties file does not need to be deployed to HDFS or other systems. Place it on the device that will execute the Oozie command.

    Run job

    Run the Oozie job once the workflow has been deployed and the job properties file has been written.

    oozie job -oozie <oozie server> -config <job properties> <option>
    
    $ oozie job -oozie http://oozie.kr.df.naverncp.com:11000/oozie -config job.properties -run -doas test01
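
    After submission, the returned job ID can be used with standard Oozie CLI operations to monitor or stop the job; the job ID below is a placeholder.

    # Check the status and logs of the submitted job
    oozie job -oozie http://oozie.kr.df.naverncp.com:11000/oozie -info <job-id>
    oozie job -oozie http://oozie.kr.df.naverncp.com:11000/oozie -log <job-id>

    # Kill the job if needed
    oozie job -oozie http://oozie.kr.df.naverncp.com:11000/oozie -kill <job-id>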
    
