Documentation Index

Fetch the complete documentation index at: https://guide.ncloud-docs.com/llms.txt

Use this file to discover all available pages before exploring further.

Spark を使用する

Prev Next

最新のコンテンツが反映されていません。早急にアップデート内容をご提供できるよう努めております。最新のコンテンツ内容は韓国語ページをご参照ください。

VPC環境で利用できます。

Data Forestで Spark Jobを提出する方法と、Hive Metastore連携などの追加操作が必要なケースについて説明します。

参考

本ガイドでは Apache Spark 2.3バージョンをベースに説明します。他のバージョンの Sparkを使用する場合、任意のバージョンでの実行環境を構成をご参考ください。

Spark Jobの提出

Data Forestでユーザーは共用 Spark History Serverを使用するか個人用 Spark History Serverを実行できます。

Spark Jobを実行するには、Devアプリを作成してから Spark History Server実行方式を選択します。

$ spark-submit \
 --master yarn \
 --queue longlived \
 ...
 --principal example@KR.DF.NAVERNCP.COM \
 --keytab df.example.keytab \ 
 --class com.naverncp.ExampleApplication \
 example_aplication.jar
注意

一週間以上実行されるジョブを提出する場合、一週間後にタスクが異常終了する可能性があるので、SPARK-23361が適用された Spark 2.4.0以降を使用します。

Spark wordcount

テキストファイルで各単語の数を返す spark_wordcount.pyで、Devシェル(shell)でタスクを提出する例です。
Spark History Serverを参考として Data Forestで作成したヒストリサーバアプリと接続しました。

  1. Devシェルで Kerberos認証を完了します。

    [test01@shell-0.kr.df.naverncp.com ~][df]$ kinit test01 -kt df.test01.keytab
    
  2. spark_wordcount.py プログラムを以下のように作成します。

    import pyspark
    sc = pyspark.SparkContext()
    
    text_file = sc.textFile("input.txt")
    counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile("output")
    
  3. タスクを提出する前に、過去に作成された outputがある場合には削除します。

    [test01@shell-0.kr.df.naverncp.com ~][df]$ hdfs dfs -rm -r -f output
    21/04/30 14:49:56 INFO fs.TrashPolicyDefault: Moved: 'hdfs://koya/user/test01/output' to trash at: hdfs://koya/user/test01/.Trash/Current/user/test01/output
    
  4. Input.txtを作成してから HDFSにアップロードします。

    [test01@shell-0.kr.df.naverncp.com ~][df]$ cp $SPARK_HOME/README.md input.txt
    [test01@shell-0.kr.df.naverncp.com ~][df]$ hdfs dfs -put -f input.txt
    
    参考

    spark-submitを実行する場合、--master--deploy-modeを指定しないと提出されません。主にユーザーのアプリケーションは worker machineから物理的に遠いローカルデバイスから提出されるため、driverと executor間のネットワーク遅延が生じます。これを最小限にするために、一般的に cluster mode を使用します。

    [test01@shell-0.kr.df.naverncp.com ~][df]$ spark-submit --py-files spark_wordcount.py \
    --master yarn \
    --deploy-mode cluster \
    spark_wordcount.py
    
  5. ログと一緒にタスクの提出が完了すると、以下のような outputの内容が確認できます。
    df-eco-spark_12_vpc_ko

  6. Spark History Serverにアクセスして作業した内容を確認できます。
    df-eco-spark_11_vpc_ko

Spark SQL

Hiveにあるルールをすべて遵守する必要があります。ただし、Spark SQLではこのルールに合致しなくてもエラーは発生しません。
例えば、ユーザー「example」が spark-sqlで「example_db」という名前で Hiveのルールに反するデータベースを作成する場合、Beelineで照会するとこのデータベースは照会されません。しかし、定期的な監査でルールに反するデータベースは削除しているため、データベースの作成は Beelineで行うことをお勧めします。

Hiveアクセスの設定

Spark SQLで Hiveテーブルにアクセスできます。Sparkで Hive Managed Tableテーブルの読み込み・書き込みを行うには Hive Warehouse Connector(以下、HWC)が必要であるため、テーブルが Managedである場合は追加設定が必要です。

タスク HWC Required LLAP Required
Sparkで Hiveの External Tableを読み込む X X
Sparkでの Hiveの External Tableを書き込む X X
Sparkで Hiveの Managed Tableを読み込む O O
Sparkで Hiveの Managed Tableを書き込む O X
注意
  • Hive LLAP機能は準備中です。
  • Sparkで Hiveの Managed Tableの読み込みは不可能です。

Hiveテーブルにアクセスする際に必要な設定は、以下のとおりです。Hiveテーブルの性質(External/Managed)とタスクの特性を考えて必要なオプションを選択します。

プロパティ
spark.sql.hive.hiveserver2.jdbc.url (Interactive) 準備中
spark.sql.hive.hiveserver2.jdbc.url (Batch) jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
spark.datasource.hive.wareouse.metastoreUri(必須) thrift://hms1.kr.df.naverncp.com:9083,thrift://hms2.kr.df.naverncp.com:9083
spark.datasource.hive.warehouse.load.staging.dir(必須) /user/${USER}/tmp
spark.hadoop.hive.zookeeper.quorum(必須) zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181
spark.hadoop.hive.llap.daemon.service.hosts 準備中
Principal(必須) アカウント Kerberos principal。例) example@KR.DF.NAVERNCP.COM
Keytab(必須) キータブファイルのパス 例)./df.example.keytab
jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar

spark-submitを通じて Batch Hiveにタスクを提出する例は、以下のとおりです。spark-shellPySpark にも同様な設定を追加できます。

$ spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --queue longlived \
    ...
    --conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" \
    --conf spark.datasource.hive.warehouse.metastoreUri="thrift://hms1.kr.df.naverncp.com:9083,thrift://hms2.kr.df.naverncp.com:9083" \
    --conf spark.hadoop.hive.zookeeper.quorum="zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181"
    --principal example@KR.DF.NAVERNCP.COM \
    --keytab df.example.keytab \ 
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar \
    --class com.naverncp.ExampleApplication \
    example_aplication.jar

任意の Sparkバージョンで Sparkの実行環境を構成

Data Forestで提供する Sparkバージョンは2.3.2です。別バージョンの Sparkを使用したい場合に、Hadoop Freeビルドタイプの Sparkをインストールします。

注意
  • Devアプリが構成された状態で行います。
  • 任意の Sparkバージョンを使用する場合、Spark Dynamic Allocation機能は使用できません。

1. Spark(Hadoop Free)ダウンロード

本ガイドでは、2.4.7バージョンをインストールすることを基準に説明します。

Sparkをダウンロードする方法は、以下のとおりです。

  1. Apache Sparkのホームページで適合した Sparkバージョンをダウンロードします。
  2. パッケージタイプは、Pre-bulit with user-provided Apache Hadoop を選択します。
    $ mkdir -p $HOME/apps/spark
    $ wget -P $HOME/apps https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-without-hadoop.tgz
    $ tar xvfz $HOME/apps/spark-2.4.7-bin-without-hadoop.tgz -C $HOME/apps/
    $ ln -s $HOME/apps/spark-2.4.7-bin-without-hadoop $HOME/apps/spark
    $ SPARK_HOME=$HOME/apps/spark/spark-2.4.7-bin-without-hadoop
    $ SPARK_CONF_DIR=$HOME/apps/spark/spark-2.4.7-bin-without-hadoop/conf
    

2. Configurationの設定

  1. まず、以下のデフォルト値の設定をコピーします。
    $ cp /etc/spark2/conf/* $SPARK_CONF_DIR/
    
  2. $SPARK_CONF_DIR/spark-defaults.confファイルに以下のように変数を追加します。
    spark.driver.extraJavaOptions -Dhdp.version=3.1.0.0-78
    spark.yarn.am.extraJavaOptions -Dhdp.version=3.1.0.0-78
    
    spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS /usr/hdp:/usr/hdp:ro
    spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS /usr/hdp:/usr/hdp:ro
    
  3. $SPARK_CONF_DIR/spark-env.shファイルに以下のように追加します。
    export SPARK_SUBMIT_OPTS="-Dhdp.version=3.1.0.0-78"
    export PATH=$SPARK_HOME/bin:$PATH
    export SPARK_DIST_CLASSPATH=`$HADOOP_COMMON_HOME/bin/hadoop classpath`
    

3. jarsのアップロード

Data Forestで提供するhadoop-*.jarには、多様な種類のバグパッチが適用されています。その他にユーザーに必要な jarも、このパスの下位にコピーしておくと jarアップロードするための追加オプション(extraClassPath)を設定する必要がなくなります。

jarsをアップロードする方法は、以下のとおりです。

  1. $SPARK_HOME/jarの下位にこれらの jarsファイルをコピーします。
    cp /usr/hdp/current/spark2-client/jars/hadoop-*.jar $SPARK_HOME/jars
    
  2. $SPARK_HOME/jarを圧縮します。
    cd $SPARK_HOME/jars
    tar cvfz /tmp/spark_jars.tar.gz *
    
  3. HDFSユーザーホームディレクトリの下位にファイルをアップロードします。
    kinit example@KR.DF.NAVERNCP.COM -kt example.service.keytab
    hadoop fs -copyFromLocal /tmp/spark_jars.tar.gz /user/example/
    hadoop fs -setrep 10 /user/example/spark_jars.tar.gz
    

4. spark-submitの実行

spark-submitを実行する際にspark.yarn.archiveオプションを追加して実行します。以下の例では、上記で作成した spark_wordcount.pyとinput.txt ファイルをそのまま使用します。

$SPARK_HOME/bin/spark-submit --master yarn spark_wordcount.py # 기타 옵션 ... 
--conf spark.yarn.archive=hdfs://koya/user/example/spark_jars.tar.gz \
参考

spark.yarn.archiveオプションを追加して実行しない場合、HDFSにローカルファイルシステムの$SPARK_HOME/jarsをアップロードする過程が必要です。そのため、実行時間が長くかかる場合があります。

Spark 3.0.0以降を使用する場合、使用する HDFSネームスペースを指定するオプションをもう一つ追加してます。

--conf spark.kerberos.access.hadoopFileSystems=hdfs://koya,hdfs://tata