Available in VPC
Apache Livy is a service that makes it easy to interact with Spark clusters using a REST interface. Easily submit Spark Jobs or Spark code snippets, retrieve synchronous/asynchronous results, and manage SparkContexts through a simple REST interface or Remote Procedure Call (RPC) client library.
Apache Livy also simplifies the interaction between Spark and application servers, enabling you to use Spark in interactive web/mobile applications.
- Use a single long-running SparkContext for multiple Spark jobs from multiple clients.
- Share cached Resilient Distributed Datasets (RDDs) or data frames across multiple jobs and clients.
- Multiple SparkContexts can be managed at the same time. Each SparkContext runs on the cluster (YARN/Mesos) instead of on the Livy server, for better fault tolerance and concurrency.
- Jobs can be submitted via precompiled jars, code snippets, or the Java/Scala client API.
- Ensure security through authenticated communication.
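Besides interactive sessions, the jar-based submission mentioned above goes through Livy's `POST /batches` endpoint. A minimal sketch of building that request body in Python (the jar path and class name below are placeholders, not values from this guide):

```python
import json

def batch_payload(jar_path, class_name, args=None):
    """Build the JSON body for Livy's POST /batches endpoint,
    which submits a precompiled jar as a batch job."""
    payload = {'file': jar_path, 'className': class_name}
    if args:
        payload['args'] = args
    return json.dumps(payload)

# Submitting would then look like this (requires a running Livy server):
#   requests.post(host + '/batches', data=batch_payload(...),
#                 headers={'Content-Type': 'application/json'})
print(batch_payload('/user/tester/spark-examples.jar',
                    'org.apache.spark.examples.SparkPi', ['10']))
```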

- For more information about Apache Livy, see the [Apache Livy website](https://livy.incubator.apache.org/){target="_blank"}.
- Image source: https://livy.incubator.apache.org/assets/images/livy-architecture
This guide describes submitting a Spark Job using Apache Livy provided by Cloud Hadoop.
Installing Python modules
First, install the requests Python module, which is needed to run the Spark example code.
$ sudo yum install -y epel-release
$ sudo yum install -y python-pip
$ sudo pip install requests
Alternatively, you can install requests directly with the yum command.
$ sudo yum install -y python-requests
Check Apache Livy server information
The port information for the Apache Livy server can be found in the Ambari UI.
- Access the Ambari UI, then click Spark2 > [CONFIGS].

- Click the Advanced livy2-conf item and check the livy.server.port information.

Spark example code
The example code was written with reference to Apache Livy Examples.
- Save the source code as livy-test.py
#-*- coding:utf-8 -*-
import json, pprint, requests, textwrap, time, sys

# Enter Livy2 access information
if len(sys.argv) < 2:
    print('ERROR : Please enter the Livy server connection information')
    print(' - Usage: python {0} http://Hostname:Port'.format(sys.argv[0]))
    sys.exit(1)

host = sys.argv[1]

# Header information
headers = {'Content-Type': 'application/json'}

# Create a Spark session
data = {'kind': 'spark'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
print("Created " + r.headers['location'])

# Check Spark session status
state = "notIdle"
session_url = host + r.headers['location']
sys.stdout.write('Waiting for session state to idle')
while state != 'idle':
    r = requests.get(session_url, headers=headers)
    state = r.json()['state']
    sys.stdout.write('.')
    sys.stdout.flush()
    time.sleep(1)
sys.stdout.write('\rSession State is Ready!!!!!!!!!!!!!!\n')
sys.stdout.flush()

# Test Code 1
statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))
output = None
while output is None:
    r = requests.get(statement_url, headers=headers)
    ret = r.json()
    if ret['output'] is None:
        time.sleep(1)
        continue
    if 'output' in ret and 'data' in ret['output']:
        output = ret['output']['data']['text/plain']
print('-' * 80)
print(output)

# Test Code 2
data = {
    'code': textwrap.dedent("""
        val NUM_SAMPLES = 100000;
        val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
            val x = Math.random();
            val y = Math.random();
            if (x*x + y*y < 1) 1 else 0
        }.reduce(_ + _);
        println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
        """)
}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))
output = None
while output is None:
    r = requests.get(statement_url, headers=headers)
    ret = r.json()
    if ret['output'] is None:
        time.sleep(1)
        continue
    if 'output' in ret and 'data' in ret['output']:
        output = ret['output']['data']['text/plain']
print('-' * 80)
print(output)

# End a Spark session
print('=' * 80)
r = requests.delete(session_url, headers=headers)
print('{0} {1}'.format(r.json()['msg'], session_url))
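The two identical polling loops in livy-test.py can be factored into a single helper. A sketch of that refactoring; the `fetch` parameter is injected (a function returning the parsed JSON of `GET <statement_url>`) so the logic can be exercised without a live Livy server:

```python
import time

def wait_for_output(fetch, interval=1.0):
    """Poll a Livy statement until its 'output' field carries data,
    then return the plain-text result."""
    while True:
        ret = fetch()
        if ret.get('output') and 'data' in ret['output']:
            return ret['output']['data']['text/plain']
        time.sleep(interval)

# Example with a fake fetch that becomes ready on the second poll:
responses = [{'output': None},
             {'output': {'data': {'text/plain': 'res0: Int = 2'}}}]
print(wait_for_output(lambda: responses.pop(0), interval=0))  # → res0: Int = 2
```

Against a real server, `fetch` would be `lambda: requests.get(statement_url, headers=headers).json()`.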
When running the example code, livy-test.py, you must pass the Livy server connection information (http://ip:port) as an argument, as shown below.
$ python livy-test.py http://{Host IP where Livy Server is installed}:8999
Here is an example run.
$ python livy-test.py http://172.16.3.22:8999
Created /sessions/47
Session State is Ready!!!!!!!!!!!!!!...........................
================================================================================
http://172.16.3.22:8999/sessions/47/statements/0
Request: 1 + 1
--------------------------------------------------------------------------------
res0: Int = 2
================================================================================
http://172.16.3.22:8999/sessions/47/statements/1
Request:
val NUM_SAMPLES = 100000;
val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
val x = Math.random();
val y = Math.random();
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _);
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)
--------------------------------------------------------------------------------
NUM_SAMPLES: Int = 100000
count: Int = 78503
Pi is roughly 3.14012
================================================================================
deleted http://172.16.3.22:8999/sessions/47
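The second statement estimates π by Monte Carlo: random points are drawn in the unit square, and the fraction landing inside the quarter circle approaches π/4. For comparison, the same calculation in plain Python (run locally, no Spark involved):

```python
import random

def estimate_pi(num_samples=100000, seed=0):
    """Monte Carlo estimate of pi: count points falling inside
    the quarter unit circle and scale the ratio by 4."""
    rng = random.Random(seed)
    count = sum(1 for _ in range(num_samples)
                if rng.random() ** 2 + rng.random() ** 2 < 1)
    return 4.0 * count / num_samples

print(estimate_pi())  # roughly 3.14
```

The Spark version distributes the same sampling across the cluster with `sc.parallelize`, which is why it scales to far larger sample counts.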