VPC環境で利用できます。
Data Forestで Spark Jobを提出する方法と、Hive Metastore連携などの追加操作が必要なケースについて説明します。
本ガイドでは Apache Spark 2.3バージョンをベースに説明します。他のバージョンの Sparkを使用する場合、任意のバージョンでの実行環境を構成をご参考ください。
Spark Jobの提出
Data Forestでユーザーは共用 Spark History Serverを使用するか個人用 Spark History Serverを実行できます。
Spark Jobを実行するには、Devアプリを作成してから Spark History Server実行方式を選択します。
$ spark-submit \
--master yarn \
--queue longlived \
...
--principal example@KR.DF.NAVERNCP.COM \
--keytab df.example.keytab \
--class com.naverncp.ExampleApplication \
example_aplication.jar
一週間以上実行されるジョブを提出する場合、一週間後にタスクが異常終了する可能性があるので、SPARK-23361が適用された Spark 2.4.0以降を使用します。
Spark wordcount
テキストファイルで各単語の数を返す spark_wordcount.pyで、Devシェル(shell)でタスクを提出する例です。
Spark History Serverを参考として Data Forestで作成したヒストリサーバアプリと接続しました。
-
Devシェルで Kerberos認証を完了します。
[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ kinit test01 -kt df.test01.keytab -
spark_wordcount.py プログラムを以下のように作成します。
import pyspark sc = pyspark.SparkContext() text_file = sc.textFile("input.txt") counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) counts.saveAsTextFile("output") -
タスクを提出する前に、過去に作成された outputがある場合には削除します。
[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ hdfs dfs -rm -r -f output 21/04/30 14:49:56 INFO fs.TrashPolicyDefault: Moved: 'hdfs://koya/user/test01/output' to trash at: hdfs://koya/user/test01/.Trash/Current/user/test01/output -
Input.txtを作成してから HDFSにアップロードします。
[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ cp $SPARK_HOME/README.md input.txt [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ hdfs dfs -put -f input.txt参考spark-submitを実行する場合、
--master、--deploy-modeを指定しないと提出されません。主にユーザーのアプリケーションは worker machineから物理的に遠いローカルデバイスから提出されるため、driverと executor間のネットワーク遅延が生じます。これを最小限にするために、一般的に cluster mode を使用します。[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ spark-submit --py-files spark_wordcount.py \ --master yarn \ --deploy-mode cluster \ spark_wordcount.py -
ログと一緒にタスクの提出が完了すると、以下のような outputの内容が確認できます。

-
Spark History Serverにアクセスして作業した内容を確認できます。

Spark SQL
Hiveにあるルールをすべて遵守する必要があります。ただし、Spark SQLではこのルールに合致しなくてもエラーは発生しません。
例えば、ユーザー「example」が spark-sqlで「example_db」という名前で Hiveのルールに反するデータベースを作成する場合、Beelineで照会するとこのデータベースは照会されません。しかし、定期的な監査でルールに反するデータベースは削除しているため、データベースの作成は Beelineで行うことをお勧めします。
Hiveアクセスの設定
Spark SQLで Hiveテーブルにアクセスできます。Sparkで Hive Managed Tableテーブルの読み込み・書き込みを行うには Hive Warehouse Connector(以下、HWC)が必要であるため、テーブルが Managedである場合は追加設定が必要です。
| タスク | HWC Required | LLAP Required |
|---|---|---|
| Sparkで Hiveの External Tableを読み込む | X | X |
| Sparkでの Hiveの External Tableを書き込む | X | X |
| Sparkで Hiveの Managed Tableを読み込む | O | O |
| Sparkで Hiveの Managed Tableを書き込む | O | X |
- Hive LLAP機能は準備中です。
- Sparkで Hiveの Managed Tableの読み込みは不可能です。
Hiveテーブルにアクセスする際に必要な設定は、以下のとおりです。Hiveテーブルの性質(External/Managed)とタスクの特性を考えて必要なオプションを選択します。
| プロパティ | 値 |
|---|---|
| spark.sql.hive.hiveserver2.jdbc.url (Interactive) | 準備中 |
| spark.sql.hive.hiveserver2.jdbc.url (Batch) | jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2 |
| spark.datasource.hive.wareouse.metastoreUri(必須) | thrift://hms1.kr.df.naverncp.com:9083,thrift://hms2.kr.df.naverncp.com:9083 |
| spark.datasource.hive.warehouse.load.staging.dir(必須) | /user/${USER}/tmp |
| spark.hadoop.hive.zookeeper.quorum(必須) | zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181 |
| spark.hadoop.hive.llap.daemon.service.hosts | 準備中 |
| Principal(必須) | アカウント Kerberos principal。例) example@KR.DF.NAVERNCP.COM |
| Keytab(必須) | キータブファイルのパス 例)./df.example.keytab |
| jars | /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar |
spark-submitを通じて Batch Hiveにタスクを提出する例は、以下のとおりです。spark-shell、PySpark にも同様な設定を追加できます。
$ spark-submit \
--master yarn \
--deploy-mode cluster \
--queue longlived \
...
--conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" \
--conf spark.datasource.hive.warehouse.metastoreUri="thrift://hms1.kr.df.naverncp.com:9083,thrift://hms2.kr.df.naverncp.com:9083" \
--conf spark.hadoop.hive.zookeeper.quorum="zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181"
--principal example@KR.DF.NAVERNCP.COM \
--keytab df.example.keytab \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar \
--class com.naverncp.ExampleApplication \
example_aplication.jar
任意の Sparkバージョンで Sparkの実行環境を構成
Data Forestで提供する Sparkバージョンは2.3.2です。別バージョンの Sparkを使用したい場合に、Hadoop Freeビルドタイプの Sparkをインストールします。
- Devアプリが構成された状態で行います。
- 任意の Sparkバージョンを使用する場合、Spark Dynamic Allocation機能は使用できません。
1. Spark(Hadoop Free)ダウンロード
本ガイドでは、2.4.7バージョンをインストールすることを基準に説明します。
Sparkをダウンロードする方法は、以下のとおりです。
- Apache Sparkのホームページで適合した Sparkバージョンをダウンロードします。
- パッケージタイプは、Pre-bulit with user-provided Apache Hadoop を選択します。
$ mkdir -p $HOME/apps/spark $ wget -P $HOME/apps https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-without-hadoop.tgz $ tar xvfz $HOME/apps/spark-2.4.7-bin-without-hadoop.tgz -C $HOME/apps/ $ ln -s $HOME/apps/spark-2.4.7-bin-without-hadoop $HOME/apps/spark $ SPARK_HOME=$HOME/apps/spark/spark-2.4.7-bin-without-hadoop $ SPARK_CONF_DIR=$HOME/apps/spark/spark-2.4.7-bin-without-hadoop/conf
2. Configurationの設定
- まず、以下のデフォルト値の設定をコピーします。
$ cp /etc/spark2/conf/* $SPARK_CONF_DIR/ $SPARK_CONF_DIR/spark-defaults.confファイルに以下のように変数を追加します。spark.driver.extraJavaOptions -Dhdp.version=3.1.0.0-78 spark.yarn.am.extraJavaOptions -Dhdp.version=3.1.0.0-78 spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS /usr/hdp:/usr/hdp:ro spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS /usr/hdp:/usr/hdp:ro$SPARK_CONF_DIR/spark-env.shファイルに以下のように追加します。export SPARK_SUBMIT_OPTS="-Dhdp.version=3.1.0.0-78" export PATH=$SPARK_HOME/bin:$PATH export SPARK_DIST_CLASSPATH=`$HADOOP_COMMON_HOME/bin/hadoop classpath`
3. jarsのアップロード
Data Forestで提供するhadoop-*.jarには、多様な種類のバグパッチが適用されています。その他にユーザーに必要な jarも、このパスの下位にコピーしておくと jarアップロードするための追加オプション(extraClassPath)を設定する必要がなくなります。
jarsをアップロードする方法は、以下のとおりです。
$SPARK_HOME/jarの下位にこれらの jarsファイルをコピーします。cp /usr/hdp/current/spark2-client/jars/hadoop-*.jar $SPARK_HOME/jars$SPARK_HOME/jarを圧縮します。cd $SPARK_HOME/jars tar cvfz /tmp/spark_jars.tar.gz *- HDFSユーザーホームディレクトリの下位にファイルをアップロードします。
kinit example@KR.DF.NAVERNCP.COM -kt example.service.keytab hadoop fs -copyFromLocal /tmp/spark_jars.tar.gz /user/example/ hadoop fs -setrep 10 /user/example/spark_jars.tar.gz
4. spark-submitの実行
spark-submitを実行する際にspark.yarn.archiveオプションを追加して実行します。以下の例では、上記で作成した spark_wordcount.pyとinput.txt ファイルをそのまま使用します。
$SPARK_HOME/bin/spark-submit --master yarn spark_wordcount.py # 기타 옵션 ...
--conf spark.yarn.archive=hdfs://koya/user/example/spark_jars.tar.gz \
spark.yarn.archiveオプションを追加して実行しない場合、HDFSにローカルファイルシステムの$SPARK_HOME/jarsをアップロードする過程が必要です。そのため、実行時間が長くかかる場合があります。
Spark 3.0.0以降を使用する場合、使用する HDFSネームスペースを指定するオプションをもう一つ追加してます。
--conf spark.kerberos.access.hadoopFileSystems=hdfs://koya,hdfs://tata