Spark を使用する
    • PDF

    Spark を使用する

    • PDF

    Article Summary

    VPC環境で利用できます。

    Data Forestで Spark Jobを提出する方法と、Hive Metastore連携などの追加操作が必要なケースについて説明します。

    参考

    本ガイドでは Apache Spark 2.3バージョンをベースに説明します。他のバージョンの Sparkを使用する場合、任意のバージョンでの実行環境を構成をご参考ください。

    Spark Jobの提出

    Data Forestでユーザーは共用 Spark History Serverを使用するか個人用 Spark History Serverを実行できます。

    Spark Jobを実行するには、Devアプリを作成してから Spark History Server実行方式を選択します。

    $ spark-submit \
     --master yarn \
     --queue longlived \
     ...
     --principal example@KR.DF.NAVERNCP.COM \
     --keytab df.example.keytab \ 
     --class com.naverncp.ExampleApplication \
     example_aplication.jar
    
    注意

    一週間以上実行されるジョブを提出する場合、一週間後にタスクが異常終了する可能性があるので、SPARK-23361が適用された Spark 2.4.0以降を使用します。

    Spark wordcount

    テキストファイルで各単語の数を返す spark_wordcount.pyで、Devシェル(shell)でタスクを提出する例です。
    Spark History Serverを参考として Data Forestで作成したヒストリサーバアプリと接続しました。

    1. Devシェルで Kerberos認証を完了します。

      [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ kinit test01 -kt df.test01.keytab
      
    2. spark_wordcount.py プログラムを以下のように作成します。

      import pyspark
      sc = pyspark.SparkContext()
      
      text_file = sc.textFile("input.txt")
      counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
      counts.saveAsTextFile("output")
      
    3. タスクを提出する前に、過去に作成された outputがある場合には削除します。

      [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ hdfs dfs -rm -r -f output
      21/04/30 14:49:56 INFO fs.TrashPolicyDefault: Moved: 'hdfs://koya/user/test01/output' to trash at: hdfs://koya/user/test01/.Trash/Current/user/test01/output
      
    4. Input.txtを作成してから HDFSにアップロードします。

      [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ cp $SPARK_HOME/README.md input.txt
      [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ hdfs dfs -put -f input.txt
      
      参考

      spark-submitを実行する場合、--master--deploy-modeを指定しないと提出されません。主にユーザーのアプリケーションは worker machineから物理的に遠いローカルデバイスから提出されるため、driverと executor間のネットワーク遅延が生じます。これを最小限にするために、一般的に cluster mode を使用します。

      [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ spark-submit --py-files spark_wordcount.py \
      --master yarn \
      --deploy-mode cluster \
      spark_wordcount.py
      
    5. ログと一緒にタスクの提出が完了すると、以下のような outputの内容が確認できます。
      df-eco-spark_12_vpc_ko

    6. Spark History Serverにアクセスして作業した内容を確認できます。
      df-eco-spark_11_vpc_ko

    Spark SQL

    Hiveにあるルールをすべて遵守する必要があります。ただし、Spark SQLではこのルールに合致しなくてもエラーは発生しません。
    例えば、ユーザー「example」が spark-sqlで「example_db」という名前で Hiveのルールに反するデータベースを作成する場合、Beelineで照会するとこのデータベースは照会されません。しかし、定期的な監査でルールに反するデータベースは削除しているため、データベースの作成は Beelineで行うことをお勧めします。

    Hiveアクセスの設定

    Spark SQLで Hiveテーブルにアクセスできます。Sparkで Hive Managed Tableテーブルの読み込み・書き込みを行うには Hive Warehouse Connector(以下、HWC)が必要であるため、テーブルが Managedである場合は追加設定が必要です。

    タスクHWC RequiredLLAP Required
    Sparkで Hiveの External Tableを読み込むXX
    Sparkでの Hiveの External Tableを書き込むXX
    Sparkで Hiveの Managed Tableを読み込むOO
    Sparkで Hiveの Managed Tableを書き込むOX
    注意
    • Hive LLAP機能は準備中です。
    • Sparkで Hiveの Managed Tableの読み込みは不可能です。

    Hiveテーブルにアクセスする際に必要な設定は、以下のとおりです。Hiveテーブルの性質(External/Managed)とタスクの特性を考えて必要なオプションを選択します。

    プロパティ
    spark.sql.hive.hiveserver2.jdbc.url (Interactive)準備中
    spark.sql.hive.hiveserver2.jdbc.url (Batch)jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
    spark.datasource.hive.wareouse.metastoreUri(必須)thrift://hms1.kr.df.naverncp.com:9083,thrift://hms2.kr.df.naverncp.com:9083
    spark.datasource.hive.warehouse.load.staging.dir(必須)/user/${USER}/tmp
    spark.hadoop.hive.zookeeper.quorum(必須)zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181
    spark.hadoop.hive.llap.daemon.service.hosts準備中
    Principal(必須)アカウント Kerberos principal。例) example@KR.DF.NAVERNCP.COM
    Keytab(必須)キータブファイルのパス 例)./df.example.keytab
    jars/usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar

    spark-submitを通じて Batch Hiveにタスクを提出する例は、以下のとおりです。spark-shellPySpark にも同様な設定を追加できます。

    $ spark-submit \
        --master yarn \
        --deploy-mode cluster \
        --queue longlived \
        ...
        --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" \
        --conf spark.datasource.hive.warehouse.metastoreUri="thrift://hms1.kr.df.naverncp.com:9083,thrift://hms2.kr.df.naverncp.com:9083" \
        --conf spark.hadoop.hive.zookeeper.quorum="zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181"
        --principal example@KR.DF.NAVERNCP.COM \
        --keytab df.example.keytab \ 
        --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar \
        --class com.naverncp.ExampleApplication \
        example_aplication.jar
    

    任意の Sparkバージョンで Sparkの実行環境を構成

    Data Forestで提供する Sparkバージョンは2.3.2です。別バージョンの Sparkを使用したい場合に、Hadoop Freeビルドタイプの Sparkをインストールします。

    注意
    • Devアプリが構成された状態で行います。
    • 任意の Sparkバージョンを使用する場合、Spark Dynamic Allocation機能は使用できません。

    1. Spark(Hadoop Free)ダウンロード

    本ガイドでは、2.4.7バージョンをインストールすることを基準に説明します。

    Sparkをダウンロードする方法は、以下のとおりです。

    1. Apache Sparkのホームページで適合した Sparkバージョンをダウンロードします。
    2. パッケージタイプは、Pre-bulit with user-provided Apache Hadoop を選択します。
      $ mkdir -p $HOME/apps/spark
      $ wget -P $HOME/apps https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-without-hadoop.tgz
      $ tar xvfz $HOME/apps/spark-2.4.7-bin-without-hadoop.tgz -C $HOME/apps/
      $ ln -s $HOME/apps/spark-2.4.7-bin-without-hadoop $HOME/apps/spark
      $ SPARK_HOME=$HOME/apps/spark/spark-2.4.7-bin-without-hadoop
      $ SPARK_CONF_DIR=$HOME/apps/spark/spark-2.4.7-bin-without-hadoop/conf
      

    2. Configurationの設定

    1. まず、以下のデフォルト値の設定をコピーします。
      $ cp /etc/spark2/conf/* $SPARK_CONF_DIR/
      
    2. $SPARK_CONF_DIR/spark-defaults.confファイルに以下のように変数を追加します。
      spark.driver.extraJavaOptions -Dhdp.version=3.1.0.0-78
      spark.yarn.am.extraJavaOptions -Dhdp.version=3.1.0.0-78
      
      spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS /usr/hdp:/usr/hdp:ro
      spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS /usr/hdp:/usr/hdp:ro
      
    3. $SPARK_CONF_DIR/spark-env.shファイルに以下のように追加します。
      export SPARK_SUBMIT_OPTS="-Dhdp.version=3.1.0.0-78"
      export PATH=$SPARK_HOME/bin:$PATH
      export SPARK_DIST_CLASSPATH=`$HADOOP_COMMON_HOME/bin/hadoop classpath`
      

    3. jarsのアップロード

    Data Forestで提供するhadoop-*.jarには、多様な種類のバグパッチが適用されています。その他にユーザーに必要な jarも、このパスの下位にコピーしておくと jarアップロードするための追加オプション(extraClassPath)を設定する必要がなくなります。

    jarsをアップロードする方法は、以下のとおりです。

    1. $SPARK_HOME/jarの下位にこれらの jarsファイルをコピーします。
      cp /usr/hdp/current/spark2-client/jars/hadoop-*.jar $SPARK_HOME/jars
      
    2. $SPARK_HOME/jarを圧縮します。
      cd $SPARK_HOME/jars
      tar cvfz /tmp/spark_jars.tar.gz *
      
    3. HDFSユーザーホームディレクトリの下位にファイルをアップロードします。
      kinit example@KR.DF.NAVERNCP.COM -kt example.service.keytab
      hadoop fs -copyFromLocal /tmp/spark_jars.tar.gz /user/example/
      hadoop fs -setrep 10 /user/example/spark_jars.tar.gz
      

    4. spark-submitの実行

    spark-submitを実行する際にspark.yarn.archiveオプションを追加して実行します。以下の例では、上記で作成した spark_wordcount.pyとinput.txt ファイルをそのまま使用します。

    $SPARK_HOME/bin/spark-submit --master yarn spark_wordcount.py # 기타 옵션 ... 
    --conf spark.yarn.archive=hdfs://koya/user/example/spark_jars.tar.gz \
    
    参考

    spark.yarn.archiveオプションを追加して実行しない場合、HDFSにローカルファイルシステムの$SPARK_HOME/jarsをアップロードする過程が必要です。そのため、実行時間が長くかかる場合があります。

    Spark 3.0.0以降を使用する場合、使用する HDFSネームスペースを指定するオプションをもう一つ追加してます。

    --conf spark.kerberos.access.hadoopFileSystems=hdfs://koya,hdfs://tata
    

    この記事は役に立ちましたか?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.