Using Spark
Available in VPC
This guide explains how to submit a Spark job in Data Forest, as well as cases that require additional work, such as Hive Metastore integration.
This guide is based on Apache Spark 2.3. If you want to use a different version of Spark, see Build Spark execution environment with a desired Spark version.
Submit Spark Job
In Data Forest, you can use the public Spark History Server or run a private one. To run a Spark job, create a Dev app and decide how the Spark History Server should run.
$ spark-submit \
--master yarn \
--queue longlived \
...
--principal example@KR.DF.NAVERNCP.COM \
--keytab df.example.keytab \
--class com.naverncp.ExampleApplication \
example_application.jar
If you submit a job that runs for a week or more, the job may terminate abnormally after a week. Use Spark 2.4.0 or a later version, which includes SPARK-23361.
Spark wordcount
This is an example of submitting a job from the Dev shell with spark_wordcount.py, which returns the count of each word in a text file.
It connects to the history server app created in Data Forest; for details, see Spark History Server.
- Complete Kerberos authentication in the Dev shell.
[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ kinit test01 -kt df.test01.keytab
- Write the spark_wordcount.py program as below.
import pyspark

sc = pyspark.SparkContext()
text_file = sc.textFile("input.txt")
counts = text_file.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("output")
- Delete any previously created output before submitting the job.
[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ hdfs dfs -rm -r -f output
21/04/30 14:49:56 INFO fs.TrashPolicyDefault: Moved: 'hdfs://koya/user/test01/output' to trash at: hdfs://koya/user/test01/.Trash/Current/user/test01/output
- Create input.txt, and upload it to HDFS.
[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ cp $SPARK_HOME/README.md input.txt
[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ hdfs dfs -put -f input.txt
When running spark-submit, the submission does not work unless --master and --deploy-mode are specified. Usually, the user's application is submitted from a local machine that is physically far from the worker machines, which introduces network latency between the driver and the executors. To minimize this, cluster mode is generally used.
[test01@shell-0.dev.test01.kr.df.naverncp.com ~][df]$ spark-submit --py-files spark_wordcount.py \
--master yarn \
--deploy-mode cluster \
spark_wordcount.py
- When the job submission is complete, you can see the output along with the log as follows.
- You can connect to the Spark History Server to view the details of your task.
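Before submitting to the cluster, the RDD pipeline in spark_wordcount.py can be sanity-checked with plain Python. The sketch below is an illustration only (it does not use Spark); it reproduces the same (word, count) pairs that flatMap, map, and reduceByKey compute, so you can verify the expected output on a small input.

```python
from collections import Counter

# Sample input standing in for input.txt
lines = ["to be or", "not to be"]

# flatMap(lambda line: line.split(" ")): flatten lines into words
words = [word for line in lines for word in line.split(" ")]

# map(word -> (word, 1)) + reduceByKey(a + b): count each word
counts = Counter(words)

print(sorted(counts.items()))
# [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
```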
Spark SQL
Spark SQL must comply with all the rules in Hive. However, Spark SQL does not raise an error even when a rule is violated.
For example, if the user "example" creates a database named "example_db" in Spark SQL in violation of the Hive rules, the database does not appear when queried with Beeline. Databases that do not comply with the rules are deleted by periodic audits, so it is recommended to create databases in Beeline.
Set Hive Access
You can access Hive tables with Spark SQL. The Hive Warehouse Connector (hereafter HWC) is required to read and write Hive managed tables from Spark, so managed tables require additional settings.
Job | HWC Required | LLAP Required |
---|---|---|
Read Hive's external table in Spark | X | X |
Write Hive's external table in Spark | X | X |
Read Hive's managed table in Spark | O | O |
Write Hive's managed table in Spark | O | X |
- The Hive LLAP feature is under preparation.
- Because LLAP is not yet available, reading Hive's managed tables from Spark is currently not possible.
The following are the settings required for accessing a Hive table. Select the required options, considering the type of Hive table (external/managed) and the characteristics of the job.
Property | Value |
---|---|
spark.sql.hive.hiveserver2.jdbc.url (Interactive) | Under preparation |
spark.sql.hive.hiveserver2.jdbc.url (Batch) | jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2 |
spark.datasource.hive.warehouse.metastoreUri (required) | thrift://hms1.kr.df.naverncp.com:9083,thrift://hms2.kr.df.naverncp.com:9083 |
spark.datasource.hive.warehouse.load.staging.dir (required) | /user/${USER}/tmp |
spark.hadoop.hive.zookeeper.quorum (required) | zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181 |
spark.hadoop.hive.llap.daemon.service.hosts | Under preparation |
principal (required) | Kerberos principal of the account. <Example> example@KR.DF.NAVERNCP.COM |
keytab (required) | Path to the keytab file. <Example> ./df.example.keytab |
jars | /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar |
The following is an example of submitting a job to Batch Hive through spark-submit. You can add the same settings to spark-shell and PySpark.
$ spark-submit \
--master yarn \
--deploy-mode cluster \
--queue longlived \
...
--conf spark.sql.hive.hiveserver2.jdbc.url="jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" \
--conf spark.datasource.hive.warehouse.metastoreUri="thrift://hms1.kr.df.naverncp.com:9083,thrift://hms2.kr.df.naverncp.com:9083" \
--conf spark.hadoop.hive.zookeeper.quorum="zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181" \
--principal example@KR.DF.NAVERNCP.COM \
--keytab df.example.keytab \
--jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar \
--class com.naverncp.ExampleApplication \
example_application.jar
Build Spark execution environment with a desired Spark version
The Spark version provided by Data Forest is 2.3.2. If you want to use a different version of Spark, install the Hadoop Free build of Spark.
- You need a Dev app configured before proceeding.
- If you use a Spark version other than the one provided, the Spark dynamic allocation feature cannot be used.
1. Download Spark (Hadoop Free)
This guide is based on installing version 2.4.7.
The following describes how to download Spark.
- Download the desired version of Spark from the Apache Spark homepage.
- Select Pre-built with user-provided Apache Hadoop as the package type.
$ mkdir -p $HOME/apps/spark
$ wget -P $HOME/apps https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-without-hadoop.tgz
$ tar xvfz $HOME/apps/spark-2.4.7-bin-without-hadoop.tgz -C $HOME/apps/
$ ln -s $HOME/apps/spark-2.4.7-bin-without-hadoop $HOME/apps/spark
$ export SPARK_HOME=$HOME/apps/spark/spark-2.4.7-bin-without-hadoop
$ export SPARK_CONF_DIR=$HOME/apps/spark/spark-2.4.7-bin-without-hadoop/conf
2. Set Configuration
- Copy the default configuration below first.
$ cp /etc/spark2/conf/* $SPARK_CONF_DIR/
- Add the following settings to the $SPARK_CONF_DIR/spark-defaults.conf file.
spark.driver.extraJavaOptions -Dhdp.version=3.1.0.0-78
spark.yarn.am.extraJavaOptions -Dhdp.version=3.1.0.0-78
spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS /usr/hdp:/usr/hdp:ro
spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS /usr/hdp:/usr/hdp:ro
- Add the following to the $SPARK_CONF_DIR/spark-env.sh file.
export SPARK_SUBMIT_OPTS="-Dhdp.version=3.1.0.0-78"
export PATH=$SPARK_HOME/bin:$PATH
export SPARK_DIST_CLASSPATH=`$HADOOP_COMMON_HOME/bin/hadoop classpath`
3. Upload jars
The hadoop-*.jar files provided by Data Forest have various bug patches applied. Other JARs the user needs can be copied under the same path, so the additional extraClassPath option for uploading JARs is unnecessary.
The following describes how to upload JARs.
- Copy the JAR files under $SPARK_HOME/jars.
cp /usr/hdp/current/spark2-client/jars/hadoop-*.jar $SPARK_HOME/jars
- Compress $SPARK_HOME/jars.
cd $SPARK_HOME/jars
tar cvfz /tmp/spark_jars.tar.gz *
- Upload the file under your HDFS user home directory.
kinit example@KR.DF.NAVERNCP.COM -kt example.service.keytab
hadoop fs -copyFromLocal /tmp/spark_jars.tar.gz /user/example/
hadoop fs -setrep 10 /user/example/spark_jars.tar.gz
4. Run spark-submit
When executing spark-submit, add the spark.yarn.archive option. The example below uses the spark_wordcount.py and input.txt files created above as they are.
$SPARK_HOME/bin/spark-submit \
--master yarn \
--conf spark.yarn.archive=hdfs://koya/user/example/spark_jars.tar.gz \
...
spark_wordcount.py
If spark-submit is run without the spark.yarn.archive option, the contents of $SPARK_HOME/jars are uploaded from the local file system to HDFS, which can take a considerable amount of time.
If you are using Spark 3.0.0 or later, add one more option to specify the HDFS namespaces you want to access.
--conf spark.kerberos.access.hadoopFileSystems=hdfs://koya,hdfs://tata
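Instead of passing these options on every spark-submit, they can also be set once in the $SPARK_CONF_DIR/spark-defaults.conf file described earlier. A sketch, reusing the example account and paths from this guide:

```
spark.yarn.archive                        hdfs://koya/user/example/spark_jars.tar.gz
spark.kerberos.access.hadoopFileSystems   hdfs://koya,hdfs://tata
```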