Process data with Spark and Hive


Available in VPC

The following explains how to process and store data using Apache Hadoop, Apache Spark, and Apache Hive provided by Data Forest.

Data processing with Spark, Hive: examples of using Zeppelin apps

Zeppelin provides an environment where users can easily write and submit code. Of the many interpreters available, this guide uses Spark and Hive as examples because they are the most commonly used. Building on this example, you can create jobs on the Data Forest platform that use Spark Streaming to save data from a streaming source to HDFS, and that periodically run Hive queries.

Note
  • For the MovieLens dataset used in this guide, see MovieLens 20M Dataset.
  • The steps in this guide are carried out in a Zeppelin notebook created from Data Forest > App.

Step 1. Create the Zeppelin app

The following describes how to create the Zeppelin app.

  1. Create the Zeppelin app.
  2. Select ZEPPELIN-0.10.1 for the app type.
    df-use-ex2_2-2_en
  3. Check if the app detail's Status has become Stable.

Step 2. Process data

The source data set must be uploaded before it can be processed. In a production service, you would typically save the original log data to HDFS through Kafka, Flume, and so on, or import data from an RDBMS into HDFS using Sqoop.
The following describes how to upload the CSV files of the MovieLens dataset directly to HDFS.

  1. Access the HDFS NameNode UI and click the Utilities > Browse the file system menu.
  2. Go to the /user/${USER} directory.
    df-use-ex2_1-3_ko
  3. Click the [Upload] icon.
  4. Upload the files: rating.csv and movie.csv.
    df-use-ex2_1-4_ko
Note

Click a file and then click Head the file to preview what the data looks like.
df-use-ex2_1-5_ko

1. Access Zeppelin app

The following describes how to access the Zeppelin app.

  1. Access Zeppelin.

    • For the connection URL, refer to Quick links > Zeppelin of the Zeppelin app.
    • For more details on how to use the Zeppelin app, see How to use Zeppelin.
      df-use-ex2_3-6_en
  2. After clicking [login] in the upper right corner, log in with the account name and password you set when creating the account.
    df-use-ex2_1-7_ko

    df-use-ex2_1-8_ko

2. Run Spark Job

  1. Click [Notebook] > Create new note to create a new notebook.
    df-use-ex2_note_ko
  2. Set the Default Interpreter to Spark, and prefix the Spark code with %spark to run it with the Spark interpreter.
  3. Create a dataframe from each of the two CSV files with Spark, join the two dataframes, and save the result to HDFS as a Parquet file.
  4. Rather than using movie.csv and rating.csv as they are, apply basic transformations such as adding and modifying columns.
  • Create dataframe from movie.csv
    df-use-ex2_1-9_ko
%spark
import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection

case class Movie(movieId: Int,title: String, genres: String)
val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType] // Create a schema from case class

val movie_df_temp = spark.read
        .option("header", "true")
        .schema(movie_schema)
        .csv("hdfs://koya/user/example/movie.csv") // This is where we put our sample data. You should change the path. 

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

val movie_df = movie_df_temp
                .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) // Explode the genres column as it contains multiple values separated by "|"
                .withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1))  // Extract the release year from the title column
                .withColumn("releasedYear", $"releasedYear".cast(IntegerType)) // Cast string type to integer

movie_df.createOrReplaceTempView("movie") // Create dataframe temp view 
spark.sql("select * from movie limit 3").show()
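The two string patterns used above can be tried without a cluster. The following is a minimal plain-Scala sketch, not part of the original notebook; the object and method names are hypothetical. It assumes the same patterns as the Spark code: a literal pipe as the genre separator and a four-digit year inside parentheses.

```scala
import scala.util.matching.Regex

// Hypothetical helper for illustration only; it does not use Spark.
object MovieTitleSketch {
  // Same pattern as the regexp_extract call: four digits inside literal parentheses.
  private val YearInParens: Regex = """\((\d{4})\)""".r

  // Returns the first parenthesized four-digit year, or None if the title has none.
  def releasedYear(title: String): Option[Int] =
    YearInParens.findFirstMatchIn(title).map(_.group(1).toInt)

  // "|" is a regex metacharacter, so it must be escaped to split on a literal pipe.
  def genres(raw: String): List[String] = raw.split("\\|").toList

  def main(args: Array[String]): Unit = {
    println(releasedYear("Toy Story (1995)"))
    println(genres("Adventure|Animation|Children"))
  }
}
```

Inside a regular (non-raw) Scala string, each backslash of the pattern has to be doubled, which is why the Spark code writes "\\|" and "\\((\\d{4})\\)".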
  • Create dataframe from rating.csv
    df-use-ex2_1-10_ko
%spark

import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection
import java.sql.Timestamp

case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
val rating_schema =  ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
val rating_df_temp = spark.read
                .option("header", "true")
                .schema(rating_schema)
                .csv("hdfs://koya/user/example/rating.csv") 
                       
import org.apache.spark.sql.functions._

val rating_df_filtered = rating_df_temp.groupBy($"movieId")
    .agg(count("*").as("count")).filter($"count" > 10) // Keep only movies that have more than 10 ratings
val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))  
        .select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month")) // year and month are used as partition keys later

rating_df.createOrReplaceTempView("rating")
spark.sql("select * from rating limit 3").show()
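The filtering logic above (count the ratings per movie, keep movies with more than 10, then join back to the ratings) can be sketched with plain Scala collections. This is an illustration under assumed names (RatingFilterSketch, keepFrequentlyRated) and a simplified Rating type without the timestamp; it is not the Spark implementation.

```scala
// Hypothetical, Spark-free illustration of the groupBy/count/filter/join steps.
object RatingFilterSketch {
  final case class Rating(userId: Int, movieId: Int, rating: Double)

  // Keeps only ratings whose movie has more than `minCount` ratings in total.
  def keepFrequentlyRated(ratings: Seq[Rating], minCount: Int): Seq[Rating] = {
    // Equivalent of groupBy($"movieId").agg(count("*"))
    val countByMovie: Map[Int, Int] =
      ratings.groupBy(_.movieId).map { case (id, rs) => id -> rs.size }
    // Equivalent of filtering on the count and joining back on movieId
    ratings.filter(r => countByMovie(r.movieId) > minCount)
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(Rating(1, 10, 4.0), Rating(2, 10, 3.5), Rating(3, 20, 5.0))
    println(keepFrequentlyRated(sample, minCount = 1))
  }
}
```

In the Spark code the threshold is 10; the sample above uses 1 only to keep the input small.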
Caution

From Spark 3.0, which is used as the default interpreter, a query fails if it contains ambiguous column references caused by a self join. Therefore, before running the code above, search for Spark in Zeppelin > Interpreter as shown below, click the [Edit] button, and set spark.sql.analyzer.failAmbiguousSelfJoin to false.
df-use-ex2_1-28

df-use-ex2_1-29

  1. Run spark-sql for the view registered above.
    The following query is an example.
    • Execute spark-sql from the created view
      df-use-ex2_1-11_ko
  2. Create a new dataframe from the two dataframes.
    • Join two dataframes to create a new dataframe
      df-use-ex2_1-12_ko
  3. Save the resulting dataframe in Parquet format.
    • Specify year and month as the partition columns
      df-use-ex2_1-13_ko
  4. Check if the write job has been completed.
    • You can see that the partitions are created and the data is written in the applicable path of HDFS as shown below.
    • The completed jobs can be viewed on the Spark History Server.
      df-use-ex2_1-14_ko
  5. To check the work in progress, click the [Show Incomplete Applications] button at the bottom left.
    df-use-ex2_1-15_ko
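When a dataframe is written with year and month as partition columns, Spark creates one directory per partition value in the Hive-style key=value layout. The following plain-Scala sketch only computes such a directory name for a given timestamp. The object name is hypothetical, and the base path /user/example/movie_rating is taken from the spark-shell example later in this guide.

```scala
import java.time.LocalDateTime

// Hypothetical helper for illustration; it only builds a path string and does not touch HDFS.
object PartitionPathSketch {
  // Returns the partition directory for a rating timestamp, e.g. <base>/year=1995/month=1.
  // Partition values are written without zero padding because the columns are integers.
  def partitionDir(basePath: String, ts: LocalDateTime): String =
    s"$basePath/year=${ts.getYear}/month=${ts.getMonthValue}"

  def main(args: Array[String]): Unit =
    println(partitionDir("/user/example/movie_rating", LocalDateTime.of(1995, 1, 9, 11, 46)))
}
```

This layout is what allows Hive to discover the partitions later with MSCK REPAIR TABLE.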

3. Execute Hive queries

A password setup is needed before using the JDBC interpreter. The HiveServer2 provided by Data Forest requires authentication with an account name and password for access. Since the default interpreter does not have the password settings, the user needs to add it manually.

Caution

Be aware that org.apache.zeppelin.interpreter.InterpreterException: Error in doAs occurs when the queries are executed without setting up the password.

The following describes how to set up the password for JDBC interpreter.

  1. Click the Account name > Interpreter located at the upper right on the Zeppelin screen.
    df-hive_12_vpc_en
  2. Search for the JDBC interpreter.
    df-quick-start_zeppelin03_ko
  3. Click the [Edit] button.
    df-quick-start_zeppelin04_ko
  4. Add the hive.user, hive.password items in Properties as follows:
    • Use the password that was set when the Data Forest account was created.
      df-quick-start_zeppelin05-1
  5. Click the [Save] button.
    • The interpreter will restart with the changes saved.

4. Create Hive table and run queries

We're going to create a Hive table from the Parquet file written from the Spark job, and execute queries. The complete content of the Zeppelin notebook can be downloaded from here and can be imported.

Caution

Before executing a Hive query in Zeppelin, make sure that the interpreter is configured as described in the guide to using Hive in Zeppelin.

  1. Click [Notebook] > Create new note to create a new notebook.
  2. Set the Default Interpreter to JDBC, and prefix each Hive query with %jdbc(hive) to run it.
  • Check database
    df-use-ex2_1-17_ko

    Caution
    • When creating an external database, always specify its path with the LOCATION keyword. Since Data Forest users cannot access Hive's default database, the path must always be under your HDFS home directory ( /user/${USER} ). The directory used for the database path must be created in advance. (In the example below, the warehouse folder must already exist.)
    • Unlike the database, the LOCATION keyword is optional for creating tables.
    • When creating a database, make sure you set the name in the ${USER}__db_${ANY_NAME_YOU_WANT} format. An error occurs otherwise.
    • Create external database

df-use-ex2_1-18_ko
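The naming and location rules above can be captured in a small helper. This is a minimal plain-Scala sketch with hypothetical names. It assumes the warehouse folder under the HDFS home directory, as in the example in this guide, and only builds strings; it does not connect to Hive.

```scala
// Hypothetical helper for illustration; it only formats names and a statement.
object HiveDbNameSketch {
  // Database name in the ${USER}__db_${ANY_NAME_YOU_WANT} format (two underscores after the user).
  def dbName(user: String, suffix: String): String = s"${user}__db_$suffix"

  // Assumed layout: the database directory lives in a warehouse folder under /user/${USER}.
  def dbLocation(user: String, suffix: String): String =
    s"/user/$user/warehouse/${dbName(user, suffix)}"

  // CREATE DATABASE statement with the mandatory LOCATION clause.
  def createDatabaseSql(user: String, suffix: String): String =
    s"CREATE DATABASE ${dbName(user, suffix)} LOCATION '${dbLocation(user, suffix)}'"

  def main(args: Array[String]): Unit =
    println(createDatabaseSql("example", "movie_rating"))
}
```

For the user example and the suffix movie_rating, this yields the same database name and location used in the Beeline example later in this guide.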


  1. Since a table is to be created with the existing file, set the directory where the Parquet file is saved as the path after LOCATION.
    • Create table from Parquet file
      df-use-ex2_1-19_ko
  2. Add the partition metadata with MSCK REPAIR TABLE ${TABLENAME}.
    • The preparations for using Hive tables are complete.
    • Add partition and update metadata
      df-use-ex2_1-20_ko
  3. Execute queries as you wish.
    • The following query is an example.
    • Query example 1: the average rating of the top 3 films for each release year
      df-use-ex2_1-21_ko
    • Query example 2: the most popular film among users for each year
      df-use-ex2_1-22_ko
  4. You can view the job progress for the executed Hive queries from the Resource Manager UI. Click Log to see the job logs.
    df-use-ex2_1-23_ko

Data processing with Spark, Hive: examples of using Dev apps

This section explains how to create a Dev app and use clients such as Beeline and spark-submit to perform the same tasks as in Data processing with Spark, Hive: examples of using Zeppelin apps.
The Dev app comes with development environments configured for all services provided by Data Forest, so no preparation is needed to execute scripts. Instead of using a Data Forest app, you can also configure the development environment on a VPC VM and follow the same process described in this guide.

Note

You can choose the execution method that suits your needs. Zeppelin is better suited to running various tests on the data, while scripted jobs are better suited to work that runs periodically in a production environment.

Step 1. Create the Dev app

The following describes how to create the Dev app.

  1. Create the Dev app.
  2. Select DEV-1.0.0 for the app type.
    df-use-dev-ex2_2-2_en
  3. Check if the app detail's Status has become Stable.

Step 2. Process data

The source data set must be uploaded before it can be processed. For instructions on how to upload a data set, see Data processing with Spark and Hive: Zeppelin > Data processing.

1. Execute Spark client

First, Kerberos authentication is needed to successfully execute the Spark script on the Dev app. For instructions on Kerberos authentication, see Use Dev user guide.

Caution

No additional setting is required other than the authentication. However, please be cautious as org.apache.hadoop.security.AccessControlException occurs if the authentication is not complete.

Once the authentication is completed successfully, you can see the result as below.

$ klist 
Ticket cache: FILE:/tmp/krb5cc_p26244
Default principal: example@KR.DF.NAVERNCP.COM
Valid starting       Expires              Service principal
07/14/2021 17:38:02  07/15/2021 17:38:02  krbtgt/KR.DF.BETA.NAVERNCP.COM@KR.DF.NAVERNCP.COM
        renew until 07/21/2021 17:38:02

df-use-ex3_1-3_vpc

You can see the data set with the HDFS client command.

$ hadoop fs -ls | grep .csv
-rw-------   3 example services    1493648 2021-07-12 15:25 movie.csv
-rw-r--r--   3 example services  690353377 2021-07-09 18:11 rating.csv

You can run spark-shell.
SparkContext and SparkSession are already created in the spark-shell. You can execute the same code used in the Zeppelin notebook.

$ spark-shell

df-use-ex3_1-4_vpc

The code shown below is the same as what was executed in the Zeppelin notebook. You can enter the paste mode with :paste and easily paste the code. Press the [Ctrl]+[D] key to exit the mode.

import org.apache.spark.sql.types.{StructType,IntegerType}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions._
import java.sql.Timestamp
case class Movie(movieId: Int,title: String, genres: String)
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType]
val rating_schema =  ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
import spark.implicits._
val movie_df_temp = spark.read 
        .option("header", "true")
        .schema(movie_schema)
        .csv("hdfs://koya/user/example/movie.csv")
val movie_df = movie_df_temp
                .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) 
                .withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1)) 
                .withColumn("releasedYear", $"releasedYear".cast(IntegerType))

val rating_df_temp = spark.read
                .option("header", "true")
                .schema(rating_schema)
                .csv("hdfs://koya/user/example/rating.csv")
val rating_df_filtered = rating_df_temp.groupBy($"movieId")
    .agg(count("*").as("count")).filter($"count" > 10) 
val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))  
        .select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month"))
val movie_rating_df = movie_df.join(rating_df, Seq("movieId"), "inner")
import org.apache.spark.sql.SaveMode
movie_rating_df.repartition($"year", $"month")
.write.partitionBy("year", "month")
.mode(SaveMode.Append)
.parquet("/user/example/movie_rating")

Instead of using the REPL, you can also build the source code into a JAR and submit it with spark-submit. The job takes a single argument: the HDFS path where the data set is located. The result files are written under the same path.

The source code is available on GitHub.

$ wget http://dist.kr.df.naverncp.com/repos/release/df-env/media/movie-rating-assembly-0.1.jar 
$ spark-submit --class com.dataforest.example.MovieRatingWriter \
--master yarn --deploy-mode cluster --queue longlived \
--num-executors 20 ./movie-rating-assembly-0.1.jar \
'hdfs://koya/user/example'

Submit the job to see the job's progress displayed in the console as shown below.
df-use-ex3_1-5_vpc

Once you submit a Spark job, you can check the job status and progress logs from Resource Manager UI.
df-use-ex3_1-6_vpc_ko

You can see the job's execution history in more detail from Spark History Server UI.
df-use-ex3_1-7_vpc_ko

When the job is completed, a movie_rating directory is created under the specified HDFS path, and you can see the resulting Parquet file is created under the directory.
df-use-ex3_1-8_vpc

2. Execute Hive client

HiveServer2 provided by Data Forest uses account and password authentication. Therefore, a separate authentication is required, even if you've already done the Kerberos authentication. The password here is the one specified when creating the Data Forest account.

The following describes how to run the Hive client.

  1. Run the Beeline client.

    • If the password contains special characters, enclose it in single quotes (' ').
    $ beeline -u \
    "jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n ${USERNAME} -p ${PASSWORD}
    

    df-use-ex3_1-9_vpc

  2. Create an external database.

    • The output shown here was produced after running !set outputformat vertical, which makes the results easier to read.
    • Set the path using the LOCATION keyword.
    CREATE DATABASE example__db_movie_rating
    COMMENT 'MovieLens 20M Dataset'
    LOCATION '/user/example/warehouse/example__db_movie_rating'
    WITH DBPROPERTIES ('creater'='suewoon', 'date'='2021-07-01');
    DESCRIBE DATABASE example__db_movie_rating;
    db_name     example__db_movie_rating
    comment     MovieLens 20M Dataset
    location    hdfs://koya/user/example/warehouse/example__db_movie_rating
    owner_name  example
    owner_type  USER
    parameters
    
  3. Create a table from the Parquet file created from the Spark job.

    USE example__db_movie_rating;
    DROP TABLE IF EXISTS movie_rating;
    CREATE EXTERNAL TABLE movie_rating (movieId int, title string, genre string, releasedYear int, userId int, rating double)
    partitioned by (year int, month int) stored as parquet 
    location '/user/example/movie_rating';
    MSCK REPAIR TABLE movie_rating;
    
  4. Check the table content.

    SHOW TABLES;
    tab_name  movie_rating
    SHOW CREATE TABLE movie_rating;
    createtab_stmt  CREATE EXTERNAL TABLE `movie_rating`(
    createtab_stmt    `movieid` int,
    createtab_stmt    `title` string,
    createtab_stmt    `genre` string,
    createtab_stmt    `releasedyear` int,
    createtab_stmt    `userid` int,
    createtab_stmt    `rating` double)
    createtab_stmt  PARTITIONED BY (
    createtab_stmt    `year` int,
    createtab_stmt    `month` int)
    createtab_stmt  ROW FORMAT SERDE
    createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    createtab_stmt  STORED AS INPUTFORMAT
    createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    createtab_stmt  OUTPUTFORMAT
    createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    createtab_stmt  LOCATION
    createtab_stmt    'hdfs://koya/user/example/movie_rating'
    createtab_stmt  TBLPROPERTIES (
    createtab_stmt    'bucketing_version'='2',
    createtab_stmt    'transient_lastDdlTime'='1626253592')
    SELECT * FROM movie_rating limit 1;
    movie_rating.movieid       1176
    movie_rating.title         Double Life of Veronique, The (Double Vie de Véronique, La) (1991)
    movie_rating.genre         Romance
    movie_rating.releasedyear  1991
    movie_rating.userid        28507
    movie_rating.rating        4.0
    movie_rating.year          1995
    movie_rating.month         1
    
  5. Execute queries for the created table.

    • The following query finds the top 3 films by average rating for each release year.
      SELECT releasedYear, title, avgRating, row_num 
      FROM (SELECT releasedYear, title, avg(rating) as avgRating , rank() over (partition by releasedYear order by avg(rating) desc) as row_num 
      FROM movie_rating 
      GROUP BY releasedYear, title
      ) T
      WHERE row_num <= 3
      ORDER BY releasedYear DESC, row_num LIMIT 6;
      releasedyear  2015
      title         Louis C.K.: Live at The Comedy Store (2015)
      avgrating     3.8
      row_num       1
      releasedyear  2015
      title         Kingsman: The Secret Service (2015)
      avgrating     3.6320754716981134
      row_num       2
      releasedyear  2015
      title         The Second Best Exotic Marigold Hotel (2015)
      avgrating     3.5714285714285716
      row_num       3
      releasedyear  2014
      title         Zero Motivation (Efes beyahasei enosh) (2014)
      avgrating     4.5
      row_num       1
      releasedyear  2014
      title         Whiplash (2014)
      avgrating     4.074750830564784
      row_num       2
      releasedyear  2014
      title         Interstellar (2014)
      avgrating     4.023864289821737
      row_num       3
      
    • The following query finds the most popular film (the film rated by the most users) for each year.
      SELECT year, title, popularity, row_num FROM
      (SELECT year, title, count(distinct userId) as popularity, rank() over (partition by year order by count(distinct userId) desc) as row_num 
      FROM movie_rating GROUP BY year, title) T
      WHERE row_num = 1
      ORDER BY year desc, popularity desc LIMIT 5;
      year        2015
      title       Matrix, The (1999)
      popularity  1207
      row_num     1
      year        2014
      title       Shawshank Redemption, The (1994)
      popularity  2405
      row_num     1
      year        2013
      title       Shawshank Redemption, The (1994)
      popularity  2549
      row_num     1
      year        2012
      title       Inception (2010)
      popularity  2411
      row_num     1
      year        2011
      title       Inception (2010)
      popularity  3235
      row_num     1
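Both queries above rely on rank() over a partition, which gives tied rows the same rank. The following plain-Scala sketch reproduces the logic of the first query (average rating per film, ranked within each release year) on an in-memory collection. All names are hypothetical, and it illustrates the ranking semantics only, not how Hive executes the query.

```scala
// Hypothetical, Hive-free illustration of "top N by average rating per release year".
object TopRatedSketch {
  final case class Row(releasedYear: Int, title: String, rating: Double)
  final case class Ranked(releasedYear: Int, title: String, avgRating: Double, rank: Int)

  def topPerYear(rows: Seq[Row], n: Int): Seq[Ranked] = {
    // GROUP BY releasedYear, title with avg(rating)
    val averages: Seq[(Int, String, Double)] =
      rows.groupBy(r => (r.releasedYear, r.title)).map {
        case ((year, title), rs) => (year, title, rs.map(_.rating).sum / rs.size)
      }.toSeq

    // rank() over (partition by releasedYear order by avg(rating) desc)
    val ranked = averages.groupBy(_._1).toSeq.flatMap { case (year, group) =>
      group.map { case (_, title, avg) =>
        // SQL rank(): 1 + number of rows in the partition with a strictly higher average.
        val rank = 1 + group.count(_._3 > avg)
        Ranked(year, title, avg, rank)
      }
    }

    // WHERE rank <= n ORDER BY releasedYear DESC, rank
    ranked.filter(_.rank <= n).sortBy(r => (-r.releasedYear, r.rank))
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(2014, "A", 4.0), Row(2014, "A", 5.0), Row(2014, "B", 3.0), Row(2015, "C", 2.0))
    topPerYear(rows, 1).foreach(println)
  }
}
```

Because ties share a rank, a filter such as rank <= 3 can return more than three films for a year, which is also true of the Hive query.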