Process data with Spark and Hive

    Available in VPC

    The following explains how to process and store data using Apache Hadoop, Apache Spark, and Apache Hive provided by Data Forest.

    Data processing with Spark and Hive: examples of using Zeppelin apps

    Zeppelin provides an environment where you can easily write and submit code. While various interpreters are available, this guide uses Spark and Hive, the most commonly used ones, as examples. Building on this example, you can create jobs that save streaming data to HDFS using Spark Streaming on the Data Forest platform, as well as jobs that periodically run Hive queries.

    Note
    • For the MovieLens dataset used in this guide, see MovieLens 20M Dataset.
    • This guide is based on working in a Zeppelin notebook created in Data Forest > App.

    Step 1. Create the Zeppelin app

    The following describes how to create the Zeppelin app.

    1. Create the Zeppelin app.
    2. Select ZEPPELIN-0.10.1 for the app type.
      df-use-ex2_2-2_en
    3. Check that the Status in the app details has changed to Stable.

    Step 2. Process data

    The source dataset must be uploaded for data processing. In an actual service, you can land original log data and the like in HDFS through Kafka, Flume, etc., or import it into HDFS from an RDBMS using Sqoop.
    The following describes how to upload the CSV files of the MovieLens dataset to HDFS directly (a CLI alternative is sketched after the steps below).

    1. Access the HDFS NameNode UI and click the Utilities > Browse the file system menu.
    2. Go to the /user/${USER} directory.
      df-use-ex2_1-3_ko
    3. Click the [Upload] icon.
    4. Upload the files: rating.csv and movie.csv.
      df-use-ex2_1-4_ko
    Note

    After clicking a file, click Head the file to preview what the data looks like.
    df-use-ex2_1-5_ko
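    If you prefer the command line, the same files can be uploaded with the HDFS client from a host where the Data Forest clients are configured (for example, the Dev app described later in this guide). This is a minimal sketch; it assumes movie.csv and rating.csv are in the current local directory and that Kerberos authentication has already been completed.

    # Upload the dataset with the HDFS client instead of the NameNode UI
    $ hadoop fs -mkdir -p /user/${USER}
    $ hadoop fs -put movie.csv rating.csv /user/${USER}/
    $ hadoop fs -ls /user/${USER} | grep .csv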

    1. Access Zeppelin app

    The following describes how to access the Zeppelin app.

    1. Access Zeppelin.

      • For the access URL, refer to Quick links > Zeppelin in the Zeppelin app's details.
      • For more details on how to use the Zeppelin app, see How to use Zeppelin.
        df-use-ex2_3-6_en
    2. After clicking [login] in the upper right corner, log in with the account name and password you set when creating the account.
      df-use-ex2_1-7_ko

      df-use-ex2_1-8_ko

    2. Run Spark Job

    1. Click [Notebook] > Create new note to create a new notebook.
      df-use-ex2_note_ko
    2. Set the Default Interpreter to Spark, and run the Spark code by adding %spark at the top of each paragraph.
    3. With Spark, create a dataframe from each of the two CSV files, join them, and save the result to HDFS as a Parquet file.
    4. Rather than using movie.csv and rating.csv as they are, perform basic transformations such as creating new columns and modifying existing ones.
    • Create dataframe from movie.csv
      df-use-ex2_1-9_ko
    %spark
    import spark.implicits._
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.catalyst.ScalaReflection
    
    case class Movie(movieId: Int,title: String, genres: String)
    val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType] // Create a schema from case class
    
    val movie_df_temp = spark.read
            .option("header", "true")
            .schema(movie_schema)
            .csv("hdfs://koya/user/example/movie.csv") // This is where we put our sample data. You should change the path. 
    
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.IntegerType
    
    val movie_df = movie_df_temp
                    .select($"movieId", $"title", explode(split($"genres", "\|")).as("genre")) // Explode the column genre as it contains multiple values
                    .withColumn("releasedYear", regexp_extract($"title","\((\\d{4})\)", 1))  // Extract released date (year) from title column
                    .withColumn("releasedYear", $"releasedYear".cast(IntegerType)) // Cast string type to integer
    
    movie_df.createOrReplaceTempView("movie") // Create dataframe temp view 
    spark.sql("select * from movie limit 3").show()"
    
    • Create dataframe from rating.csv
      df-use-ex2_1-10_ko
    %spark
    
    import spark.implicits._
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.catalyst.ScalaReflection
    import java.sql.Timestamp
    
    case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
    val rating_schema =  ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
    val rating_df_temp = spark.read
                    .option("header", "true")
                    .schema(rating_schema)
                    .csv("hdfs://koya/user/example/rating.csv") 
                           
    import org.apache.spark.sql.functions._
    
    val rating_df_filtered = rating_df_temp.groupBy($"movieId")
        .agg(count("*").as("count")).filter($"count" > 10) // Filter out movies that only a few people rated
    val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))
            .select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month")) // We will use year and month as partition keys later
    
    rating_df.createOrReplaceTempView("rating")
    spark.sql("select * from rating limit 3").show()
    
    Caution

    From version 3.0, Spark (used here as the default interpreter) fails queries that contain ambiguous column references caused by a self join. Therefore, before performing the steps above, be sure to search for Spark in Zeppelin > Interpreter as shown below, click the [Edit] button, and set the spark.sql.analyzer.failAmbiguousSelfJoin value to false.
    df-use-ex2_1-28

    df-use-ex2_1-29
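    To confirm that the value has been applied to your notebook session, you can read the configuration back in a Spark paragraph. This is a minimal check and assumes the property is exposed as a runtime SQL configuration in your Spark version.

    %spark
    // Should print "false" after the interpreter setting above has been saved and the interpreter restarted
    println(spark.conf.get("spark.sql.analyzer.failAmbiguousSelfJoin"))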

    5. Run spark-sql for the view registered above.
      The following query is an example (see the sketch after this list).
      • Execute spark-sql from the created view
        df-use-ex2_1-11_ko
    6. Create a new dataframe from the two dataframes (see the sketch after this list).
      • Join the two dataframes to create a new dataframe
        df-use-ex2_1-12_ko
    7. Save the final dataframe in the Parquet format (see the sketch after this list).
      • Specify year and month as the partition keys
        df-use-ex2_1-13_ko
    8. Check that the write job has completed.
      • You can see that the partitions have been created and the data has been written to the applicable HDFS path, as shown below.
      • Completed jobs can be viewed on the Spark History Server.
        df-use-ex2_1-14_ko
    9. To check jobs that are still in progress, click the [Show Incomplete Applications] button at the bottom left.
      df-use-ex2_1-15_ko
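    The screenshots above correspond roughly to the following code. This is a minimal sketch that reuses the dataframes and temp views created earlier; the spark-sql query is only illustrative, and the output path /user/example/movie_rating should be replaced with a path under your own home directory.

    %spark
    import spark.implicits._
    import org.apache.spark.sql.SaveMode

    // Example spark-sql query against the temp views registered above
    spark.sql("select genre, count(*) as cnt from movie group by genre order by cnt desc").show(5)

    // Join the two dataframes into a new dataframe
    val movie_rating_df = movie_df.join(rating_df, Seq("movieId"), "inner")

    // Save the final dataframe as Parquet, partitioned by year and month
    movie_rating_df.repartition($"year", $"month")
        .write.partitionBy("year", "month")
        .mode(SaveMode.Append)
        .parquet("/user/example/movie_rating") // Example path; use /user/${USER}/movie_rating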

    3. Execute Hive queries

    A password must be set up before using the JDBC interpreter. The HiveServer2 provided by Data Forest requires authentication with an account name and password. Since the default interpreter settings do not include a password, you need to add it manually.

    Caution

    Be aware that org.apache.zeppelin.interpreter.InterpreterException: Error in doAs occurs when the queries are executed without setting up the password.

    The following describes how to set up the password for JDBC interpreter.

    1. Click the Account name > Interpreter located at the upper right on the Zeppelin screen.
      df-hive_12_vpc_en
    2. Search for the JDBC interpreter.
      df-quick-start_zeppelin03_ko
    3. Click the [Edit] button.
      df-quick-start_zeppelin04_ko
    4. Add the hive.user and hive.password items under Properties as follows (see also the sketch after this list):
      • The password is the one you set when creating the Data Forest account.
        df-quick-start_zeppelin05-1
    5. Click the [Save] button.
      • The interpreter will restart with the changes saved.
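    For reference, the added properties look roughly like the following. The account name example is a placeholder; use your own Data Forest account name and password.

      hive.user       example
      hive.password   <your Data Forest account password>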

    4. Create Hive table and run queries

    We're going to create a Hive table from the Parquet file written from the Spark job, and execute queries. The complete content of the Zeppelin notebook can be downloaded from here and can be imported.

    Caution

    Before executing a Hive query in Zeppelin, make sure that the settings are set according to the contents of the Hive user guide in Zeppelin.

    1. Click [Notebook] > Create new note to create a new notebook.
    2. Set the Default Interpreter to JDBC, and run Hive queries by adding %jdbc(hive) at the top of the paragraph.
    • Check database
      df-use-ex2_1-17_ko

      Caution
      • When creating an external database, always specify the path with the LOCATION keyword. Since Data Forest users cannot access Hive's default database location, you must always use a path under your HDFS home directory (/user/${USER}). The directory required for the database creation path must be created in advance (in the example below, the warehouse folder must already exist).
      • Unlike the database, the LOCATION keyword is optional for creating tables.
      • When creating a database, make sure you set the name in the ${USER}__db_${ANY_NAME_YOU_WANT} format. An error occurs otherwise.
      • Create external database (an example statement is sketched below)

    df-use-ex2_1-18_ko

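    For reference, the screenshot above corresponds to a statement like the one below, taken from the Beeline example later in this guide. The account name example and the warehouse path are examples; replace them with your own account name and a directory you created in advance.

    %jdbc(hive)
    CREATE DATABASE example__db_movie_rating
    COMMENT 'MovieLens 20M Dataset'
    LOCATION '/user/example/warehouse/example__db_movie_rating';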

    3. Since the table is created from an existing file, set the directory where the Parquet file was saved as the path after LOCATION (see the sketch after this list).
      • Create a table from the Parquet file
        df-use-ex2_1-19_ko
    4. Add the partition metadata with MSCK REPAIR TABLE ${TABLENAME}.
      • The preparations for using the Hive table are now complete.
      • Add partitions and update metadata
        df-use-ex2_1-20_ko
    5. Execute queries as you wish.
      • The following queries are examples (also sketched after this list).
      • Query example 1: the top 3 films by average rating for each release year
        df-use-ex2_1-21_ko
      • Query example 2: the most popular film among users for each year
        df-use-ex2_1-22_ko
    6. You can view the job progress of the executed Hive queries from the Resource Manager UI. Click Log to see the job logs.
      df-use-ex2_1-23_ko
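    For reference, the statements behind the screenshots in steps 3 to 5 above are sketched below; they are the same ones used with Beeline in the Dev app example later in this guide. The database, table, and path names follow the example account, so adapt them to your own.

    %jdbc(hive)
    USE example__db_movie_rating;
    -- Create an external table over the Parquet files written by the Spark job
    CREATE EXTERNAL TABLE movie_rating (movieId int, title string, genre string, releasedYear int, userId int, rating double)
    PARTITIONED BY (year int, month int) STORED AS PARQUET
    LOCATION '/user/example/movie_rating';
    -- Register the existing year/month partitions in the metastore
    MSCK REPAIR TABLE movie_rating;

    -- Query example 1: top 3 films by average rating for each release year
    SELECT releasedYear, title, avgRating, row_num
    FROM (SELECT releasedYear, title, avg(rating) AS avgRating,
                 rank() OVER (PARTITION BY releasedYear ORDER BY avg(rating) DESC) AS row_num
          FROM movie_rating
          GROUP BY releasedYear, title) T
    WHERE row_num <= 3
    ORDER BY releasedYear DESC, row_num LIMIT 6;

    -- Query example 2: the most popular film (rated by the most users) for each rating year
    SELECT year, title, popularity, row_num
    FROM (SELECT year, title, count(DISTINCT userId) AS popularity,
                 rank() OVER (PARTITION BY year ORDER BY count(DISTINCT userId) DESC) AS row_num
          FROM movie_rating
          GROUP BY year, title) T
    WHERE row_num = 1
    ORDER BY year DESC, popularity DESC LIMIT 5;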

    Data processing with Spark and Hive: examples of using Dev apps

    The following explains how to create a Dev app and then use clients such as Beeline and spark-submit to perform the same tasks as in Data processing with Spark and Hive: examples of using Zeppelin apps.
    The Dev app comes with development environments configured for all services provided by Data Forest, so no preparation is needed to execute scripts. Instead of a Data Forest app, you can also configure a development environment on a VPC VM and follow the same process in this guide.

    Note

    You can change the execution method according to your needs. Zeppelin is better suited for running various tests on the data, while packaging jobs as scripts is better suited for jobs that will run periodically in an operating environment.

    Step 1. Create the Dev app

    The following describes how to create the Dev app.

    1. Create the Dev app.
    2. Select DEV-1.0.0 for the app type.
      df-use-dev-ex2_2-2_en
    3. Check that the Status in the app details has changed to Stable.

    Step 2. Process data

    The source dataset must be uploaded for data processing. For instructions on how to upload the dataset, see Data processing with Spark and Hive: examples of using Zeppelin apps > Step 2. Process data.

    1. Execute Spark client

    First, Kerberos authentication is needed to successfully execute the Spark script on the Dev app. For instructions on Kerberos authentication, see Use Dev user guide.

    Caution

    No additional setting is required other than the authentication. However, please be cautious as org.apache.hadoop.security.AccessControlException occurs if the authentication is not complete.
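    A minimal sketch of the authentication step is shown below. It assumes password-based kinit with the example principal; the exact procedure (for example, keytab-based authentication) may differ, so follow the Use Dev user guide for your environment.

    $ kinit example@KR.DF.NAVERNCP.COM
    Password for example@KR.DF.NAVERNCP.COM: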

    Once the authentication is completed successfully, you can see the result as below.

    $ klist 
    Ticket cache: FILE:/tmp/krb5cc_p26244
    Default principal: example@KR.DF.NAVERNCP.COM
    Valid starting       Expires              Service principal
    07/14/2021 17:38:02  07/15/2021 17:38:02  krbtgt/KR.DF.BETA.NAVERNCP.COM@KR.DF.NAVERNCP.COM
            renew until 07/21/2021 17:38:02
    

    df-use-ex3_1-3_vpc

    You can see the data set with the HDFS client command.

    $ hadoop fs -ls | grep .csv
    -rw-------   3 example services    1493648 2021-07-12 15:25 movie.csv
    -rw-r--r--   3 example services  690353377 2021-07-09 18:11 rating.csv
    

    Now run spark-shell. A SparkContext and SparkSession are already created in spark-shell, so you can execute the same code used in the Zeppelin notebook.

    $ spark-shell
    

    df-use-ex3_1-4_vpc

    The code shown below is the same as what was executed in the Zeppelin notebook. You can enter the paste mode with :paste and easily paste the code. Press the [Ctrl]+[D] key to exit the mode.

    import org.apache.spark.sql.types.{StructType,IntegerType}
    import org.apache.spark.sql.catalyst.ScalaReflection
    import org.apache.spark.sql.functions._
    import java.sql.Timestamp
    case class Movie(movieId: Int, title: String, genres: String)
    case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
    val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType]
    val rating_schema =  ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
    import spark.implicits._
    val movie_df_temp = spark.read 
            .option("header", "true")
            .schema(movie_schema)
            .csv("hdfs://koya/user/example/movie.csv")
    val movie_df = movie_df_temp
                    .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) 
                    .withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1)) 
                    .withColumn("releasedYear", $"releasedYear".cast(IntegerType))
    
    val rating_df_temp = spark.read
                    .option("header", "true")
                    .schema(rating_schema)
                    .csv("hdfs://koya/user/example/rating.csv")
    val rating_df_filtered = rating_df_temp.groupBy($"movieId")
        .agg(count("*").as("count")).filter($"count" > 10) 
    val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))  
            .select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month"))
    val movie_rating_df = movie_df.join(rating_df, Seq("movieId"), "inner")
    import org.apache.spark.sql.SaveMode
    movie_rating_df.repartition($"year", $"month")
    .write.partitionBy("year", "month")
    .mode(SaveMode.Append)
    .parquet("/user/example/movie_rating")
    

    Rather than using the REPL, you can also build the source code as a JAR and submit it with spark-submit. This job requires only one argument: the HDFS path where the dataset is located. The result files are created under the same path.

    You can check the source code on GitHub.

    $ wget http://dist.kr.df.naverncp.com/repos/release/df-env/media/movie-rating-assembly-0.1.jar 
    $ spark-submit --class com.dataforest.example.MovieRatingWriter \
    --master yarn --deploy-mode cluster --queue longlived \
    --num-executors 20 ./movie-rating-assembly-0.1.jar \
    'hdfs://koya/user/example'
    

    After you submit the job, its progress is displayed in the console as shown below.
    df-use-ex3_1-5_vpc

    Once you submit a Spark job, you can check the job status and progress logs from Resource Manager UI.
    df-use-ex3_1-6_vpc_ko

    You can see the job's execution history in more detail from Spark History Server UI.
    df-use-ex3_1-7_vpc_ko

    When the job is completed, a movie_rating directory is created under the specified HDFS path, and you can see the resulting Parquet file is created under the directory.
    df-use-ex3_1-8_vpc

    2. Execute Hive client

    HiveServer2 provided by Data Forest uses account and password authentication. Therefore, a separate authentication is required, even if you've already done the Kerberos authentication. The password here is the one specified when creating the Data Forest account.

    The following describes how to run the Hive client.

    1. Run the Beeline client.

      • If the password contains special characters, it must be enclosed in single quotes ('').
      $ beeline -u \
      "jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" \
      -n ${USERNAME} -p ${PASSWORD}
      

      df-use-ex3_1-9_vpc

    2. Create an external database.

      • The output format has been changed with !set outputformat vertical to make it easier to read.
      • Set the path using the LOCATION keyword.
      CREATE DATABASE example__db_movie_rating
      COMMENT 'MovieLens 20M Dataset'
      LOCATION '/user/example/warehouse/example__db_movie_rating'
      WITH DBPROPERTIES ('creater'='suewoon', 'date'='2021-07-01');
      DESCRIBE DATABASE example__db_movie_rating;
      db_name     example__db_movie_rating
      comment     MovieLens 20M Dataset
      location    hdfs://koya/user/example/warehouse/example__db_movie_rating
      owner_name  example
      owner_type  USER
      parameters
      
    3. Create a table from the Parquet file created from the Spark job.

      USE example__db_movie_rating;
      DROP TABLE IF EXISTS movie_rating;
      CREATE EXTERNAL TABLE movie_rating (movieId int, title string, genre string, releasedYear int, userId int, rating double)
      partitioned by (year int, month int) stored as parquet 
      location '/user/example/movie_rating';
      MSCK REPAIR TABLE movie_rating;
      
    4. Check the table content.

      SHOW TABLES;
      tab_name  movie_rating
      SHOW CREATE TABLE movie_rating;
      createtab_stmt  CREATE EXTERNAL TABLE `movie_rating`(
      createtab_stmt    `movieid` int,
      createtab_stmt    `title` string,
      createtab_stmt    `genre` string,
      createtab_stmt    `releasedyear` int,
      createtab_stmt    `userid` int,
      createtab_stmt    `rating` double)
      createtab_stmt  PARTITIONED BY (
      createtab_stmt    `year` int,
      createtab_stmt    `month` int)
      createtab_stmt  ROW FORMAT SERDE
      createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
      createtab_stmt  STORED AS INPUTFORMAT
      createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
      createtab_stmt  OUTPUTFORMAT
      createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
      createtab_stmt  LOCATION
      createtab_stmt    'hdfs://koya/user/example/movie_rating'
      createtab_stmt  TBLPROPERTIES (
      createtab_stmt    'bucketing_version'='2',
      createtab_stmt    'transient_lastDdlTime'='1626253592')
      SELECT * FROM movie_rating limit 1;
      movie_rating.movieid       1176
      movie_rating.title         Double Life of Veronique, The (Double Vie de Véronique, La) (1991)
      movie_rating.genre         Romance
      movie_rating.releasedyear  1991
      movie_rating.userid        28507
      movie_rating.rating        4.0
      movie_rating.year          1995
      movie_rating.month         1
      
    5. Execute queries for the created table.

      • This query searches for the top 3 films by average rating among the films released in each year.
        SELECT releasedYear, title, avgRating, row_num 
        FROM (SELECT releasedYear, title, avg(rating) as avgRating , rank() over (partition by releasedYear order by avg(rating) desc) as row_num 
        FROM movie_rating 
        GROUP BY releasedYear, title
        ) T
        WHERE row_num <= 3
        ORDER BY releasedYear DESC, row_num LIMIT 6;
        releasedyear  2015
        title         Louis C.K.: Live at The Comedy Store (2015)
        avgrating     3.8
        row_num       1
        releasedyear  2015
        title         Kingsman: The Secret Service (2015)
        avgrating     3.6320754716981134
        row_num       2
        releasedyear  2015
        title         The Second Best Exotic Marigold Hotel (2015)
        avgrating     3.5714285714285716
        row_num       3
        releasedyear  2014
        title         Zero Motivation (Efes beyahasei enosh) (2014)
        avgrating     4.5
        row_num       1
        releasedyear  2014
        title         Whiplash (2014)
        avgrating     4.074750830564784
        row_num       2
        releasedyear  2014
        title         Interstellar (2014)
        avgrating     4.023864289821737
        row_num       3
        
      • This query searches for the most popular film among users (the film rated by the most users) for each year.
        SELECT year, title, popularity, row_num FROM
        (SELECT year, title, count(distinct userId) as popularity, rank() over (partition by year order by count(distinct userId) desc) as row_num 
        FROM movie_rating GROUP BY year, title) T
        WHERE row_num = 1
        ORDER BY year desc, popularity desc LIMIT 5;
        year        2015
        title       Matrix, The (1999)
        popularity  1207
        row_num     1
        year        2014
        title       Shawshank Redemption, The (1994)
        popularity  2405
        row_num     1
        year        2013
        title       Shawshank Redemption, The (1994)
        popularity  2549
        row_num     1
        year        2012
        title       Inception (2010)
        popularity  2411
        row_num     1
        year        2011
        title       Inception (2010)
        popularity  3235
        row_num     1
        
