Spark、Hiveでデータを処理
    • PDF

    Spark、Hiveでデータを処理

    • PDF

    Article Summary

    VPC環境で利用できます。

    Data Forestで提供する Apache Hadoop、Apache Spark、Apache Hiveを利用してデータを処理・保存する方法を説明します。

    Spark、Hiveでデータ処理: Zeppelinアプリのユースケース

    Zeppelinは、ユーザーが簡単に APIコードを作成して提出する環境を提供します。様々なインタープリターがあるが、本ガイドでは最も使用頻度の高い Sparkと Hiveを使用する方法をユースケースで説明します。このユースケースを応用すると、Data Forestプラットフォーム上でストリーミングデータソースに Spark Streamingを使用して HDFSにデータを保存し、定期的に Hiveクエリバッチを実行するジョブを作成できます。

    参考
    • このガイドで使用する MovieLensデータセットは、MovieLens 20M Datasetをご参照ください。
    • Data Forest > App で作成した Zeppelinノートブックでの実行をベースに説明します。

    Step 1. Zeppelinアプリの作成

    Zeppelinアプリを作成する方法は、次のとおりです。

    1. Zeppelinアプリを作成します。
    2. アプリタイプは ZEPPELIN-0.10.1を選択します。
      df-use-ex2_2-2_ja
    3. アプリの詳細情報の StatusStable になっているか確認します。

    Step 2. データ処理

    データを処理するために元のデータセットをアップロードします。実際のサービスでは Kafka、Flumeなどを通じて原本ログデータなどを HDFSに保存するか、RDBMSから Sqoopを使用して原本データを HDFSに保管できます。
    MovieLensデータセットの.csvファイルを HDFSに直接アップロードする方法は、次のとおりです。

    1. HDFS NameNode UIにアクセスして Utilities > Browse the file systemメニューをクリックします。
    2. /user/${USER}ディレクトリに移動します。
      df-use-ex2_1-3_ko
    3. [Upload] アイコンをクリックします。
    4. rating.csv、movie.csvファイルをアップロードします。
      df-use-ex2_1-4_ko
    参考

    ファイルをクリックしてから Head the file をクリックすると、データの構成が確認できます。
    df-use-ex2_1-5

    1. Zeppelinアプリにアクセス

    Zeppelinアプリにアクセスする方法は、次のとおりです。

    1. Zeppelinにアクセスします。

      • アクセス URLは Zeppelinアプリの Quick links > zeppelin を参照
      • Zeppelinアプリの使用に関する詳細は、Zeppelin を使用するを参照
        df-use-ex2_3-6_ja
    2. 右上の [login] をクリックして、アカウント作成時に設定したアカウント名とパスワードを使用してログインします。
      df-use-ex2_1-7_ko

      df-use-ex2_1-8_ko

    2. Spark Jobの実行

    1. [Notebook] > Create new note をクリックして新しいノートブックを作成します。
      df-use-ex2_note_ko
    2. Default Interpreter は Sparkに指定し、Sparkコードには %sparkをつけてインタープリターを実行します。
    3. Sparkで2つの.csvファイルからデータフレームを作成し、2つのデータフレームを結合して Parquetファイルで HDFSに保存します。
    4. movie.csv、rating.csvをそのまま使用しないで新しいカラムを作成して変更するなど、基本的な transformationを行います。
    • movie.csvから dataframeを作成
      df-use-ex2_1-9_ko
    %spark
    import spark.implicits._
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.catalyst.ScalaReflection
    
    case class Movie(movieId: Int,title: String, genres: String)
    val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType] // Create a schema from case class
    
    val movie_df_temp = spark.read
            .option("header", "true")
            .schema(movie_schema)
            .csv("hdfs://koya/user/example/movie.csv") // This is where we put our sample data. You should change the path. 
    
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.IntegerType
    
    val movie_df = movie_df_temp
                    .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) // Explode the column genre as it contains multiple values
                    .withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1))  // Extract released date (year) from title column
                    .withColumn("releasedYear", $"releasedYear".cast(IntegerType)) // Cast string type to integer
    
    movie_df.createOrReplaceTempView("movie") // Create dataframe temp view 
    spark.sql("select * from movie limit 3").show()"
    
    • rating.csvから dataframeを作成
      df-use-ex2_1-10_ko
    %spark
    
    import spark.implicits._
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.catalyst.ScalaReflection
    import java.sql.Timestamp
    
    case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
    val rating_schema =  ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
    val rating_df_temp = spark.read
                    .option("header", "true")
                    .schema(rating_schema)
                    .csv("hdfs://koya/user/example/rating.csv") 
                           
    import org.apache.spark.sql.functions._
    
    val rating_df_filtered = rating_df_temp.groupBy($"movieId")
        .agg(count("*").as("count")).filter($"count" > 10) // Filter out records where only few people rate the movie. 
    val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))  
            .select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month")) // We will use year, month as a partiion key later 
    
    rating_df.createOrReplaceTempView("rating")
    spark.sql("select * from rating limit 3").show()
    
    注意

    Default Interpreterで使用している Sparkがバージョン3.0から Self joinによって発生する曖昧な列リファレンスが含まれた場合にクエリ失敗が発生します。そのため、上記のプロセスを実行する前に必ず以下のように Zeppelin > Interpreterspark を検索して [edit] ボタンを押し、spark.sql.analyzer.failAmbiguousSelfJoin 値を false に設定します。
    df-use-ex2_1-28

    df-use-ex2_1-29

    1. 上で登録した viewに対して spark-sqlを実行します。
      以下のクエリは例です。
      • 作成した viewに spark-sqlを実行
        df-use-ex2_1-11_ko
    2. 2つのデータフレームから新しいデータフレームを作成します。
      • 2つの dataframeを joinして新しい dataframeを作成
        df-use-ex2_1-12_ko
    3. 作成された最終データフレームを Parquet形式で保存します。
      • year, monthをパーティションに指定
        df-use-ex2_1-13_ko
    4. 書き込みジョブが完了したことを確認します。
      • HDFSの当該パスに以下のようにパーティションが作成され、データが作成されたことが確認できます。
      • 完了したジョブは、Spark History Serverで確認できます。
        df-use-ex2_1-14_ko
    5. 進行中のジョブを確認するには、左下の [Show Incomplete Applications] ボタンをクリックします。
      df-use-ex2_1-15_ko

    3. Hiveクエリの実行

    JDBCインタープリターを使用する前にパスワードを設定します。Data Forestで提供する HiveServer2はアカウント名とパスワードで認証しなければアクセスできませんが、基本インタープリターにはパスワードが設定されていないため、ユーザーが直接追加する必要があります。

    注意

    パスワードを設定せずにクエリを実行すると、org.apache.zeppelin.interpreter.InterpreterException: Error in doAsが発生するのでご注意ください。

    JDBCインタープリターのパスワードを設定する方法は、次のとおりです。

    1. Zeppelin画面右上の アカウント名 > Interpreter をクリックします。
      df-hive_12_vpc_ja
    2. JDBC interpreter を検索します。
      df-quick-start_zeppelin03_ko
    3. [edit] ボタンをクリックします。
      df-quick-start_zeppelin04_ko
    4. 以下のように Propertiesに hive.user、hive.password 項目を追加します。
      • Data Forestアカウントの作成時に設定したパスワードです。
        df-quick-start_zeppelin05-1
    5. [Save] ボタンをクリックします。
    • 変更事項が保存されたインタープリターが再起動します。

    4. Hiveテーブルの作成とクエリの実行

    Spark Jobで書き込みした Parquetファイルから Hiveテーブルを作成し、クエリを実行します。この Zeppelinノートブックの全体内容は、こちらでダウンロードして Importできます。

    注意

    Zeppelinで Hiveクエリを実行する前に必ず Zeppelinで Hiveを使用するガイド内容に合わせて設定されているか確認します。

    1. 新しいノートブックを作成するために、 [Notebook] > Create new note をクリックします。
    2. Default Interpreter は JDBCに指定し、Hiveクエリには %jdbc(hive)をつけてインタープリターを実行します。
    • Databaseの確認
      df-use-ex2_1-17_ko

      注意
      • Externalデータベースの作成時に常に LOCATIONキーワード にパスを指定します。Data Forestユーザーは、Hiveデフォルト値のデータベースにアクセスできないため、常に自分の HDFSホームディレクトリ( /user/${USER} )の下のパスを使用します。事前ジョブとしてデータベースの作成パスに必要なディレクトリを予め作成します。(次のユースケースでは、warehouseフォルダが作成されていることを前提としています)。
      • データベースとは異なり、テーブルを作成する際の LOCATIONキーワード はオプションです。
      • データベースの作成時は、必ず ${USER}__db_${ANY_NAME_YOU_WANT}形式で名前を設定してください。そうしないとエラーが発生します。
      • External Databaseの作成

    df-use-ex2_1-18_ko

    :::

    1. 従来あったファイルでテーブルを作成するので、LOCATIONの後ろにパスとして Parquetファイルが保存されたディレクトリを設定します。
      • Parquetファイルからテーブルを作成
        df-use-ex2_1-19_ko
    2. MSCK REPAIR TABLE ${TABLENAME}でパーティションメタデータを追加します。
      • Hiveテーブルを使用する準備が完了します。
      • パーティションの追加およびメタデータのアップデート
        df-use-ex2_1-20_ko
    3. ユーザーが自由にクエリを実行します。
      • 以下のクエリは例です。
      • クエリのユースケース1(各年度に公開された映画レビュー Top3)
        df-use-ex2_1-21_ko
      • クエリのユースケース2(各年度別にユーザーに最も人気のあった映画)
        df-use-ex2_1-22_ko
    4. 実行した Hiveクエリに対するジョブの進行状況は、 Resource Manager UI で確認できます。 Log をクリックしてジョブログを確認します。
      df-use-ex2_1-23_ko

    Spark、Hiveでデータ処理: Devアプリのユースケース

    Devアプリを作成した後、Beeline、spark-submitのようなクライアントを使用して Spark、Hiveでデータ処理: Zeppelinアプリのユースケースのようなジョブを行う方法を説明します。
    Devアプリには Data Forestで提供するすべてのサービスに対する開発環境が設定されているため、スクリプトを実行するための事前準備は別途必要ありません。Data Forestアプリではなく VPC VMでも開発環境を設定すると、本ガイドの手順を同様に行うことができます。

    参考

    ユーザーの必要に応じて実行方法を変えて使用できます。データを用いて複数テストを行う目的の場合には Zeppelinを、運用環境で定期的に実行するジョブの場合には実行ジョブをスクリプトにするのがより適切です。

    Step 1. Devアプリの作成

    Devアプリを作成する方法は、次のとおりです。

    1. Devアプリを作成します。
    2. アプリタイプは DEV-1.0.0を選択します。
      df-use-dev-ex2_2-2_ja
    3. アプリの詳細情報の StatusStable になっているか確認します。

    Step 2. データ処理

    データを処理するために元のデータセットをアップロードします。データセットをアップロードする方法は、Spark、Hiveでデータ処理: Zeppelin > データ処理をご参照ください。

    1. Sparkクライアントの実行

    Devアプリで Sparkスクリプトを正常に実行するには、まず Kerberos認証が必要です。Kerberosの認証方法は、Dev を使用するガイドをご参照ください。

    注意

    認証以外に別途設定は必要ありません。ただし、認証を完了していない場合は org.apache.hadoop.security.AccessControlExceptionが発生しますのでご注意ください。

    認証が正常に完了すると以下のように確認できます。

    $ klist 
    Ticket cache: FILE:/tmp/krb5cc_p26244
    Default principal: example@KR.DF.NAVERNCP.COM
    Valid starting       Expires              Service principal
    07/14/2021 17:38:02  07/15/2021 17:38:02  krbtgt/KR.DF.BETA.NAVERNCP.COM@KR.DF.NAVERNCP.COM
            renew until 07/21/2021 17:38:02
    

    df-use-ex3_1-3_vpc

    HDFSクライアントのコマンドでデータセットを確認できます。

    $ hadoop fs -ls | grep .csv
    -rw-------   3 example services    1493648 2021-07-12 15:25 movie.csv
    -rw-r--r--   3 example services  690353377 2021-07-09 18:11 rating.csv
    

    spark-shellを実行できます。
    spark-shellでは、SparkContext、SparkSessionが既に作成されています。 Zeppelinノートパソコンで使用したコードをそのまま実行できます。

    $ spark-shell
    

    df-use-ex3_1-4_vpc

    以下のコードは Zeppelinノートブックで実行したコードと同じです。:pasteで pasteモードに入り、簡単にコードが貼り付けられます。このモードを終了するには [Ctrl] + [D] キーを押します。

    import org.apache.spark.sql.types.{StructType,IntegerType}
    import org.apache.spark.sql.catalyst.ScalaReflection
    import org.apache.spark.sql.functions._
    import java.sql.Timestamp
    case class Movie(movieId: Int,title: String, genres: String)
    case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp
    )
    val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType]
    val rating_schema =  ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
    import spark.implicits._
    val movie_df_temp = spark.read 
            .option("header", "true")
            .schema(movie_schema)
            .csv("hdfs://koya/user/example/movie.csv")
    val movie_df = movie_df_temp
                    .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) 
                    .withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1)) 
                    .withColumn("releasedYear", $"releasedYear".cast(IntegerType))
    
    val rating_df_temp = spark.read
                    .option("header", "true")
                    .schema(rating_schema)
                    .csv("hdfs://koya/user/example/rating.csv")
    val rating_df_filtered = rating_df_temp.groupBy($"movieId")
        .agg(count("*").as("count")).filter($"count" > 10) 
    val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))  
            .select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month"))
    val movie_rating_df = movie_df.join(rating_df, Seq("movieId"), "inner")
    import org.apache.spark.sql.SaveMode
    movie_rating_df.repartition($"year", $"month")
    .write.partitionBy("year", "month")
    .mode(SaveMode.Append)
    .parquet("/user/example/movie_rating")
    

    REPL形式ではなく、ソースコードを jarでビルドして提出(spark-submit)できます。このジョブには1個の argumentのみ必要です。データセットが位置した HDFSパスを指します。その後、結果ファイルもこのパスの下位に作成されます。

    ソースコードは Githubで確認できます。

    $ wget http://dist.kr.df.naverncp.com/repos/release/df-env/media/movie-rating-assembly-0.1.jar 
    $ spark-submit --class com.dataforest.example.MovieRatingWriter \
    --master yarn --deploy-mode cluster --queue longlived \
    --num-executors 20 ./movie-rating-assembly-0.1.jar \
    'hdfs://koya/user/example'
    

    ジョブを提出すると、以下のように進捗状況がコンソールに出力されて確認できます。
    df-use-ex3_1-5_vpc

    Spark Jobを提出すると Resource Manager UI でジョブの状態と進行ログが確認できます。
    df-use-ex3_1-6_vpc_ko

    Spark History Server UI でジョブの実行履歴をより詳しく確認できます。
    df-use-ex3_1-7_vpc_ko

    ジョブが完了すると、以下のように指定した HDFSパスの下位に movie_ratingディレクトリが作成され、その下位に結果 Parquetファイルが作成されたことが確認できます。
    df-use-ex3_1-8_vpc

    2. Hiveクライアントの実行

    Data Forestで提供する HiveServer2はアカウントとパスワード認証を使用します。そのため、Kerberos認証を行った場合でも別途認証が必要です。ここでのパスワードは、Data Forestアカウントの作成時に指定したパスワードです。

    Hiveクライアントを実行する方法は、次のとおりです。

    1. Beelineクライアントを実行します。

      • パスワードに特殊文字がある場合、単一引用符(' ')で囲みます。
      $ beeline -u 
      "jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n ${USERNAME} -p ${PASSWORD}
      

      df-use-ex3_1-9_vpc

    2. Externalデータベースを作成します。

      • 見やすいように、outputの形式を!set outputformat verticalに変更しました。
      • LOCATIONキーワードでパスを設定します。
      CREATE DATABASE example__db_movie_rating
      COMMENT 'MovieLens 20M Dataset'
      LOCATION '/user/example/warehouse/example__db_movie_rating'
      WITH DBPROPERTIES ('creater'='suewoon', 'date'='2021-07-01');
      DESCRIBE DATABASE example__db_movie_rating;
      db_name     example__db_movie_rating
      comment     MovieLens 20M Dataset
      location    hdfs://koya/user/example/warehouse/example__db_movie_rating
      owner_name  example
      owner_type  USER
      parameters
      
    3. Spark Jobで作成した Parquetファイルからテーブルを作成します。

      USE example__db_movie_rating;
      DROP TABLE IF EXISTS movie_rating;
      CREATE EXTERNAL TABLE movie_rating (movieId int, title string, genre string, releasedYear int, userId int, rating double)
      partitioned by (year int, month int) stored as parquet 
      location '/user/example/movie_rating';
      MSCK REPAIR TABLE movie_rating;
      
    4. テーブルの内容を確認します。

      SHOW TABLES;
      tab_name  movie_rating
      SHOW CREATE TABLE movie_rating;
      createtab_stmt  CREATE EXTERNAL TABLE `movie_rating`(
      createtab_stmt    `movieid` int,
      createtab_stmt    `title` string,
      createtab_stmt    `genre` string,
      createtab_stmt    `releasedyear` int,
      createtab_stmt    `userid` int,
      createtab_stmt    `rating` double)
      createtab_stmt  PARTITIONED BY (
      createtab_stmt    `year` int,
      createtab_stmt    `month` int)
      createtab_stmt  ROW FORMAT SERDE
      createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
      createtab_stmt  STORED AS INPUTFORMAT
      createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
      createtab_stmt  OUTPUTFORMAT
      createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
      createtab_stmt  LOCATION
      createtab_stmt    'hdfs://koya/user/example/movie_rating'
      createtab_stmt  TBLPROPERTIES (
      createtab_stmt    'bucketing_version'='2',
      createtab_stmt    'transient_lastDdlTime'='1626253592')
      SELECT * FROM movie_rating limit 1;
      movie_rating.movieid       1176
      movie_rating.title         Double Life of Veronique, The (Double Vie de Vronique, La) (1991)
      movie_rating.genre         Romance
      movie_rating.releasedyear  1991
      movie_rating.userid        28507
      movie_rating.rating        4.0
      movie_rating.year          1995
      movie_rating.month         1
      
    5. 作成したテーブルを対象にクエリを実行します。

      • 特定年度に公開した映画の中で評点 Top3を照会するクエリです。
        SELECT releasedYear, title, avgRating, row_num 
        FROM (SELECT releasedYear, title, avg(rating) as avgRating , rank() over (partition by releasedYear order by avg(rating) desc) as row_num 
        FROM movie_rating 
        GROUP BY releasedYear, title
        ) T
        WHERE row_num <= 3
        ORDER BY releasedYear DESC, row_num LIMIT 6;
        releasedyear  2015
        title         Louis C.K.: Live at The Comedy Store (2015)
        avgrating     3.8
        row_num       1
        releasedyear  2015
        title         Kingsman: The Secret Service (2015)
        avgrating     3.6320754716981134
        row_num       2
        releasedyear  2015
        title         The Second Best Exotic Marigold Hotel (2015)
        avgrating     3.5714285714285716
        row_num       3
        releasedyear  2014
        title         Zero Motivation (Efes beyahasei enosh) (2014)
        avgrating     4.5
        row_num       1
        releasedyear  2014
        title         Whiplash (2014)
        avgrating     4.074750830564784
        row_num       2
        releasedyear  2014
        title         Interstellar (2014)
        avgrating     4.023864289821737
        row_num       3
        
      • 年度別にユーザーに最も人気があった映画(評価が高かった映画)を検索するクエリです。
        SELECT year, title, popularity, row_num FROM
        (SELECT year, title, count(distinct userId) as popularity, rank() over (partition by year order by count(distinct userId) desc) as row_num 
        FROM movie_rating GROUP BY year, title) T
        WHERE row_num = 1
        ORDER BY year desc, popularity desc LIMIT 5;
        year        2015
        title       Matrix, The (1999)
        popularity  1207
        row_num     1
        year        2014
        title       Shawshank Redemption, The (1994)
        popularity  2405
        row_num     1
        year        2013
        title       Shawshank Redemption, The (1994)
        popularity  2549
        row_num     1
        year        2012
        title       Inception (2010)
        popularity  2411
        row_num     1
        year        2011
        title       Inception (2010)
        popularity  3235
        row_num     1
        

    この記事は役に立ちましたか?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.