- 印刷する
- PDF
Spark を使用する
- 印刷する
- PDF
VPC環境で利用できます。
Data Forestで Spark Jobを提出する方法と、Hive Metastore連携などの追加操作が必要なケースについて説明します。
本ガイドでは Apache Spark 2.3バージョンをベースに説明します。他のバージョンの Sparkを使用する場合、任意のバージョンでの実行環境を構成をご参考ください。
Spark Jobの提出
Data Forestでユーザーは共用 Spark History Serverを使用するか個人用 Spark History Serverを実行できます。
Spark Jobを実行するには、Devアプリを作成してから Spark History Server実行方式を選択します。
$ spark-submit \
--master yarn \
--queue longlived \
...
--principal example@KR.DF.NAVERNCP.COM \
--keytab df.example.keytab \
--class com.naverncp.ExampleApplication \
example_aplication.jar
一週間以上実行されるジョブを提出する場合、一週間後にタスクが異常終了する可能性があるので、SPARK-23361が適用された Spark 2.4.0以降を使用します。
Spark wordcount
テキストファイルで各単語の数を返す spark_wordcount.pyで、Devシェル(shell)でタスクを提出する例です。
Spark History Serverを参考として Data Forestで作成したヒストリサーバアプリと接続しました。
Devシェルで Kerberos認証を完了します。
[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ kinit test01 -kt df.test01.keytab
spark_wordcount.py プログラムを以下のように作成します。
import pyspark sc = pyspark.SparkContext() text_file = sc.textFile("input.txt") counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) counts.saveAsTextFile("output")
タスクを提出する前に、過去に作成された outputがある場合には削除します。
[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ hdfs dfs -rm -r -f output 21/04/30 14:49:56 INFO fs.TrashPolicyDefault: Moved: 'hdfs://koya/user/test01/output' to trash at: hdfs://koya/user/test01/.Trash/Current/user/test01/output
Input.txtを作成してから HDFSにアップロードします。
[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ cp $SPARK_HOME/README.md input.txt [test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ hdfs dfs -put -f input.txt
参考spark-submitを実行する場合、
--master
、--deploy-mode
を指定しないと提出されません。主にユーザーのアプリケーションは worker machineから物理的に遠いローカルデバイスから提出されるため、driverと executor間のネットワーク遅延が生じます。これを最小限にするために、一般的に cluster mode を使用します。[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ spark-submit --py-files spark_wordcount.py \ --master yarn \ --deploy-mode cluster \ spark_wordcount.py
ログと一緒にタスクの提出が完了すると、以下のような outputの内容が確認できます。
Spark History Serverにアクセスして作業した内容を確認できます。
Spark SQL
Hiveにあるルールをすべて遵守する必要があります。ただし、Spark SQLではこのルールに合致しなくてもエラーは発生しません。
例えば、ユーザー「example」が spark-sqlで「example_db」という名前で Hiveのルールに反するデータベースを作成する場合、Beelineで照会するとこのデータベースは照会されません。しかし、定期的な監査でルールに反するデータベースは削除しているため、データベースの作成は Beelineで行うことをお勧めします。
Hiveアクセスの設定
Spark SQLで Hiveテーブルにアクセスできます。Sparkで Hive Managed Tableテーブルの読み込み・書き込みを行うには Hive Warehouse Connector(以下、HWC)が必要であるため、テーブルが Managedである場合は追加設定が必要です。
タスク | HWC Required | LLAP Required |
---|---|---|
Sparkで Hiveの External Tableを読み込む | X | X |
Sparkでの Hiveの External Tableを書き込む | X | X |
Sparkで Hiveの Managed Tableを読み込む | O | O |
Sparkで Hiveの Managed Tableを書き込む | O | X |
- Hive LLAP機能は準備中です。
- Sparkで Hiveの Managed Tableの読み込みは不可能です。
Hiveテーブルにアクセスする際に必要な設定は、以下のとおりです。Hiveテーブルの性質(External/Managed)とタスクの特性を考えて必要なオプションを選択します。
プロパティ | 値 |
---|---|
spark.sql.hive.hiveserver2.jdbc.url (Interactive) | 準備中 |
spark.sql.hive.hiveserver2.jdbc.url (Batch) | jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2 |
spark.datasource.hive.wareouse.metastoreUri(必須) | thrift://hms1.kr.df.naverncp.com:9083,thrift://hms2.kr.df.naverncp.com:9083 |
spark.datasource.hive.warehouse.load.staging.dir(必須) | /user/${USER}/tmp |
spark.hadoop.hive.zookeeper.quorum(必須) | zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181 |
spark.hadoop.hive.llap.daemon.service.hosts | 準備中 |
Principal(必須) | アカウント Kerberos principal。例) example@KR.DF.NAVERNCP.COM |
Keytab(必須) | キータブファイルのパス 例)./df.example.keytab |
jars | /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar |
spark-submit
を通じて Batch Hiveにタスクを提出する例は、以下のとおりです。spark-shell、PySpark にも同様な設定を追加できます。
$ spark-submit \
--master yarn \
--deploy-mode cluster \
--queue longlived \
...
--conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" \
--conf spark.datasource.hive.warehouse.metastoreUri="thrift://hms1.kr.df.naverncp.com:9083,thrift://hms2.kr.df.naverncp.com:9083" \
--conf spark.hadoop.hive.zookeeper.quorum="zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181"
--principal example@KR.DF.NAVERNCP.COM \
--keytab df.example.keytab \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar \
--class com.naverncp.ExampleApplication \
example_aplication.jar
任意の Sparkバージョンで Sparkの実行環境を構成
Data Forestで提供する Sparkバージョンは2.3.2です。別バージョンの Sparkを使用したい場合に、Hadoop Freeビルドタイプの Sparkをインストールします。
- Devアプリが構成された状態で行います。
- 任意の Sparkバージョンを使用する場合、Spark Dynamic Allocation機能は使用できません。
1. Spark(Hadoop Free)ダウンロード
本ガイドでは、2.4.7バージョンをインストールすることを基準に説明します。
Sparkをダウンロードする方法は、以下のとおりです。
- Apache Sparkのホームページで適合した Sparkバージョンをダウンロードします。
- パッケージタイプは、Pre-bulit with user-provided Apache Hadoop を選択します。
$ mkdir -p $HOME/apps/spark $ wget -P $HOME/apps https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-without-hadoop.tgz $ tar xvfz $HOME/apps/spark-2.4.7-bin-without-hadoop.tgz -C $HOME/apps/ $ ln -s $HOME/apps/spark-2.4.7-bin-without-hadoop $HOME/apps/spark $ SPARK_HOME=$HOME/apps/spark/spark-2.4.7-bin-without-hadoop $ SPARK_CONF_DIR=$HOME/apps/spark/spark-2.4.7-bin-without-hadoop/conf
2. Configurationの設定
- まず、以下のデフォルト値の設定をコピーします。
$ cp /etc/spark2/conf/* $SPARK_CONF_DIR/
$SPARK_CONF_DIR/spark-defaults.conf
ファイルに以下のように変数を追加します。spark.driver.extraJavaOptions -Dhdp.version=3.1.0.0-78 spark.yarn.am.extraJavaOptions -Dhdp.version=3.1.0.0-78 spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS /usr/hdp:/usr/hdp:ro spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS /usr/hdp:/usr/hdp:ro
$SPARK_CONF_DIR/spark-env.sh
ファイルに以下のように追加します。export SPARK_SUBMIT_OPTS="-Dhdp.version=3.1.0.0-78" export PATH=$SPARK_HOME/bin:$PATH export SPARK_DIST_CLASSPATH=`$HADOOP_COMMON_HOME/bin/hadoop classpath`
3. jarsのアップロード
Data Forestで提供するhadoop-*.jar
には、多様な種類のバグパッチが適用されています。その他にユーザーに必要な jarも、このパスの下位にコピーしておくと jarアップロードするための追加オプション(extraClassPath
)を設定する必要がなくなります。
jarsをアップロードする方法は、以下のとおりです。
$SPARK_HOME/jar
の下位にこれらの jarsファイルをコピーします。cp /usr/hdp/current/spark2-client/jars/hadoop-*.jar $SPARK_HOME/jars
$SPARK_HOME/jar
を圧縮します。cd $SPARK_HOME/jars tar cvfz /tmp/spark_jars.tar.gz *
- HDFSユーザーホームディレクトリの下位にファイルをアップロードします。
kinit example@KR.DF.NAVERNCP.COM -kt example.service.keytab hadoop fs -copyFromLocal /tmp/spark_jars.tar.gz /user/example/ hadoop fs -setrep 10 /user/example/spark_jars.tar.gz
4. spark-submitの実行
spark-submit
を実行する際にspark.yarn.archive
オプションを追加して実行します。以下の例では、上記で作成した spark_wordcount.pyとinput.txt ファイルをそのまま使用します。
$SPARK_HOME/bin/spark-submit --master yarn spark_wordcount.py # 기타 옵션 ...
--conf spark.yarn.archive=hdfs://koya/user/example/spark_jars.tar.gz \
spark.yarn.archive
オプションを追加して実行しない場合、HDFSにローカルファイルシステムの$SPARK_HOME/jars
をアップロードする過程が必要です。そのため、実行時間が長くかかる場合があります。
Spark 3.0.0以降を使用する場合、使用する HDFSネームスペースを指定するオプションをもう一つ追加してます。
--conf spark.kerberos.access.hadoopFileSystems=hdfs://koya,hdfs://tata