Submitting Spark jobs using Apache Livy


Available in Classic

Apache Livy is a service for interacting with a Spark cluster over a REST interface. Through the REST interface or the RPC client library, you can submit Spark jobs or snippets of Spark code, retrieve results synchronously or asynchronously, and manage SparkContext.

In addition, Apache Livy helps you use Spark for interactive web/mobile applications by simplifying the interaction between Spark and the application server.

  • Keeps long-running SparkContexts so that multiple clients can run multiple Spark jobs.
  • Shares cached RDDs or DataFrames across multiple jobs and clients.
  • Manages multiple SparkContexts simultaneously; the SparkContexts run on the cluster (YARN/Mesos) instead of the Livy server, for better fault tolerance and concurrency.
  • Accepts jobs as precompiled jar files, as code snippets, or through the Java/Scala client APIs.
  • Secures communication through authentication.
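
As a rough illustration of the precompiled jar option above, the sketch below builds the request for Livy's POST /batches endpoint without sending it. The server address, jar path, and main class are placeholder values chosen for illustration, and build_batch_request is a hypothetical helper, not part of Livy.

```python
import json


def build_batch_request(livy_url, jar_path, class_name, args=None):
    """Return the (url, JSON body) pair for a Livy POST /batches call."""
    payload = {'file': jar_path, 'className': class_name}
    if args:
        # Livy expects command-line arguments as a list of strings
        payload['args'] = [str(a) for a in args]
    return livy_url.rstrip('/') + '/batches', json.dumps(payload)


# Placeholder values: replace them with your Livy endpoint and a jar the cluster can read
url, body = build_batch_request(
    'http://172.16.3.22:8999',
    'hdfs:///user/example/spark-examples.jar',
    'org.apache.spark.examples.SparkPi',
    args=[100],
)
print(url)
print(body)
```

To actually submit the batch, the pair could be passed to requests.post(url, data=body, headers={'Content-Type': 'application/json'}).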


Note

This guide explains how to submit a Spark job using the Apache Livy service provided by Cloud Hadoop. When creating a Cloud Hadoop cluster, select Spark as the cluster type.

Install Python module

Install a Python module called requests to run the Spark example code.

$ sudo yum install -y epel-release
$ sudo yum install -y python-pip
$ sudo pip install requests

View Apache Livy server information

You can check the Apache Livy server port in the Ambari UI.

  1. Access the Ambari UI, and then click Spark2 > [Configs].

  2. Click Advanced livy2-conf, and then check the value of livy.server.port.
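
With the port in hand, the server address used in the rest of this guide has the form http://host:port. As a minimal sketch, a GET /sessions request to that address should return a JSON list of the current sessions, and the helper below condenses such a response. summarize_sessions is a hypothetical name, the host and port are example values, and the sample response is made up so that the snippet runs without a cluster.

```python
def summarize_sessions(payload):
    """Map each session id to its state from a GET /sessions response body."""
    return {s['id']: s['state'] for s in payload.get('sessions', [])}


# Example values: use your own host and the livy.server.port value from Ambari
host, port = '172.16.3.22', 8999
livy_url = 'http://{0}:{1}'.format(host, port)

# Made-up body shaped like a GET /sessions response
sample = {
    'from': 0,
    'total': 2,
    'sessions': [
        {'id': 46, 'kind': 'spark', 'state': 'busy'},
        {'id': 47, 'kind': 'spark', 'state': 'idle'},
    ],
}
print(livy_url + '/sessions')
print(summarize_sessions(sample))
```

Against a live server, the sample dictionary would be replaced by requests.get(livy_url + '/sessions').json().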

Spark example code

The example code is based on the Apache Livy examples.

  • Save the following source code as livy-test.py.
#-*- coding:utf-8 -*-

import json, requests, sys, textwrap, time

# Enter Livy2 access information
if len(sys.argv) < 2:
        print('ERROR : Please enter Livy server access information')
        print(' - Usage: python {0} http://hostname:port'.format(sys.argv[0]))
        sys.exit(1)
host = sys.argv[1]

# Header information
headers = {'Content-Type': 'application/json'}

# Create Spark session
data = {'kind': 'spark'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
r.raise_for_status()  # stop early if the Livy server rejected the request
print("Created " + r.headers['location'])

# Check Spark session status
state = "notIdle"
session_url = host + r.headers['location']
sys.stdout.write('Waiting for session state to idle')
while state != 'idle':
        r = requests.get(session_url, headers=headers)
        state = r.json()['state']
        # Stop instead of polling forever if the session failed to start
        if state in ('error', 'dead', 'killed'):
                sys.exit('\nERROR : Session ended in state ' + state)
        sys.stdout.write('.')
        sys.stdout.flush()
        time.sleep(1)
sys.stdout.write('\rSession State is Ready!!!!!!!!!!!!!!\n')
sys.stdout.flush()


# Test code 1
statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))

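output = None
while output is None:
        r = requests.get(statement_url, headers=headers)
        ret = r.json()
        # 'output' stays null until the statement has finished running
        if ret.get('output') is None:
                time.sleep(1)
                continue
        if 'data' in ret['output']:
                output = ret['output']['data']['text/plain']
        else:
                # A failed statement has no 'data'; show the raw output and stop polling
                output = json.dumps(ret['output'])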

print('-' * 80)
print(output)

# Test code 2
data = {
        'code': textwrap.dedent("""
                val NUM_SAMPLES = 100000;
                val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
                        val x = Math.random();
                        val y = Math.random();
                        if (x*x + y*y < 1) 1 else 0
                }.reduce(_ + _);
                println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
                """)
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))

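output = None
while output is None:
        r = requests.get(statement_url, headers=headers)
        ret = r.json()
        # 'output' stays null until the statement has finished running
        if ret.get('output') is None:
                time.sleep(1)
                continue
        if 'data' in ret['output']:
                output = ret['output']['data']['text/plain']
        else:
                # A failed statement has no 'data'; show the raw output and stop polling
                output = json.dumps(ret['output'])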

print('-' * 80)
print(output)

# End Spark session
print('=' * 80)
r = requests.delete(session_url, headers=headers)
print('{0} {1}'.format(r.json()['msg'], session_url))
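
The two polling loops in livy-test.py read the same response shape. A minimal sketch of that logic as a single function follows, assuming the statement JSON carries the output, status, data, ename, and evalue fields described in the Livy REST API. statement_result is a hypothetical helper, and the sample responses are made up so that the snippet runs without a cluster.

```python
def statement_result(statement):
    """Return (finished, text) for a GET .../statements/{id} response body."""
    output = statement.get('output')
    if output is None:
        # The statement is still waiting or running
        return False, None
    if output.get('status') == 'ok':
        return True, output['data'].get('text/plain', '')
    # Error outputs carry 'ename' and 'evalue' instead of 'data'
    return True, '{0}: {1}'.format(output.get('ename'), output.get('evalue'))


# Made-up responses covering the three cases
running = {'id': 0, 'state': 'running', 'output': None}
ok = {'id': 0, 'state': 'available',
      'output': {'status': 'ok', 'execution_count': 0,
                 'data': {'text/plain': 'res0: Int = 2'}}}
failed = {'id': 1, 'state': 'available',
          'output': {'status': 'error', 'execution_count': 1,
                     'ename': 'Error', 'evalue': 'not found: value x'}}

for sample in (running, ok, failed):
    print(statement_result(sample))
```

Keeping the parsing in one place means a failed statement ends the loop with a readable message instead of a missing-key error.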

When you run the example code, livy-test.py, pass the Livy server access information (http://ip:port) as an argument.

$ python livy-test.py http://ip:port

The following is an example run and its output.

$ python livy-test.py http://172.16.3.22:8999
Created /sessions/47
Session State is Ready!!!!!!!!!!!!!!...........................
================================================================================
http://172.16.3.22:8999/sessions/47/statements/0
Request: 1 + 1
--------------------------------------------------------------------------------
res0: Int = 2
================================================================================
http://172.16.3.22:8999/sessions/47/statements/1
Request:
val NUM_SAMPLES = 100000;
val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
        val x = Math.random();
        val y = Math.random();
        if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _);
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)
--------------------------------------------------------------------------------
NUM_SAMPLES: Int = 100000
count: Int = 78503
Pi is roughly 3.14012
================================================================================
deleted http://172.16.3.22:8999/sessions/47
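
Both loops in livy-test.py poll once per second with no upper bound on the wait. One possible way to bound it is sketched below. wait_until and its parameters are hypothetical names, and the iterator stands in for repeated requests.get calls so that the snippet runs without a Livy server.

```python
import time


def wait_until(fetch, done, timeout=60.0, interval=1.0):
    """Call fetch() until done(value) is true; raise once timeout seconds pass."""
    deadline = time.time() + timeout
    while True:
        value = fetch()
        if done(value):
            return value
        if time.time() >= deadline:
            raise RuntimeError('timed out; last value: {0!r}'.format(value))
        time.sleep(interval)


# Stand-in for requests.get(session_url).json()['state']
states = iter(['starting', 'starting', 'idle'])
state = wait_until(lambda: next(states), lambda s: s == 'idle', interval=0)
print(state)  # idle
```

In the script, fetch would wrap the requests.get call and done would compare the returned state with 'idle'.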