Apache Livy でSpark Jobを送信
    • PDF

    Apache Livy でSpark Jobを送信

    • PDF

    Article Summary

    VPC環境で利用できます。

    Apache LivyはRESTインターフェースを利用してSparkクラスタと簡単に相互適用できるサービスです。簡単なRESTインターフェースまたはRPC(Remote Procedure Call)クライアントライブラリを通じて、Spark JobまたはSparkコードスニペット、同期/非同期の結果検索、SparkContext管理を簡単に送信できます。

    また、 Apache LivyはSparkとアプリケーションサーバ間の相互作用を単純化して、対話型のウェブ/モバイルアプリケーションにSparkを使用できるようにサポートします。

    • マルチクライアントで複数のSpark Jobを使用できるようにSparkContextを持っています。
    • マルチJob及びクライアントでキャッシュされたRDD(Resilient Distributed Dataset)またはデータフレームを共有します。
    • マルチSparkContextを同時に管理でき、優れた耐欠陥性と同時性のためにSparkContextがLivyサーバの代わりにクラスタ(YARN/Mesos)で実行されます。
    • Jobは既にコンパイルされたjar、コードスニペットまたはJava/ScalaクライアントAPIを通じて送信できます。
    • セキュリティ認証通信を利用してセキュリティを確保します。

    hadoop-chadoop-use-ex9_1-1

    参考

    このガイドでは、Cloud Hadoopが提供するApache Livyを使用してSpark Jobを送信する方法について説明します。

    Pythonモジュールのインストール

    Sparkのサンプルコードを実行するには、先にrequestsというPythonモジュールをインストールしてください。

    $ sudo yum install -y epel-release
    $ sudo yum install -y python-pip
    $ sudo pip install requests
    

    Apache Livyサーバ情報の確認

    Apache Livyサーバのポート情報は、Ambari UIで確認できます。

    1. Ambari UIにアクセスし、Spark2 > [CONFIGS] を順にクリックします。
      hadoop-chadoop-use-ex9_2-1_ja

    2. Advanced livy2-conf項目をクリックし、livy.server.port情報を確認します。
      hadoop-chadoop-use-ex9_2-2_ja

    Sparkのサンプルコード

    サンプルコードは、Apache Livy Examplesを参考にして作成しました。

    • ソースコードの内容をlivy-test.pyで保存
    #-*- coding:utf-8 -*-
    
    import json, pprint, requests, textwrap, time, sys
    
    # Livy2アクセス情報の入力
    if len(sys.argv) < 2:
            print('ERROR : Livyサーバのアクセス情報を入力してください')
            print(' - Usage: python {0} http://ホスト名:ポート'.format(sys.argv[0]))
            sys.exit(1)
    host = sys.argv[1]
    
    # ヘッダ情報
    headers = {'Content-Type': 'application/json'}
    
    # Sparkセッションの作成
    data = {'kind': 'spark'}
    r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    print("Created " + r.headers['location'])
    
    # Sparkセッションの状態確認
    state = "notIdle"
    session_url = host + r.headers['location']
    sys.stdout.write('Waiting for session state to idle')
    while state != 'idle':
            r = requests.get(session_url, headers=headers)
            state = r.json()['state']
            sys.stdout.write('.')
            sys.stdout.flush()
            time.sleep(1)
    sys.stdout.write('\rSessioin State is Ready!!!!!!!!!!!!!!\n')
    sys.stdout.flush()
    
    
    # テストコード1
    statements_url = session_url + '/statements'
    data = {'code': '1 + 1'}
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    statement_url = host + r.headers['location']
    print('=' * 80)
    print(statement_url)
    print('Request: {0}'.format(data['code']))
    
    output = None
    while output == None:
            r = requests.get(statement_url, headers=headers)
            ret = r.json()
            if ret['output'] == None:
                    time.sleep(1)
                    continue
            if 'output' in ret and 'data' in ret['output']:
                    output = ret['output']['data']['text/plain']
    
    print('-' * 80)
    print(output)
    
    # テストコード2
    data = {
            'code': textwrap.dedent("""
                    val NUM_SAMPLES = 100000;
                    val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
                            val x = Math.random();
                            val y = Math.random();
                            if (x*x + y*y < 1) 1 else 0
                    }.reduce(_ + _);
                    println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
                    """)
    }
    
    r = requests.post(statements_url, data=json.dumps(data), headers=headers)
    statement_url = host + r.headers['location']
    print('=' * 80)
    print(statement_url)
    print('Request: {0}'.format(data['code']))
    
    output = None
    while output == None:
            r = requests.get(statement_url, headers=headers)
            ret = r.json()
            if ret['output'] == None:
                    time.sleep(1)
                    continue
            if 'output' in ret and 'data' in ret['output']:
                    output = ret['output']['data']['text/plain']
    
    print('-' * 80)
    print(output)
    
    # Sparkセッションの終了
    print('=' * 80)
    r = requests.delete(session_url, headers=headers)
    print('{0} {1}'.format(r.json()['msg'], session_url))
    

    サンプルコードのlivy-test.pyの実行時には、以下のようにLivyサーバアクセス情報(http://ip:port)を引数として入力してください。

    $ python livy-test.py http://ip:port
    

    使用方法は以下のとおりです。

    $ python livy-test.py http://172.16.3.22:8999
    Created /sessions/47
    Sessioin State is Ready!!!!!!!!!!!!!!...........................
    ================================================================================
    http://172.16.3.22:8999/sessions/47/statements/0
    Request: 1 + 1
    --------------------------------------------------------------------------------
    res0: Int = 2================================================================================
    http://172.16.3.22:8999/sessions/47/statements/1
    Request:
    val NUM_SAMPLES = 100000;
    val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
            val x = Math.random();
            val y = Math.random();
            if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _);
    println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)--------------------------------------------------------------------------------
    NUM_SAMPLES: Int = 100000
    count: Int = 78503
    Pi is roughly 3.14012================================================================================
    deleted http://172.16.3.22:8999/sessions/47
    

    この記事は役に立ちましたか?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.