Process data with Spark and Hive
Available in VPC
The following explains how to process and store data using Apache Hadoop, Apache Spark, and Apache Hive provided by Data Forest.
Data processing with Spark and Hive: examples of using Zeppelin apps
Zeppelin provides an environment where users can easily write and submit code. Zeppelin supports various interpreters, and this guide uses the two most commonly used ones, Spark and Hive, as examples. Building on this example, you can create jobs on the Data Forest platform that use Spark Streaming to save data from a streaming source to HDFS, as well as jobs that periodically run Hive queries.
- For the MovieLens dataset used in this guide, see MovieLens 20M Dataset.
- This guide assumes that you work in a Zeppelin notebook on a Zeppelin app created in Data Forest > App.
Step 1. Create the Zeppelin app
The following describes how to create the Zeppelin app.
- Create the Zeppelin app.
- See Create and manage app for how to create an app
- Select ZEPPELIN-0.10.1 for the app type.
- Check that the Status in the app details has changed to Stable.
Step 2. Process data
The source data set must be uploaded before it can be processed. In a production service, you would typically store original data such as logs in HDFS through Kafka, Flume, or similar tools, or import data from an RDBMS into HDFS using Sqoop.
The following describes how to upload the CSV files of the MovieLens dataset directly to HDFS.
- Access the HDFS NameNode UI and click Utilities > Browse the file system.
- Go to the /user/${USER} directory.
- Click the [Upload] icon.
- Upload the rating.csv and movie.csv files.
To preview the data, click a file and then click Head the file.
1. Access the Zeppelin app
The following describes how to access the Zeppelin app.
Access Zeppelin.
- For the connection URL, see Quick links > Zeppelin in the Zeppelin app details.
- For more details on using the Zeppelin app, see How to use Zeppelin.
Click [login] in the upper right corner. Then log in with the account name and password you set when creating the account.
2. Run a Spark job
- Click [Notebook] > Create new note to create a new notebook.
- Set the Default Interpreter to Spark. Start each paragraph with %spark to run the code with the Spark interpreter.
- Create a dataframe from each of the two CSV files with Spark, join the two dataframes, and save the result to HDFS as Parquet files.
- Do not use movie.csv and rating.csv as they are. Instead, apply basic transformations such as creating new columns and modifying existing ones.
- Create a dataframe from movie.csv
%spark
import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection
case class Movie(movieId: Int, title: String, genres: String)
val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType] // Create a schema from the case class
val movie_df_temp = spark.read
.option("header", "true")
.schema(movie_schema)
.csv("hdfs://koya/user/example/movie.csv") // This is where we put our sample data. Change it to your own path.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
val movie_df = movie_df_temp
.select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) // Explode the genres column because it contains multiple pipe-separated values
.withColumn("releasedYear", regexp_extract($"title", "\\((\\d{4})\\)", 1)) // Extract the release year from the title column
.withColumn("releasedYear", $"releasedYear".cast(IntegerType)) // Cast the string to an integer
movie_df.createOrReplaceTempView("movie") // Register the dataframe as a temp view
spark.sql("select * from movie limit 3").show()
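The two column transformations above can be followed more easily with a small model. Below is a plain-Scala sketch (no Spark required) that applies the same logic to a single row. It splits the pipe-separated genres value into one row per genre, which is what explode(split(...)) does. It also extracts the first four-digit year in parentheses, as regexp_extract does. The object and method names are illustrative only and are not part of Spark or Data Forest.

```scala
// Plain-Scala sketch (no Spark) of the movie.csv transformations.
// Hypothetical names; this models the dataframe logic for a single row.
object MovieTransformSketch {
  // Same pattern as the regexp_extract call: four digits inside parentheses.
  private val YearPattern = """\((\d{4})\)""".r

  // Mirrors explode(split(genres, "\\|")): one output tuple per genre.
  def explodeGenres(movieId: Int, title: String, genres: String): Seq[(Int, String, String)] =
    genres.split("\\|").toSeq.map(genre => (movieId, title, genre))

  // Mirrors regexp_extract(title, "\\((\\d{4})\\)", 1) followed by a cast to Int.
  // regexp_extract uses the first match; no match gives "", which the cast turns
  // into null. That case is modeled here as None.
  def releasedYear(title: String): Option[Int] =
    YearPattern.findFirstMatchIn(title).map(_.group(1).toInt)
}
```

For example, releasedYear("Toy Story (1995)") returns Some(1995). The parenthesized alternate title in "Double Life of Veronique, The (Double Vie de Véronique, La) (1991)" is skipped because it does not contain four digits.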
- Create a dataframe from rating.csv
%spark
import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection
import java.sql.Timestamp
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
val rating_schema = ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType] // Create a schema from the case class
val rating_df_temp = spark.read
.option("header", "true")
.schema(rating_schema)
.csv("hdfs://koya/user/example/rating.csv") // Change it to your own path
import org.apache.spark.sql.functions._
val rating_df_filtered = rating_df_temp.groupBy($"movieId")
.agg(count("*").as("count")).filter($"count" > 10) // Keep only movies rated by more than 10 people
val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))
.select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month")) // year and month are used as partition keys later
rating_df.createOrReplaceTempView("rating") // Register the dataframe as a temp view
spark.sql("select * from rating limit 3").show()
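The rating transformation can be modeled the same way with plain Scala collections. The sketch below assumes an in-memory Seq of ratings and uses hypothetical names. It counts ratings per movie, keeps movies with more than 10 ratings, and derives year and month from the timestamp. In this model, the join becomes a set lookup. It is only an illustration of the dataframe logic, not code to run against the full data set.

```scala
import java.sql.Timestamp

// Plain-Scala model (no Spark) of the rating.csv transformation.
// Hypothetical names; the Rating case class mirrors the schema used above.
object RatingTransformSketch {
  final case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
  final case class RatingOut(userId: Int, movieId: Int, rating: Double, year: Int, month: Int)

  def transform(ratings: Seq[Rating], minCount: Int = 10): Seq[RatingOut] = {
    // groupBy(movieId).agg(count("*")).filter(count > minCount)
    val keptMovieIds: Set[Int] =
      ratings.groupBy(_.movieId).filter { case (_, rs) => rs.size > minCount }.keySet
    // The inner join becomes a membership test. Year and month are read in the JVM
    // default time zone; Spark uses spark.sql.session.timeZone instead.
    ratings.filter(r => keptMovieIds.contains(r.movieId)).map { r =>
      val dt = r.timestamp.toLocalDateTime
      RatingOut(r.userId, r.movieId, r.rating, dt.getYear, dt.getMonthValue)
    }
  }
}
```

A movie with exactly 10 ratings is dropped, because the filter is a strict "greater than 10".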
Starting with Spark 3.0, a query fails if it contains ambiguous column references caused by a self-join. The rating.csv code above joins rating_df_temp with a dataframe derived from it, so it is affected. Therefore, before running the code above, search for spark in Zeppelin > Interpreter, click the [Edit] button, and set spark.sql.analyzer.failAmbiguousSelfJoin to false.
- Run Spark SQL queries on the views registered above.
- Create a new dataframe by joining the two dataframes.
- Save the final dataframe in Parquet format, using year and month as partition columns.
- Check that the write job has completed.
- The partitions are created and the data is written under the corresponding path in HDFS.
- Completed jobs can be viewed on the Spark History Server.
- To check jobs in progress, click the [Show Incomplete Applications] button at the bottom left.
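With partitionBy("year", "month"), Spark writes one subdirectory per partition value, named column=value, nested in the order the columns are listed. The plain-Scala sketch below shows the resulting paths, which can help when checking the HDFS directory listing. It assumes the /user/example/movie_rating output path used in the spark-shell example of this guide, and its names are hypothetical.

```scala
// Plain-Scala sketch of the directory layout produced by
// DataFrameWriter.partitionBy("year", "month"): <base>/year=<y>/month=<m>.
// Hypothetical helper names; integer values are written without zero padding.
object PartitionLayoutSketch {
  def partitionDir(basePath: String, year: Int, month: Int): String =
    s"${basePath.stripSuffix("/")}/year=$year/month=$month"

  // Distinct partition directories for a set of (year, month) pairs, sorted for readability.
  def partitionDirs(basePath: String, yearMonths: Seq[(Int, Int)]): Seq[String] =
    yearMonths.distinct.sorted.map { case (y, m) => partitionDir(basePath, y, m) }
}
```

For example, a row with year 1995 and month 1 lands under /user/example/movie_rating/year=1995/month=1.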
3. Execute Hive queries
You must set a password before using the JDBC interpreter. HiveServer2 provided by Data Forest requires authentication with an account name and password. The default interpreter settings do not include a password, so you need to add it manually.
If you run queries without setting the password, org.apache.zeppelin.interpreter.InterpreterException: Error in doAs occurs.
The following describes how to set up the password for JDBC interpreter.
- Click your account name at the upper right of the Zeppelin screen, and then click Interpreter.
- Search for the JDBC interpreter.
- Click the [Edit] button.
- Add the hive.user and hive.password items in Properties as follows:
- hive.user: the Data Forest account name
- hive.password: the password you set when creating the Data Forest account
- Click the [Save] button.
- The changes are saved and the interpreter restarts.
4. Create a Hive table and run queries
Next, create a Hive table from the Parquet files written by the Spark job, and run queries on it. The complete Zeppelin notebook can be downloaded from here and imported.
Before running Hive queries in Zeppelin, make sure the settings follow the Hive user guide in Zeppelin.
- Click [Notebook] > Create new note to create a new notebook.
- Set the Default Interpreter to JDBC, and start each paragraph with %jdbc(hive) to run Hive queries.
Check database
Caution: When creating an external database, always specify the path with the LOCATION keyword. Data Forest users cannot access Hive's default database, so always use a path under your HDFS home directory (/user/${USER}). Create the directory for the database path in advance. In the example below, the warehouse directory must already exist.
- Unlike the database, the LOCATION keyword is optional for creating tables.
- When creating a database, name it in the ${USER}__db_${ANY_NAME_YOU_WANT} format. Otherwise, an error occurs.
- Create an external database.
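An incorrectly named database causes an error, so it can help to check the name before running CREATE DATABASE. The following hypothetical helper is not part of Data Forest. It builds the statement from the naming rule and the LOCATION requirement described above. Two details are assumptions taken from this guide's example, not documented rules: the allowed suffix characters (ASCII letters, digits, and underscore) and the /user/${USER}/warehouse layout.

```scala
// Hypothetical helper: validates the ${USER}__db_${ANY_NAME_YOU_WANT} rule and
// builds a CREATE DATABASE statement with an explicit LOCATION.
// Assumption: the suffix may contain only ASCII letters, digits, and underscores.
object DatabaseNameCheck {
  def isValid(user: String, dbName: String): Boolean = {
    val prefix = s"${user}__db_"
    dbName.startsWith(prefix) && dbName.drop(prefix.length).matches("[A-Za-z0-9_]+")
  }

  // Assumption: databases live under /user/<user>/warehouse, as in this guide's example.
  def createDatabaseSql(user: String, name: String): String = {
    val dbName = s"${user}__db_$name"
    require(isValid(user, dbName), s"invalid database name: $dbName")
    s"CREATE DATABASE $dbName LOCATION '/user/$user/warehouse/$dbName'"
  }
}
```

For the account example, createDatabaseSql("example", "movie_rating") yields the same name and location as the Beeline example in the Dev app section.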
- The table is created on top of existing files, so set LOCATION to the directory where the Parquet files are saved.
- Create a table from the Parquet files.
- Add the partition metadata with MSCK REPAIR TABLE ${TABLENAME}.
- The Hive table is now ready to use.
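MSCK REPAIR TABLE scans the table location and registers every year=.../month=... directory as a partition. When only a few new partitions have been written, Hive's ALTER TABLE ... ADD PARTITION statement is an alternative. The plain-Scala sketch below uses hypothetical names and derives such a statement from a partition directory path. It emits values unquoted because year and month are int columns in this guide.

```scala
// Plain-Scala sketch: turn a partition directory such as "year=1995/month=1" into a
// HiveQL ALTER TABLE ... ADD IF NOT EXISTS PARTITION statement.
// Hypothetical names; values are left unquoted because they are int partition columns.
object AddPartitionSketch {
  def partitionSpec(relativePath: String): Seq[(String, String)] =
    relativePath.split("/").toSeq.filter(_.nonEmpty).map { segment =>
      segment.split("=", 2) match {
        case Array(key, value) => key -> value
        case _ => throw new IllegalArgumentException(s"not a partition directory: $segment")
      }
    }

  def addPartitionSql(table: String, relativePath: String): String = {
    val spec = partitionSpec(relativePath).map { case (k, v) => s"$k=$v" }.mkString(", ")
    s"ALTER TABLE $table ADD IF NOT EXISTS PARTITION ($spec)"
  }
}
```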
- Run queries as needed.
- The following queries are examples.
- Query example 1: the top 3 films by average rating for each release year
- Query example 2: the most popular film among users for each year
- You can view the job progress of executed Hive queries in the Resource Manager UI. Click Log to see the job logs.
Data processing with Spark and Hive: examples of using Dev apps
This section explains how to create a Dev app and then use clients such as Beeline and spark-submit. With them, you perform the same tasks as in Data processing with Spark and Hive: examples of using Zeppelin apps.
The Dev app comes with development environments preconfigured for all services provided by Data Forest, so no preparation is needed to run scripts. You can also follow the same process from a development environment that you set up on a VM in your VPC.
You can choose the execution method that suits your needs. Zeppelin is better suited to testing the data in various ways. Scripts are better suited to jobs that run periodically in a production environment.
Step 1. Create the Dev app
The following describes how to create the Dev app.
- Create the Dev app.
- See Create and manage app for how to create an app
- Select DEV-1.0.0 for the app type.
- Check that the Status in the app details has changed to Stable.
Step 2. Process data
The source data set must be uploaded before it can be processed. For instructions on uploading the data set, see Step 2. Process data in Data processing with Spark and Hive: examples of using Zeppelin apps.
1. Run the Spark client
First, Kerberos authentication is required to run Spark scripts on the Dev app. For instructions on Kerberos authentication, see Use Dev user guide.
No other setup is required besides authentication. However, note that org.apache.hadoop.security.AccessControlException occurs if authentication has not been completed.
Once authentication succeeds, klist shows a result similar to the following.
$ klist
Ticket cache: FILE:/tmp/krb5cc_p26244
Default principal: example@KR.DF.NAVERNCP.COM
Valid starting Expires Service principal
07/14/2021 17:38:02 07/15/2021 17:38:02 krbtgt/KR.DF.BETA.NAVERNCP.COM@KR.DF.NAVERNCP.COM
renew until 07/21/2021 17:38:02
You can list the data set with an HDFS client command.
$ hadoop fs -ls | grep .csv
-rw------- 3 example services 1493648 2021-07-12 15:25 movie.csv
-rw-r--r-- 3 example services 690353377 2021-07-09 18:11 rating.csv
Run spark-shell. SparkContext and SparkSession are already created in spark-shell, so you can run the same code used in the Zeppelin notebook.
$ spark-shell
The code below is the same as what was run in the Zeppelin notebook. Enter paste mode with :paste to paste the code easily, and press [Ctrl]+[D] to exit paste mode.
import org.apache.spark.sql.types.{StructType, IntegerType}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import java.sql.Timestamp
// Case classes used to derive the CSV schemas
case class Movie(movieId: Int, title: String, genres: String)
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType]
val rating_schema = ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
import spark.implicits._
// movie.csv: one row per genre, plus the release year parsed from the title
val movie_df_temp = spark.read
.option("header", "true")
.schema(movie_schema)
.csv("hdfs://koya/user/example/movie.csv")
val movie_df = movie_df_temp
.select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre"))
.withColumn("releasedYear", regexp_extract($"title", "\\((\\d{4})\\)", 1))
.withColumn("releasedYear", $"releasedYear".cast(IntegerType))
// rating.csv: keep movies rated more than 10 times and add year and month columns
val rating_df_temp = spark.read
.option("header", "true")
.schema(rating_schema)
.csv("hdfs://koya/user/example/rating.csv")
val rating_df_filtered = rating_df_temp.groupBy($"movieId")
.agg(count("*").as("count")).filter($"count" > 10)
val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))
.select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month"))
// Join the two dataframes and write Parquet files partitioned by year and month
val movie_rating_df = movie_df.join(rating_df, Seq("movieId"), "inner")
movie_rating_df.repartition($"year", $"month")
.write.partitionBy("year", "month")
.mode(SaveMode.Append)
.parquet("/user/example/movie_rating")
Instead of using the REPL, you can also build the source code as a JAR and submit it with spark-submit. The job takes a single argument: the HDFS path where the data set is located. The result files are created under the same path.
You can check the source code on GitHub.
$ wget http://dist.kr.df.naverncp.com/repos/release/df-env/media/movie-rating-assembly-0.1.jar
$ spark-submit --class com.dataforest.example.MovieRatingWriter \
--master yarn --deploy-mode cluster --queue longlived \
--num-executors 20 ./movie-rating-assembly-0.1.jar \
'hdfs://koya/user/example'
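The published source is on GitHub, and the sketch below is not that code. It is a hypothetical plain-Scala illustration of the argument contract described above. A single HDFS base path is passed, and the input files and the movie_rating output directory are derived from it. The object and field names are assumptions.

```scala
// Hypothetical sketch (not the published MovieRatingWriter source) of how a job could
// derive its input and output paths from a single HDFS base-path argument.
object MovieRatingPaths {
  final case class JobPaths(movieCsv: String, ratingCsv: String, output: String)

  def fromArgs(args: Array[String]): Either[String, JobPaths] = args match {
    case Array(base) =>
      val root = base.stripSuffix("/")
      // Inputs are read from, and the result is written under, the same base path.
      Right(JobPaths(s"$root/movie.csv", s"$root/rating.csv", s"$root/movie_rating"))
    case _ =>
      Left("expected exactly one argument: the HDFS base path of the data set")
  }
}
```

With the argument 'hdfs://koya/user/example', this yields hdfs://koya/user/example/movie_rating as the output directory, matching the result location described below.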
After you submit the job, its progress is displayed in the console.
Once a Spark job is submitted, you can check its status and progress logs in the Resource Manager UI.
You can view the job's execution history in more detail in the Spark History Server UI.
When the job completes, a movie_rating directory is created under the specified HDFS path, and the resulting Parquet files are created under that directory.
2. Run the Hive client
HiveServer2 provided by Data Forest uses account and password authentication. Therefore, separate authentication is required even if you have already completed Kerberos authentication. The password here is the one you specified when creating the Data Forest account.
The following describes how to run the Hive client.
Run the Beeline client.
- If the password contains special characters, enclose the password in single quotes (' ').
$ beeline -u "jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n ${USERNAME} -p ${PASSWORD}
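The Beeline URL above lists the three ZooKeeper servers. With serviceDiscoveryMode=zooKeeper, the client asks ZooKeeper for an available HiveServer2 instance registered under the hiveserver2 namespace. As a minimal sketch, the same URL can be assembled in Scala from a host list. The object name is hypothetical. From a JVM program, such a URL could then be passed to java.sql.DriverManager.getConnection(url, user, password), assuming the Hive JDBC driver is on the classpath (not shown here).

```scala
// Plain-Scala sketch that assembles a HiveServer2 JDBC URL using ZooKeeper service
// discovery. Hypothetical helper; the defaults match the beeline command above.
object HiveJdbcUrl {
  def zooKeeperUrl(hosts: Seq[String], port: Int = 2181, namespace: String = "hiveserver2"): String = {
    require(hosts.nonEmpty, "at least one ZooKeeper host is required")
    // host1:port,host2:port,... forms the ZooKeeper quorum part of the URL
    val quorum = hosts.map(h => s"$h:$port").mkString(",")
    s"jdbc:hive2://$quorum/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=$namespace"
  }
}
```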
Create an external database.
- The output below is shown in vertical format (!set outputformat vertical) for readability.
- Set the path using the LOCATION keyword.
CREATE DATABASE example__db_movie_rating
COMMENT 'MovieLens 20M Dataset'
LOCATION '/user/example/warehouse/example__db_movie_rating'
WITH DBPROPERTIES ('creater'='suewoon', 'date'='2021-07-01');

DESCRIBE DATABASE example__db_movie_rating;
db_name     example__db_movie_rating
comment     MovieLens 20M Dataset
location    hdfs://koya/user/example/warehouse/example__db_movie_rating
owner_name  example
owner_type  USER
parameters
Create a table from the Parquet file created from the Spark job.
USE example__db_movie_rating;
DROP TABLE IF EXISTS movie_rating;
CREATE EXTERNAL TABLE movie_rating (movieId int, title string, genre string, releasedYear int, userId int, rating double)
partitioned by (year int, month int)
stored as parquet
location '/user/example/movie_rating';
MSCK REPAIR TABLE movie_rating;
Check the table content.
SHOW TABLES;
tab_name  movie_rating

SHOW CREATE TABLE movie_rating;
CREATE EXTERNAL TABLE `movie_rating`(
`movieid` int,
`title` string,
`genre` string,
`releasedyear` int,
`userid` int,
`rating` double)
PARTITIONED BY (
`year` int,
`month` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://koya/user/example/movie_rating'
TBLPROPERTIES (
'bucketing_version'='2',
'transient_lastDdlTime'='1626253592')

SELECT * FROM movie_rating limit 1;
movie_rating.movieid       1176
movie_rating.title         Double Life of Veronique, The (Double Vie de Véronique, La) (1991)
movie_rating.genre         Romance
movie_rating.releasedyear  1991
movie_rating.userid        28507
movie_rating.rating        4.0
movie_rating.year          1995
movie_rating.month         1
Execute queries for the created table.
- The following query finds the top 3 films by average rating among the films released in each year.
SELECT releasedYear, title, avgRating, row_num
FROM (SELECT releasedYear, title, avg(rating) as avgRating,
rank() over (partition by releasedYear order by avg(rating) desc) as row_num
FROM movie_rating
GROUP BY releasedYear, title) T
WHERE row_num <= 3
ORDER BY releasedYear DESC, row_num
LIMIT 6;

releasedyear  2015
title         Louis C.K.: Live at The Comedy Store (2015)
avgrating     3.8
row_num       1

releasedyear  2015
title         Kingsman: The Secret Service (2015)
avgrating     3.6320754716981134
row_num       2

releasedyear  2015
title         The Second Best Exotic Marigold Hotel (2015)
avgrating     3.5714285714285716
row_num       3

releasedyear  2014
title         Zero Motivation (Efes beyahasei enosh) (2014)
avgrating     4.5
row_num       1

releasedyear  2014
title         Whiplash (2014)
avgrating     4.074750830564784
row_num       2

releasedyear  2014
title         Interstellar (2014)
avgrating     4.023864289821737
row_num       3
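The rank() window function in the query above works as follows: average the rating per release year and title, rank titles within each year by that average in descending order, and keep ranks 1 to 3. Below is a plain-Scala sketch of the same computation over in-memory rows; the Row type is a hypothetical stand-in for the movie_rating table. Because rank() gives tied rows the same rank, a year can return more than three films.

```scala
// Plain-Scala sketch of: avg(rating) per (releasedYear, title), then
// rank() OVER (PARTITION BY releasedYear ORDER BY avg DESC), keeping rank <= n.
object TopRatedSketch {
  // Hypothetical stand-in for one row of the movie_rating table.
  final case class Row(releasedYear: Int, title: String, rating: Double)

  // Returns (releasedYear, title, avgRating, rank), ordered by year descending, then rank.
  def topN(rows: Seq[Row], n: Int = 3): Seq[(Int, String, Double, Int)] = {
    // GROUP BY releasedYear, title with avg(rating)
    val averages: Seq[(Int, String, Double)] =
      rows.groupBy(r => (r.releasedYear, r.title)).toSeq.map { case ((year, title), rs) =>
        (year, title, rs.map(_.rating).sum / rs.size)
      }
    // rank(): 1 + the number of films in the same year with a strictly higher average
    val ranked = averages.groupBy(_._1).toSeq.flatMap { case (_, films) =>
      films.map { case (year, title, avg) => (year, title, avg, 1 + films.count(_._3 > avg)) }
    }
    ranked.filter(_._4 <= n).sortBy { case (year, _, _, rank) => (-year, rank) }
  }
}
```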
- The following query finds the most popular film among users in each year, that is, the film rated by the largest number of distinct users.
SELECT year, title, popularity, row_num
FROM (SELECT year, title, count(distinct userId) as popularity,
rank() over (partition by year order by count(distinct userId) desc) as row_num
FROM movie_rating
GROUP BY year, title) T
WHERE row_num = 1
ORDER BY year desc, popularity desc
LIMIT 5;

year        2015
title       Matrix, The (1999)
popularity  1207
row_num     1

year        2014
title       Shawshank Redemption, The (1994)
popularity  2405
row_num     1

year        2013
title       Shawshank Redemption, The (1994)
popularity  2549
row_num     1

year        2012
title       Inception (2010)
popularity  2411
row_num     1

year        2011
title       Inception (2010)
popularity  3235
row_num     1
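The popularity query can be modeled the same way. It counts distinct users per title within each rating year and keeps every title with rank 1, so ties are all kept. Below is a plain-Scala sketch over hypothetical in-memory rows; the Row type stands in for the relevant movie_rating columns.

```scala
// Plain-Scala sketch of: count(distinct userId) per (year, title), then keep the
// title(s) with rank() = 1 within each year. Hypothetical names.
object MostPopularSketch {
  final case class Row(year: Int, title: String, userId: Int)

  // Returns (year, title, popularity), ordered by year descending, then popularity descending.
  def mostPopular(rows: Seq[Row]): Seq[(Int, String, Int)] =
    rows.groupBy(_.year).toSeq.flatMap { case (year, yearRows) =>
      // count(distinct userId) per title within this year
      val popularity: Map[String, Int] =
        yearRows.groupBy(_.title).map { case (title, rs) => title -> rs.map(_.userId).distinct.size }
      val best = popularity.values.max
      // rank() = 1 keeps every title that ties for the highest count
      popularity.toSeq.filter(_._2 == best).map { case (title, count) => (year, title, count) }
    }.sortBy { case (year, title, count) => (-year, -count, title) }
}
```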