Spark、Hiveでデータを処理

Prev Next

VPC環境で利用できます。

Data Forestで提供する Apache Hadoop、Apache Spark、Apache Hiveを利用してデータを処理・保存する方法を説明します。

Spark、Hiveでデータ処理: Zeppelinアプリのユースケース

Zeppelinは、ユーザーが簡単に APIコードを作成して提出する環境を提供します。様々なインタープリターがあるが、本ガイドでは最も使用頻度の高い Sparkと Hiveを使用する方法をユースケースで説明します。このユースケースを応用すると、Data Forestプラットフォーム上でストリーミングデータソースに Spark Streamingを使用して HDFSにデータを保存し、定期的に Hiveクエリバッチを実行するジョブを作成できます。

参考
  • このガイドで使用する MovieLensデータセットは、MovieLens 20M Datasetをご参照ください。
  • Data Forest > App で作成した Zeppelinノートブックでの実行をベースに説明します。

Step 1. Zeppelinアプリの作成

Zeppelinアプリを作成する方法は、次のとおりです。

  1. Zeppelinアプリを作成します。
  2. アプリタイプは ZEPPELIN-0.10.1を選択します。
    df-use-ex2_2-2_ja
  3. アプリの詳細情報の StatusStable になっているか確認します。

Step 2. データ処理

データを処理するために元のデータセットをアップロードします。実際のサービスでは Kafka、Flumeなどを通じて原本ログデータなどを HDFSに保存するか、RDBMSから Sqoopを使用して原本データを HDFSに保管できます。
MovieLensデータセットの.csvファイルを HDFSに直接アップロードする方法は、次のとおりです。

  1. HDFS NameNode UIにアクセスして Utilities > Browse the file systemメニューをクリックします。
  2. /user/${USER}ディレクトリに移動します。
    df-use-ex2_1-3_ko
  3. [Upload] アイコンをクリックします。
  4. rating.csv、movie.csvファイルをアップロードします。
    df-use-ex2_1-4_ko
参考

ファイルをクリックしてから Head the file をクリックすると、データの構成が確認できます。
df-use-ex2_1-5

1. Zeppelinアプリにアクセス

Zeppelinアプリにアクセスする方法は、次のとおりです。

  1. Zeppelinにアクセスします。

    • アクセス URLは Zeppelinアプリの Quick links > zeppelin を参照
    • Zeppelinアプリの使用に関する詳細は、Zeppelin を使用するを参照
      df-use-ex2_3-6_ja
  2. 右上の [login] をクリックして、アカウント作成時に設定したアカウント名とパスワードを使用してログインします。
    df-use-ex2_1-7_ko

    df-use-ex2_1-8_ko

2. Spark Jobの実行

  1. [Notebook] > Create new note をクリックして新しいノートブックを作成します。
    df-use-ex2_note_ko
  2. Default Interpreter は Sparkに指定し、Sparkコードには %sparkをつけてインタープリターを実行します。
  3. Sparkで2つの.csvファイルからデータフレームを作成し、2つのデータフレームを結合して Parquetファイルで HDFSに保存します。
  4. movie.csv、rating.csvをそのまま使用しないで新しいカラムを作成して変更するなど、基本的な transformationを行います。
  • movie.csvから dataframeを作成
    df-use-ex2_1-9_ko
%spark
import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection

case class Movie(movieId: Int,title: String, genres: String)
val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType] // Create a schema from case class

val movie_df_temp = spark.read
        .option("header", "true")
        .schema(movie_schema)
        .csv("hdfs://koya/user/example/movie.csv") // This is where we put our sample data. You should change the path. 

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

val movie_df = movie_df_temp
                .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) // Explode the column genre as it contains multiple values
                .withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1))  // Extract released date (year) from title column
                .withColumn("releasedYear", $"releasedYear".cast(IntegerType)) // Cast string type to integer

movie_df.createOrReplaceTempView("movie") // Create dataframe temp view 
spark.sql("select * from movie limit 3").show()"
  • rating.csvから dataframeを作成
    df-use-ex2_1-10_ko
%spark

import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection
import java.sql.Timestamp

case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
val rating_schema =  ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
val rating_df_temp = spark.read
                .option("header", "true")
                .schema(rating_schema)
                .csv("hdfs://koya/user/example/rating.csv") 
                       
import org.apache.spark.sql.functions._

val rating_df_filtered = rating_df_temp.groupBy($"movieId")
    .agg(count("*").as("count")).filter($"count" > 10) // Filter out records where only few people rate the movie. 
val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))  
        .select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month")) // We will use year, month as a partiion key later 

rating_df.createOrReplaceTempView("rating")
spark.sql("select * from rating limit 3").show()
注意

Default Interpreterで使用している Sparkがバージョン3.0から Self joinによって発生する曖昧な列リファレンスが含まれた場合にクエリ失敗が発生します。そのため、上記のプロセスを実行する前に必ず以下のように Zeppelin > Interpreterspark を検索して [edit] ボタンを押し、spark.sql.analyzer.failAmbiguousSelfJoin 値を false に設定します。
df-use-ex2_1-28

df-use-ex2_1-29

  1. 上で登録した viewに対して spark-sqlを実行します。
    以下のクエリは例です。
    • 作成した viewに spark-sqlを実行
      df-use-ex2_1-11_ko
  2. 2つのデータフレームから新しいデータフレームを作成します。
    • 2つの dataframeを joinして新しい dataframeを作成
      df-use-ex2_1-12_ko
  3. 作成された最終データフレームを Parquet形式で保存します。
    • year, monthをパーティションに指定
      df-use-ex2_1-13_ko
  4. 書き込みジョブが完了したことを確認します。
    • HDFSの当該パスに以下のようにパーティションが作成され、データが作成されたことが確認できます。
    • 完了したジョブは、Spark History Serverで確認できます。
      df-use-ex2_1-14_ko
  5. 進行中のジョブを確認するには、左下の [Show Incomplete Applications] ボタンをクリックします。
    df-use-ex2_1-15_ko

3. Hiveクエリの実行

JDBCインタープリターを使用する前にパスワードを設定します。Data Forestで提供する HiveServer2はアカウント名とパスワードで認証しなければアクセスできませんが、基本インタープリターにはパスワードが設定されていないため、ユーザーが直接追加する必要があります。

注意

パスワードを設定せずにクエリを実行すると、org.apache.zeppelin.interpreter.InterpreterException: Error in doAsが発生するのでご注意ください。

JDBCインタープリターのパスワードを設定する方法は、次のとおりです。

  1. Zeppelin画面右上の アカウント名 > Interpreter をクリックします。
    df-hive_12_vpc_ja
  2. JDBC interpreter を検索します。
    df-quick-start_zeppelin03_ko
  3. [edit] ボタンをクリックします。
    df-quick-start_zeppelin04_ko
  4. 以下のように Propertiesに hive.user、hive.password 項目を追加します。
    • Data Forestアカウントの作成時に設定したパスワードです。
      df-quick-start_zeppelin05-1
  5. [Save] ボタンをクリックします。
  • 変更事項が保存されたインタープリターが再起動します。

4. Hiveテーブルの作成とクエリの実行

Spark Jobで書き込みした Parquetファイルから Hiveテーブルを作成し、クエリを実行します。この Zeppelinノートブックの全体内容は、こちらでダウンロードして Importできます。

注意

Zeppelinで Hiveクエリを実行する前に必ず Zeppelinで Hiveを使用するガイド内容に合わせて設定されているか確認します。

  1. 新しいノートブックを作成するために、 [Notebook] > Create new note をクリックします。
  2. Default Interpreter は JDBCに指定し、Hiveクエリには %jdbc(hive)をつけてインタープリターを実行します。
  • Databaseの確認
    df-use-ex2_1-17_ko

    注意
    • Externalデータベースの作成時に常に LOCATIONキーワード にパスを指定します。Data Forestユーザーは、Hiveデフォルト値のデータベースにアクセスできないため、常に自分の HDFSホームディレクトリ( /user/${USER} )の下のパスを使用します。事前ジョブとしてデータベースの作成パスに必要なディレクトリを予め作成します。(次のユースケースでは、warehouseフォルダが作成されていることを前提としています)。
    • データベースとは異なり、テーブルを作成する際の LOCATIONキーワード はオプションです。
    • データベースの作成時は、必ず ${USER}__db_${ANY_NAME_YOU_WANT}形式で名前を設定してください。そうしないとエラーが発生します。
    • External Databaseの作成

df-use-ex2_1-18_ko

:::

  1. 従来あったファイルでテーブルを作成するので、LOCATIONの後ろにパスとして Parquetファイルが保存されたディレクトリを設定します。
    • Parquetファイルからテーブルを作成
      df-use-ex2_1-19_ko
  2. MSCK REPAIR TABLE ${TABLENAME}でパーティションメタデータを追加します。
    • Hiveテーブルを使用する準備が完了します。
    • パーティションの追加およびメタデータのアップデート
      df-use-ex2_1-20_ko
  3. ユーザーが自由にクエリを実行します。
    • 以下のクエリは例です。
    • クエリのユースケース1(各年度に公開された映画レビュー Top3)
      df-use-ex2_1-21_ko
    • クエリのユースケース2(各年度別にユーザーに最も人気のあった映画)
      df-use-ex2_1-22_ko
  4. 実行した Hiveクエリに対するジョブの進行状況は、 Resource Manager UI で確認できます。 Log をクリックしてジョブログを確認します。
    df-use-ex2_1-23_ko

Spark、Hiveでデータ処理: Devアプリのユースケース

Devアプリを作成した後、Beeline、spark-submitのようなクライアントを使用して Spark、Hiveでデータ処理: Zeppelinアプリのユースケースのようなジョブを行う方法を説明します。
Devアプリには Data Forestで提供するすべてのサービスに対する開発環境が設定されているため、スクリプトを実行するための事前準備は別途必要ありません。Data Forestアプリではなく VPC VMでも開発環境を設定すると、本ガイドの手順を同様に行うことができます。

参考

ユーザーの必要に応じて実行方法を変えて使用できます。データを用いて複数テストを行う目的の場合には Zeppelinを、運用環境で定期的に実行するジョブの場合には実行ジョブをスクリプトにするのがより適切です。

Step 1. Devアプリの作成

Devアプリを作成する方法は、次のとおりです。

  1. Devアプリを作成します。
  2. アプリタイプは DEV-1.0.0を選択します。
    df-use-dev-ex2_2-2_ja
  3. アプリの詳細情報の StatusStable になっているか確認します。

Step 2. データ処理

データを処理するために元のデータセットをアップロードします。データセットをアップロードする方法は、Spark、Hiveでデータ処理: Zeppelin > データ処理をご参照ください。

1. Sparkクライアントの実行

Devアプリで Sparkスクリプトを正常に実行するには、まず Kerberos認証が必要です。Kerberosの認証方法は、Dev を使用するガイドをご参照ください。

注意

認証以外に別途設定は必要ありません。ただし、認証を完了していない場合は org.apache.hadoop.security.AccessControlExceptionが発生しますのでご注意ください。

認証が正常に完了すると以下のように確認できます。

$ klist 
Ticket cache: FILE:/tmp/krb5cc_p26244
Default principal: example@KR.DF.NAVERNCP.COM
Valid starting       Expires              Service principal
07/14/2021 17:38:02  07/15/2021 17:38:02  krbtgt/KR.DF.BETA.NAVERNCP.COM@KR.DF.NAVERNCP.COM
        renew until 07/21/2021 17:38:02

df-use-ex3_1-3_vpc

HDFSクライアントのコマンドでデータセットを確認できます。

$ hadoop fs -ls | grep .csv
-rw-------   3 example services    1493648 2021-07-12 15:25 movie.csv
-rw-r--r--   3 example services  690353377 2021-07-09 18:11 rating.csv

spark-shellを実行できます。
spark-shellでは、SparkContext、SparkSessionが既に作成されています。 Zeppelinノートパソコンで使用したコードをそのまま実行できます。

$ spark-shell

df-use-ex3_1-4_vpc

以下のコードは Zeppelinノートブックで実行したコードと同じです。:pasteで pasteモードに入り、簡単にコードが貼り付けられます。このモードを終了するには [Ctrl] + [D] キーを押します。

import org.apache.spark.sql.types.{StructType,IntegerType}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions._
import java.sql.Timestamp
case class Movie(movieId: Int,title: String, genres: String)
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp
)
val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType]
val rating_schema =  ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
import spark.implicits._
val movie_df_temp = spark.read 
        .option("header", "true")
        .schema(movie_schema)
        .csv("hdfs://koya/user/example/movie.csv")
val movie_df = movie_df_temp
                .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) 
                .withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1)) 
                .withColumn("releasedYear", $"releasedYear".cast(IntegerType))

val rating_df_temp = spark.read
                .option("header", "true")
                .schema(rating_schema)
                .csv("hdfs://koya/user/example/rating.csv")
val rating_df_filtered = rating_df_temp.groupBy($"movieId")
    .agg(count("*").as("count")).filter($"count" > 10) 
val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))  
        .select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month"))
val movie_rating_df = movie_df.join(rating_df, Seq("movieId"), "inner")
import org.apache.spark.sql.SaveMode
movie_rating_df.repartition($"year", $"month")
.write.partitionBy("year", "month")
.mode(SaveMode.Append)
.parquet("/user/example/movie_rating")

REPL形式ではなく、ソースコードを jarでビルドして提出(spark-submit)できます。このジョブには1個の argumentのみ必要です。データセットが位置した HDFSパスを指します。その後、結果ファイルもこのパスの下位に作成されます。

ソースコードは Githubで確認できます。

$ wget http://dist.kr.df.naverncp.com/repos/release/df-env/media/movie-rating-assembly-0.1.jar 
$ spark-submit --class com.dataforest.example.MovieRatingWriter \
--master yarn --deploy-mode cluster --queue longlived \
--num-executors 20 ./movie-rating-assembly-0.1.jar \
'hdfs://koya/user/example'

ジョブを提出すると、以下のように進捗状況がコンソールに出力されて確認できます。
df-use-ex3_1-5_vpc

Spark Jobを提出すると Resource Manager UI でジョブの状態と進行ログが確認できます。
df-use-ex3_1-6_vpc_ko

Spark History Server UI でジョブの実行履歴をより詳しく確認できます。
df-use-ex3_1-7_vpc_ko

ジョブが完了すると、以下のように指定した HDFSパスの下位に movie_ratingディレクトリが作成され、その下位に結果 Parquetファイルが作成されたことが確認できます。
df-use-ex3_1-8_vpc

2. Hiveクライアントの実行

Data Forestで提供する HiveServer2はアカウントとパスワード認証を使用します。そのため、Kerberos認証を行った場合でも別途認証が必要です。ここでのパスワードは、Data Forestアカウントの作成時に指定したパスワードです。

Hiveクライアントを実行する方法は、次のとおりです。

  1. Beelineクライアントを実行します。

    • パスワードに特殊文字がある場合、単一引用符(' ')で囲みます。
    $ beeline -u 
    "jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n ${USERNAME} -p ${PASSWORD}
    

    df-use-ex3_1-9_vpc

  2. Externalデータベースを作成します。

    • 見やすいように、outputの形式を!set outputformat verticalに変更しました。
    • LOCATIONキーワードでパスを設定します。
    CREATE DATABASE example__db_movie_rating
    COMMENT 'MovieLens 20M Dataset'
    LOCATION '/user/example/warehouse/example__db_movie_rating'
    WITH DBPROPERTIES ('creater'='suewoon', 'date'='2021-07-01');
    DESCRIBE DATABASE example__db_movie_rating;
    db_name     example__db_movie_rating
    comment     MovieLens 20M Dataset
    location    hdfs://koya/user/example/warehouse/example__db_movie_rating
    owner_name  example
    owner_type  USER
    parameters
    
  3. Spark Jobで作成した Parquetファイルからテーブルを作成します。

    USE example__db_movie_rating;
    DROP TABLE IF EXISTS movie_rating;
    CREATE EXTERNAL TABLE movie_rating (movieId int, title string, genre string, releasedYear int, userId int, rating double)
    partitioned by (year int, month int) stored as parquet 
    location '/user/example/movie_rating';
    MSCK REPAIR TABLE movie_rating;
    
  4. テーブルの内容を確認します。

    SHOW TABLES;
    tab_name  movie_rating
    SHOW CREATE TABLE movie_rating;
    createtab_stmt  CREATE EXTERNAL TABLE `movie_rating`(
    createtab_stmt    `movieid` int,
    createtab_stmt    `title` string,
    createtab_stmt    `genre` string,
    createtab_stmt    `releasedyear` int,
    createtab_stmt    `userid` int,
    createtab_stmt    `rating` double)
    createtab_stmt  PARTITIONED BY (
    createtab_stmt    `year` int,
    createtab_stmt    `month` int)
    createtab_stmt  ROW FORMAT SERDE
    createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    createtab_stmt  STORED AS INPUTFORMAT
    createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    createtab_stmt  OUTPUTFORMAT
    createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    createtab_stmt  LOCATION
    createtab_stmt    'hdfs://koya/user/example/movie_rating'
    createtab_stmt  TBLPROPERTIES (
    createtab_stmt    'bucketing_version'='2',
    createtab_stmt    'transient_lastDdlTime'='1626253592')
    SELECT * FROM movie_rating limit 1;
    movie_rating.movieid       1176
    movie_rating.title         Double Life of Veronique, The (Double Vie de Vronique, La) (1991)
    movie_rating.genre         Romance
    movie_rating.releasedyear  1991
    movie_rating.userid        28507
    movie_rating.rating        4.0
    movie_rating.year          1995
    movie_rating.month         1
    
  5. 作成したテーブルを対象にクエリを実行します。

    • 特定年度に公開した映画の中で評点 Top3を照会するクエリです。
      SELECT releasedYear, title, avgRating, row_num 
      FROM (SELECT releasedYear, title, avg(rating) as avgRating , rank() over (partition by releasedYear order by avg(rating) desc) as row_num 
      FROM movie_rating 
      GROUP BY releasedYear, title
      ) T
      WHERE row_num <= 3
      ORDER BY releasedYear DESC, row_num LIMIT 6;
      releasedyear  2015
      title         Louis C.K.: Live at The Comedy Store (2015)
      avgrating     3.8
      row_num       1
      releasedyear  2015
      title         Kingsman: The Secret Service (2015)
      avgrating     3.6320754716981134
      row_num       2
      releasedyear  2015
      title         The Second Best Exotic Marigold Hotel (2015)
      avgrating     3.5714285714285716
      row_num       3
      releasedyear  2014
      title         Zero Motivation (Efes beyahasei enosh) (2014)
      avgrating     4.5
      row_num       1
      releasedyear  2014
      title         Whiplash (2014)
      avgrating     4.074750830564784
      row_num       2
      releasedyear  2014
      title         Interstellar (2014)
      avgrating     4.023864289821737
      row_num       3
      
    • 年度別にユーザーに最も人気があった映画(評価が高かった映画)を検索するクエリです。
      SELECT year, title, popularity, row_num FROM
      (SELECT year, title, count(distinct userId) as popularity, rank() over (partition by year order by count(distinct userId) desc) as row_num 
      FROM movie_rating GROUP BY year, title) T
      WHERE row_num = 1
      ORDER BY year desc, popularity desc LIMIT 5;
      year        2015
      title       Matrix, The (1999)
      popularity  1207
      row_num     1
      year        2014
      title       Shawshank Redemption, The (1994)
      popularity  2405
      row_num     1
      year        2013
      title       Shawshank Redemption, The (1994)
      popularity  2549
      row_num     1
      year        2012
      title       Inception (2010)
      popularity  2411
      row_num     1
      year        2011
      title       Inception (2010)
      popularity  3235
      row_num     1