Submitting Spark jobs using Apache Livy

    Available in VPC

    Apache Livy is a service that lets you interact with a Spark cluster through a REST interface. Using the REST interface or an RPC (Remote Procedure Call) client library, you can submit Spark jobs or Spark code snippets, retrieve results synchronously or asynchronously, and manage SparkContexts.

    In addition, Apache Livy helps you use Spark for interactive web/mobile applications by simplifying the interaction between Spark and the application server.

    • Keeps SparkContexts running so that multiple clients can run multiple Spark jobs on them.
    • Shares cached RDDs (Resilient Distributed Datasets) or DataFrames across multiple jobs and clients.
    • Manages multiple SparkContexts simultaneously. The SparkContexts run on the cluster (YARN/Mesos) rather than on the Livy server, which improves fault tolerance and concurrency.
    • Accepts jobs as precompiled JAR files, as code snippets, or through the Java/Scala client APIs.
    • Ensures security through secure, authenticated communication.
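
As an illustration of the precompiled JAR option in the list above, the following minimal sketch builds the JSON body that Livy's `POST /batches` endpoint accepts (`file`, `className`, and optional `args`). The helper name `build_batch_request`, the JAR path, and the argument value are assumptions made for this example, not part of the original guide. The sketch only constructs the request body and does not contact a server.

```python
import json


def build_batch_request(jar_path, class_name, args=None):
    """Return the JSON body for a Livy POST /batches request."""
    body = {"file": jar_path, "className": class_name}
    if args:
        # Livy expects command-line arguments as a list of strings
        body["args"] = [str(a) for a in args]
    return json.dumps(body, sort_keys=True)


# Hypothetical JAR location, used for illustration only
payload = build_batch_request(
    "hdfs:///user/example/spark-examples.jar",
    "org.apache.spark.examples.SparkPi",
    args=[10],
)
print(payload)
```

The resulting string could be sent with `requests.post(host + '/batches', data=payload, headers={'Content-Type': 'application/json'})`, in the same way the example script later in this guide posts to `/sessions`.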

    Note

    This guide explains how to submit a Spark job using the Apache Livy service provided by Cloud Hadoop.

    Install Python module

    Install the Python module requests, which the Spark example code needs in order to run.

    $ sudo yum install -y epel-release
    $ sudo yum install -y python-pip
    $ sudo pip install requests
    

    View Apache Livy server information

    You can check the port of the Apache Livy server in the Ambari UI.

    1. Access the Ambari UI, and then click Spark 2 > [CONFIGS].

    2. Click the Advanced livy2-conf item, and then check the value of livy.server.port.
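
Once you know the port, a quick check that the Livy server answers can be sketched as below. This is a minimal sketch, assuming Python 3 and only the standard library, so it can run before requests is installed. It builds the `/sessions` URL and reads the `total` field that Livy returns from `GET /sessions`. The helper names `livy_url` and `count_sessions` and the canned response are hypothetical. The demonstration uses a fake fetch function so it runs offline.

```python
import json
import urllib.request


def livy_url(host, port, path="/sessions"):
    """Build a Livy REST URL from a host, the livy.server.port value, and a path."""
    return "http://{0}:{1}{2}".format(host, port, path)


def count_sessions(host, port, fetch=None):
    """Return the number of sessions reported by GET /sessions.

    fetch is a callable taking a URL and returning the response bytes.
    It defaults to a plain HTTP GET with a 5-second timeout.
    """
    if fetch is None:
        def fetch(url):
            with urllib.request.urlopen(url, timeout=5) as resp:
                return resp.read()
    body = json.loads(fetch(livy_url(host, port)).decode("utf-8"))
    return body["total"]


# Offline demonstration with a canned response (assumed values).
# Against a real server, call count_sessions(host, port) without fetch.
def fake_fetch(url):
    return b'{"from": 0, "total": 2, "sessions": []}'


print(livy_url("172.16.3.22", 8999))
print(count_sessions("172.16.3.22", 8999, fetch=fake_fetch))
```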

    Spark example code

    The example code is based on the Apache Livy examples.

    • Save the following source code as livy-test.py.
    #-*- coding:utf-8 -*-
    
    import json, pprint, requests, textwrap, time, sys
    
    # Enter Livy2 access information
    if len(sys.argv) < 2:
            print('ERROR : Please enter Livy server access information')
            print(' - Usage: python {0} http://hostname:port'.format(sys.argv[0]))
            sys.exit(1)
    host = sys.argv[1]
    
    # Header information
    headers = {'Content-Type': 'application/json'}
    
    # Create Spark session
    data = {'kind': 'spark'}
    r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    print("Created " + r.headers['location'])
    
    # Check Spark session status
    state = "notIdle"
    session_url = host + r.headers['location']
    sys.stdout.write('Waiting for session state to idle')
    while state != 'idle':
            r = requests.get(session_url, headers=headers)
            state = r.json()['state']
            sys.stdout.write('.')
            sys.stdout.flush()
            time.sleep(1)
    sys.stdout.write('\rSession State is Ready!!!!!!!!!!!!!!\n')
    sys.stdout.flush()
    
    
    # Test code 1
    statements_url = session_url + '/statements'
    data = {'code': '1 + 1'}
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    statement_url = host + r.headers['location']
    print('=' * 80)
    print(statement_url)
    print('Request: {0}'.format(data['code']))
    
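    output = None
    while output is None:
            r = requests.get(statement_url, headers=headers)
            ret = r.json()
            # 'output' stays null until the statement has finished
            if ret.get('output') is None:
                    time.sleep(1)
                    continue
            if 'data' in ret['output']:
                    output = ret['output']['data']['text/plain']
            else:
                    # The statement failed; report the error instead of polling forever
                    output = ret['output'].get('evalue', 'Statement failed')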
    
    print('-' * 80)
    print(output)
    
    # Test code 2
    data = {
            'code': textwrap.dedent("""
                    val NUM_SAMPLES = 100000;
                    val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
                            val x = Math.random();
                            val y = Math.random();
                            if (x*x + y*y < 1) 1 else 0
                    }.reduce(_ + _);
                    println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
                    """)
    }
    
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    statement_url = host + r.headers['location']
    print('=' * 80)
    print(statement_url)
    print('Request: {0}'.format(data['code']))
    
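    output = None
    while output is None:
            r = requests.get(statement_url, headers=headers)
            ret = r.json()
            # 'output' stays null until the statement has finished
            if ret.get('output') is None:
                    time.sleep(1)
                    continue
            if 'data' in ret['output']:
                    output = ret['output']['data']['text/plain']
            else:
                    # The statement failed; report the error instead of polling forever
                    output = ret['output'].get('evalue', 'Statement failed')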
    
    print('-' * 80)
    print(output)
    
    # End Spark session
    print('=' * 80)
    r = requests.delete(session_url, headers=headers)
    print('{0} {1}'.format(r.json()['msg'], session_url))
    

    When you run the example code, livy-test.py, pass the Livy server access information (http://ip:port) as an argument.

    $ python livy-test.py http://ip:port
    

    The following shows an example run and its output.

    $ python livy-test.py http://172.16.3.22:8999
    Created /sessions/47
    Session State is Ready!!!!!!!!!!!!!!...........................
    ================================================================================
    http://172.16.3.22:8999/sessions/47/statements/0
    Request: 1 + 1
    --------------------------------------------------------------------------------
    res0: Int = 2
    ================================================================================
    http://172.16.3.22:8999/sessions/47/statements/1
    Request:
    val NUM_SAMPLES = 100000;
    val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
            val x = Math.random();
            val y = Math.random();
            if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _);
    println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)
    --------------------------------------------------------------------------------
    NUM_SAMPLES: Int = 100000
    count: Int = 78503
    Pi is roughly 3.14012
    ================================================================================
    deleted http://172.16.3.22:8999/sessions/47
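
The polling loops in livy-test.py wait for as long as the statement takes. For longer jobs, a bounded wait may be preferable. The following is a minimal sketch, assuming Python 3, of a polling helper with a timeout and explicit error handling. It relies on the statement JSON returned by Livy, where `output` is null until the statement finishes and `output.status` is `ok` or `error`. The names `wait_for_output` and `StatementFailed` are hypothetical. The `fetch` callable stands in for `requests.get(statement_url, headers=headers).json()`, and the demonstration uses canned responses so it runs without a server.

```python
import time


class StatementFailed(Exception):
    """Raised when Livy reports that a statement ended with an error."""


def wait_for_output(fetch, timeout=60.0, interval=1.0, sleep=time.sleep):
    """Poll fetch() until the statement has output and return its text/plain data.

    fetch returns the parsed JSON of GET <statement_url>.
    """
    deadline = time.monotonic() + timeout
    while True:
        output = fetch().get("output")
        if output is not None:
            if output.get("status") == "error":
                raise StatementFailed(
                    "{0}: {1}".format(output.get("ename"), output.get("evalue"))
                )
            return output["data"]["text/plain"]
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "statement did not finish within {0} seconds".format(timeout)
            )
        sleep(interval)


# Canned responses (assumed values): one "still running", then a result
responses = iter([
    {"id": 0, "state": "running", "output": None},
    {"id": 0, "state": "available",
     "output": {"status": "ok", "execution_count": 0,
                "data": {"text/plain": "res0: Int = 2"}}},
])
print(wait_for_output(lambda: next(responses), sleep=lambda seconds: None))
```

In livy-test.py, the call could look like `wait_for_output(lambda: requests.get(statement_url, headers=headers).json())`.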
    
