Apache Livy でSpark Jobを送信
- 印刷する
- PDF
Apache Livy でSpark Jobを送信
- 印刷する
- PDF
Article Summary
Share feedback
Thanks for sharing your feedback!
VPC環境で利用できます。
Apache LivyはRESTインターフェースを利用してSparkクラスタと簡単に相互適用できるサービスです。簡単なRESTインターフェースまたはRPC(Remote Procedure Call)クライアントライブラリを通じて、Spark JobまたはSparkコードスニペット、同期/非同期の結果検索、SparkContext管理を簡単に送信できます。
また、 Apache LivyはSparkとアプリケーションサーバ間の相互作用を単純化して、対話型のウェブ/モバイルアプリケーションにSparkを使用できるようにサポートします。
- マルチクライアントで複数のSpark Jobを使用できるようにSparkContextを持っています。
- マルチJob及びクライアントでキャッシュされたRDD(Resilient Distributed Dataset)またはデータフレームを共有します。
- マルチSparkContextを同時に管理でき、優れた耐欠陥性と同時性のためにSparkContextがLivyサーバの代わりにクラスタ(YARN/Mesos)で実行されます。
- Jobは既にコンパイルされたjar、コードスニペットまたはJava/ScalaクライアントAPIを通じて送信できます。
- セキュリティ認証通信を利用してセキュリティを確保します。
参考
- Apache Livyに関する詳細は、Apache Livyの公式サイトをご参照ください。
- 画像の出典:https://livy.incubator.apache.org/assets/images/livy-architecture.png
このガイドでは、Cloud Hadoopが提供するApache Livyを使用してSpark Jobを送信する方法について説明します。
Pythonモジュールのインストール
Sparkのサンプルコードを実行するには、先にrequests
というPythonモジュールをインストールしてください。
$ sudo yum install -y epel-release
$ sudo yum install -y python-pip
$ sudo pip install requests
Apache Livyサーバ情報の確認
Apache Livyサーバのポート情報は、Ambari UIで確認できます。
Ambari UIにアクセスし、Spark2 > [CONFIGS] を順にクリックします。
Advanced livy2-conf項目をクリックし、livy.server.port情報を確認します。
Sparkのサンプルコード
サンプルコードは、Apache Livy Examplesを参考にして作成しました。
- ソースコードの内容をlivy-test.pyで保存
#-*- coding:utf-8 -*-
import json, pprint, requests, textwrap, time, sys
# Livy2アクセス情報の入力
if len(sys.argv) < 2:
print('ERROR : Livyサーバのアクセス情報を入力してください')
print(' - Usage: python {0} http://ホスト名:ポート'.format(sys.argv[0]))
sys.exit(1)
host = sys.argv[1]
# ヘッダ情報
headers = {'Content-Type': 'application/json'}
# Sparkセッションの作成
data = {'kind': 'spark'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
print("Created " + r.headers['location'])
# Sparkセッションの状態確認
state = "notIdle"
session_url = host + r.headers['location']
sys.stdout.write('Waiting for session state to idle')
while state != 'idle':
r = requests.get(session_url, headers=headers)
state = r.json()['state']
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(1)
sys.stdout.write('\rSessioin State is Ready!!!!!!!!!!!!!!\n')
sys.stdout.flush()
# テストコード1
statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))
output = None
while output == None:
r = requests.get(statement_url, headers=headers)
ret = r.json()
if ret['output'] == None:
time.sleep(1)
continue
if 'output' in ret and 'data' in ret['output']:
output = ret['output']['data']['text/plain']
print('-' * 80)
print(output)
# テストコード2
data = {
'code': textwrap.dedent("""
val NUM_SAMPLES = 100000;
val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
val x = Math.random();
val y = Math.random();
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _);
println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
""")
}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))
output = None
while output == None:
r = requests.get(statement_url, headers=headers)
ret = r.json()
if ret['output'] == None:
time.sleep(1)
continue
if 'output' in ret and 'data' in ret['output']:
output = ret['output']['data']['text/plain']
print('-' * 80)
print(output)
# Sparkセッションの終了
print('=' * 80)
r = requests.delete(session_url, headers=headers)
print('{0} {1}'.format(r.json()['msg'], session_url))
サンプルコードのlivy-test.py
の実行時には、以下のようにLivyサーバアクセス情報(http://ip:port)を引数として入力してください。
$ python livy-test.py http://ip:port
使用方法は以下のとおりです。
$ python livy-test.py http://172.16.3.22:8999
Created /sessions/47
Sessioin State is Ready!!!!!!!!!!!!!!...........................
================================================================================
http://172.16.3.22:8999/sessions/47/statements/0
Request: 1 + 1
--------------------------------------------------------------------------------
res0: Int = 2================================================================================
http://172.16.3.22:8999/sessions/47/statements/1
Request:
val NUM_SAMPLES = 100000;
val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
val x = Math.random();
val y = Math.random();
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _);
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)--------------------------------------------------------------------------------
NUM_SAMPLES: Int = 100000
count: Int = 78503
Pi is roughly 3.14012================================================================================
deleted http://172.16.3.22:8999/sessions/47
この記事は役に立ちましたか?