Apache Livyで Spark Jobを提出

Prev Next

VPC環境で利用できます。

Apache Livyとは、RESTインターフェースを利用して Sparkクラスタと簡単に相互作用できるサービスです。簡単な RESTインターフェースまたは Remote Procedure Call(RPC)クライアントライブラリを通じて、Spark Jobまたは Sparkコードスニペット、同期/非同期の結果検索、SparkContext管理を楽に提出できます。

また、Apache Livyは Sparkとアプリケーションサーバ間の相互作用を単純化し、対話型ウェブ/モバイルアプリケーションで Sparkを使用できるようにサポートします。

  • マルチクライアントで複数の Spark Jobを使用できるように、SparkContextを持っています。
  • マルチ Jobとクライアントでキャッシュされた Resilient Distributed Dataset(RDD)またはデータフレームを共有します。
  • マルチ SparkContextを同時に管理できます。優れたフォールトトレランスと同時性のため、SparkContextが Livyサーバの代わりにクラスタ(YARN/Mesos)で実行されます。
  • コンパイル済みの jar、コードスニペットまたは Java/Scalaクライアント APIを通じて Jobを提出できます。
  • セキュリティ認証通信を利用してセキュリティを確保します。

hadoop-chadoop-use-ex9_1-1

参考

このガイドでは、Cloud Hadoopで提供する Apache Livyを使用して Spark Jobを提出する方法を説明します。

Pythonモジュールをインストール

Sparkサンプルコードを実行するには、まず requestsという Pythonモジュールをインストールします。

$ sudo yum install -y epel-release
$ sudo yum install -y python-pip
$ sudo pip install requests
参考

yumコマンドを使用して requestsをインストールできます。
$ sudo yum install -y python-requests

Apache Livyサーバ情報確認

Apache Livyサーバのポート情報は、Ambari UIで確認できます。

  1. Ambari UIにアクセスした後、 Spark2 > [CONFIGS] を順にクリックします。
    hadoop-chadoop-use-ex9_2-1_ko

  2. Advanced livy2-conf 項目をクリックした後、livy.server.port 情報を確認します。
    hadoop-chadoop-use-ex9_2-2_ko

Sparkサンプルコード

サンプルコードは Apache Livy Examplesを参照して作成しました。

  • ソースコードの内容を livy-test.pyで保存
#-*- coding:utf-8 -*-

import json, pprint, requests, textwrap, time, sys

# Livy2アクセス情報を入力
if len(sys.argv) < 2:
        print('ERROR : Livyサーバアクセス情報を入力してください')
        print(' - Usage: python {0} http://ホスト名:ポート'.format(sys.argv[0]))
        sys.exit(1)
host = sys.argv[1]

# ヘッダ情報
headers = {'Content-Type': 'application/json'}

# Sparkセッション作成
data = {'kind': 'spark'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
print("Created " + r.headers['location'])

# Sparkセッションステータスを確認
state = "notIdle"
session_url = host + r.headers['location']
sys.stdout.write('Waiting for session state to idle')
while state != 'idle':
        r = requests.get(session_url, headers=headers)
        state = r.json()['state']
        sys.stdout.write('.')
        sys.stdout.flush()
        time.sleep(1)
sys.stdout.write('\rSessioin State is Ready!!!!!!!!!!!!!!\n')
sys.stdout.flush()


# テストコード1
statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))

output = None
while output == None:
        r = requests.get(statement_url, headers=headers)
        ret = r.json()
        if ret['output'] == None:
                time.sleep(1)
                continue
        if 'output' in ret and 'data' in ret['output']:
                output = ret['output']['data']['text/plain']

print('-' * 80)
print(output)

# テストコード2
data = {
        'code': textwrap.dedent("""
                val NUM_SAMPLES = 100000;
                val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
                        val x = Math.random();
                        val y = Math.random();
                        if (x*x + y*y < 1) 1 else 0
                }.reduce(_ + _);
                println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
                """)
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
print('=' * 80)
print(statement_url)
print('Request: {0}'.format(data['code']))

output = None
while output == None:
        r = requests.get(statement_url, headers=headers)
        ret = r.json()
        if ret['output'] == None:
                time.sleep(1)
                continue
        if 'output' in ret and 'data' in ret['output']:
                output = ret['output']['data']['text/plain']

print('-' * 80)
print(output)

# Sparkセッション終了
print('=' * 80)
r = requests.delete(session_url, headers=headers)
print('{0} {1}'.format(r.json()['msg'], session_url))

サンプルコードの livy-test.py 実行時に以下のように Livyサーバアクセス情報(http://ip:port)を引数値として入力する必要があります。

$ python livy-test.py http://{Livy Serverがインストールされた Host IP}:8999

使用方法は次の通りです。

$ python livy-test.py http://172.16.3.22:8999
Created /sessions/47
Sessioin State is Ready!!!!!!!!!!!!!!...........................
================================================================================
http://172.16.3.22:8999/sessions/47/statements/0
Request: 1 + 1
--------------------------------------------------------------------------------
res0: Int = 2================================================================================
http://172.16.3.22:8999/sessions/47/statements/1
Request:
val NUM_SAMPLES = 100000;
val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
        val x = Math.random();
        val y = Math.random();
        if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _);
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)--------------------------------------------------------------------------------
NUM_SAMPLES: Int = 100000
count: Int = 78503
Pi is roughly 3.14012================================================================================
deleted http://172.16.3.22:8999/sessions/47