- 印刷する
- PDF
Spark、Hiveでデータを処理
- 印刷する
- PDF
VPC環境で利用できます。
Data Forestで提供する Apache Hadoop、Apache Spark、Apache Hiveを利用してデータを処理・保存する方法を説明します。
Spark、Hiveでデータ処理: Zeppelinアプリのユースケース
Zeppelinは、ユーザーが簡単に APIコードを作成して提出する環境を提供します。様々なインタープリターがあるが、本ガイドでは最も使用頻度の高い Sparkと Hiveを使用する方法をユースケースで説明します。このユースケースを応用すると、Data Forestプラットフォーム上でストリーミングデータソースに Spark Streamingを使用して HDFSにデータを保存し、定期的に Hiveクエリバッチを実行するジョブを作成できます。
- このガイドで使用する MovieLensデータセットは、MovieLens 20M Datasetをご参照ください。
- Data Forest > App で作成した Zeppelinノートブックでの実行をベースに説明します。
Step 1. Zeppelinアプリの作成
Zeppelinアプリを作成する方法は、次のとおりです。
- Zeppelinアプリを作成します。
- アプリの作成方法は、アプリの作成と管理を参照
- アプリの作成方法は、アプリの作成と管理を参照
- アプリタイプは ZEPPELIN-0.10.1を選択します。
- アプリの詳細情報の Status が Stable になっているか確認します。
Step 2. データ処理
データを処理するために元のデータセットをアップロードします。実際のサービスでは Kafka、Flumeなどを通じて原本ログデータなどを HDFSに保存するか、RDBMSから Sqoopを使用して原本データを HDFSに保管できます。
MovieLensデータセットの.csvファイルを HDFSに直接アップロードする方法は、次のとおりです。
- HDFS NameNode UIにアクセスして Utilities > Browse the file systemメニューをクリックします。
/user/${USER}
ディレクトリに移動します。
- [Upload] アイコンをクリックします。
- rating.csv、movie.csvファイルをアップロードします。
ファイルをクリックしてから Head the file をクリックすると、データの構成が確認できます。
1. Zeppelinアプリにアクセス
Zeppelinアプリにアクセスする方法は、次のとおりです。
Zeppelinにアクセスします。
- アクセス URLは Zeppelinアプリの Quick links > zeppelin を参照
- Zeppelinアプリの使用に関する詳細は、Zeppelin を使用するを参照
右上の [login] をクリックして、アカウント作成時に設定したアカウント名とパスワードを使用してログインします。
2. Spark Jobの実行
- [Notebook] > Create new note をクリックして新しいノートブックを作成します。
- Default Interpreter は Sparkに指定し、Sparkコードには
%spark
をつけてインタープリターを実行します。 - Sparkで2つの.csvファイルからデータフレームを作成し、2つのデータフレームを結合して Parquetファイルで HDFSに保存します。
- movie.csv、rating.csvをそのまま使用しないで新しいカラムを作成して変更するなど、基本的な transformationを行います。
- movie.csvから dataframeを作成
%spark
import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection
case class Movie(movieId: Int,title: String, genres: String)
val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType] // Create a schema from case class
val movie_df_temp = spark.read
.option("header", "true")
.schema(movie_schema)
.csv("hdfs://koya/user/example/movie.csv") // This is where we put our sample data. You should change the path.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
val movie_df = movie_df_temp
.select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) // Explode the column genre as it contains multiple values
.withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1)) // Extract released date (year) from title column
.withColumn("releasedYear", $"releasedYear".cast(IntegerType)) // Cast string type to integer
movie_df.createOrReplaceTempView("movie") // Create dataframe temp view
spark.sql("select * from movie limit 3").show()"
- rating.csvから dataframeを作成
%spark
import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection
import java.sql.Timestamp
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
val rating_schema = ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
val rating_df_temp = spark.read
.option("header", "true")
.schema(rating_schema)
.csv("hdfs://koya/user/example/rating.csv")
import org.apache.spark.sql.functions._
val rating_df_filtered = rating_df_temp.groupBy($"movieId")
.agg(count("*").as("count")).filter($"count" > 10) // Filter out records where only few people rate the movie.
val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))
.select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month")) // We will use year, month as a partiion key later
rating_df.createOrReplaceTempView("rating")
spark.sql("select * from rating limit 3").show()
Default Interpreterで使用している Sparkがバージョン3.0から Self joinによって発生する曖昧な列リファレンスが含まれた場合にクエリ失敗が発生します。そのため、上記のプロセスを実行する前に必ず以下のように Zeppelin > Interpreter で spark を検索して [edit] ボタンを押し、spark.sql.analyzer.failAmbiguousSelfJoin 値を false に設定します。
- 上で登録した viewに対して spark-sqlを実行します。
以下のクエリは例です。- 作成した viewに spark-sqlを実行
- 作成した viewに spark-sqlを実行
- 2つのデータフレームから新しいデータフレームを作成します。
- 2つの dataframeを joinして新しい dataframeを作成
- 2つの dataframeを joinして新しい dataframeを作成
- 作成された最終データフレームを Parquet形式で保存します。
- year, monthをパーティションに指定
- year, monthをパーティションに指定
- 書き込みジョブが完了したことを確認します。
- HDFSの当該パスに以下のようにパーティションが作成され、データが作成されたことが確認できます。
- 完了したジョブは、Spark History Serverで確認できます。
- 進行中のジョブを確認するには、左下の [Show Incomplete Applications] ボタンをクリックします。
3. Hiveクエリの実行
JDBCインタープリターを使用する前にパスワードを設定します。Data Forestで提供する HiveServer2はアカウント名とパスワードで認証しなければアクセスできませんが、基本インタープリターにはパスワードが設定されていないため、ユーザーが直接追加する必要があります。
パスワードを設定せずにクエリを実行すると、org.apache.zeppelin.interpreter.InterpreterException: Error in doAs
が発生するのでご注意ください。
JDBCインタープリターのパスワードを設定する方法は、次のとおりです。
- Zeppelin画面右上の アカウント名 > Interpreter をクリックします。
- JDBC interpreter を検索します。
- [edit] ボタンをクリックします。
- 以下のように Propertiesに hive.user、hive.password 項目を追加します。
- Data Forestアカウントの作成時に設定したパスワードです。
- Data Forestアカウントの作成時に設定したパスワードです。
- [Save] ボタンをクリックします。
- 変更事項が保存されたインタープリターが再起動します。
4. Hiveテーブルの作成とクエリの実行
Spark Jobで書き込みした Parquetファイルから Hiveテーブルを作成し、クエリを実行します。この Zeppelinノートブックの全体内容は、こちらでダウンロードして Importできます。
Zeppelinで Hiveクエリを実行する前に必ず Zeppelinで Hiveを使用するガイド内容に合わせて設定されているか確認します。
- 新しいノートブックを作成するために、 [Notebook] > Create new note をクリックします。
- Default Interpreter は JDBCに指定し、Hiveクエリには
%jdbc(hive)
をつけてインタープリターを実行します。
Databaseの確認
注意- Externalデータベースの作成時に常に LOCATIONキーワード にパスを指定します。Data Forestユーザーは、Hiveデフォルト値のデータベースにアクセスできないため、常に自分の HDFSホームディレクトリ( /user/${USER} )の下のパスを使用します。事前ジョブとしてデータベースの作成パスに必要なディレクトリを予め作成します。(次のユースケースでは、warehouseフォルダが作成されていることを前提としています)。
- データベースとは異なり、テーブルを作成する際の LOCATIONキーワード はオプションです。
- データベースの作成時は、必ず
${USER}__db_${ANY_NAME_YOU_WANT}
形式で名前を設定してください。そうしないとエラーが発生します。 - External Databaseの作成
:::
- 従来あったファイルでテーブルを作成するので、LOCATIONの後ろにパスとして Parquetファイルが保存されたディレクトリを設定します。
- Parquetファイルからテーブルを作成
- Parquetファイルからテーブルを作成
- MSCK REPAIR TABLE ${TABLENAME}でパーティションメタデータを追加します。
- Hiveテーブルを使用する準備が完了します。
- パーティションの追加およびメタデータのアップデート
- ユーザーが自由にクエリを実行します。
- 以下のクエリは例です。
- クエリのユースケース1(各年度に公開された映画レビュー Top3)
- クエリのユースケース2(各年度別にユーザーに最も人気のあった映画)
- 実行した Hiveクエリに対するジョブの進行状況は、 Resource Manager UI で確認できます。 Log をクリックしてジョブログを確認します。
Spark、Hiveでデータ処理: Devアプリのユースケース
Devアプリを作成した後、Beeline、spark-submitのようなクライアントを使用して Spark、Hiveでデータ処理: Zeppelinアプリのユースケースのようなジョブを行う方法を説明します。
Devアプリには Data Forestで提供するすべてのサービスに対する開発環境が設定されているため、スクリプトを実行するための事前準備は別途必要ありません。Data Forestアプリではなく VPC VMでも開発環境を設定すると、本ガイドの手順を同様に行うことができます。
ユーザーの必要に応じて実行方法を変えて使用できます。データを用いて複数テストを行う目的の場合には Zeppelinを、運用環境で定期的に実行するジョブの場合には実行ジョブをスクリプトにするのがより適切です。
Step 1. Devアプリの作成
Devアプリを作成する方法は、次のとおりです。
- Devアプリを作成します。
- アプリの作成方法は、アプリの作成と管理を参照
- アプリタイプは DEV-1.0.0を選択します。
- アプリの詳細情報の Status が Stable になっているか確認します。
Step 2. データ処理
データを処理するために元のデータセットをアップロードします。データセットをアップロードする方法は、Spark、Hiveでデータ処理: Zeppelin > データ処理をご参照ください。
1. Sparkクライアントの実行
Devアプリで Sparkスクリプトを正常に実行するには、まず Kerberos認証が必要です。Kerberosの認証方法は、Dev を使用するガイドをご参照ください。
認証以外に別途設定は必要ありません。ただし、認証を完了していない場合は org.apache.hadoop.security.AccessControlException
が発生しますのでご注意ください。
認証が正常に完了すると以下のように確認できます。
$ klist
Ticket cache: FILE:/tmp/krb5cc_p26244
Default principal: example@KR.DF.NAVERNCP.COM
Valid starting Expires Service principal
07/14/2021 17:38:02 07/15/2021 17:38:02 krbtgt/KR.DF.BETA.NAVERNCP.COM@KR.DF.NAVERNCP.COM
renew until 07/21/2021 17:38:02
HDFSクライアントのコマンドでデータセットを確認できます。
$ hadoop fs -ls | grep .csv
-rw------- 3 example services 1493648 2021-07-12 15:25 movie.csv
-rw-r--r-- 3 example services 690353377 2021-07-09 18:11 rating.csv
spark-shellを実行できます。
spark-shellでは、SparkContext、SparkSessionが既に作成されています。 Zeppelinノートパソコンで使用したコードをそのまま実行できます。
$ spark-shell
以下のコードは Zeppelinノートブックで実行したコードと同じです。:paste
で pasteモードに入り、簡単にコードが貼り付けられます。このモードを終了するには [Ctrl] + [D] キーを押します。
import org.apache.spark.sql.types.{StructType,IntegerType}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions._
import java.sql.Timestamp
case class Movie(movieId: Int,title: String, genres: String)
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp
)
val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType]
val rating_schema = ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
import spark.implicits._
val movie_df_temp = spark.read
.option("header", "true")
.schema(movie_schema)
.csv("hdfs://koya/user/example/movie.csv")
val movie_df = movie_df_temp
.select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre"))
.withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1))
.withColumn("releasedYear", $"releasedYear".cast(IntegerType))
val rating_df_temp = spark.read
.option("header", "true")
.schema(rating_schema)
.csv("hdfs://koya/user/example/rating.csv")
val rating_df_filtered = rating_df_temp.groupBy($"movieId")
.agg(count("*").as("count")).filter($"count" > 10)
val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))
.select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month"))
val movie_rating_df = movie_df.join(rating_df, Seq("movieId"), "inner")
import org.apache.spark.sql.SaveMode
movie_rating_df.repartition($"year", $"month")
.write.partitionBy("year", "month")
.mode(SaveMode.Append)
.parquet("/user/example/movie_rating")
REPL形式ではなく、ソースコードを jarでビルドして提出(spark-submit)できます。このジョブには1個の argumentのみ必要です。データセットが位置した HDFSパスを指します。その後、結果ファイルもこのパスの下位に作成されます。
ソースコードは Githubで確認できます。
$ wget http://dist.kr.df.naverncp.com/repos/release/df-env/media/movie-rating-assembly-0.1.jar
$ spark-submit --class com.dataforest.example.MovieRatingWriter \
--master yarn --deploy-mode cluster --queue longlived \
--num-executors 20 ./movie-rating-assembly-0.1.jar \
'hdfs://koya/user/example'
ジョブを提出すると、以下のように進捗状況がコンソールに出力されて確認できます。
Spark Jobを提出すると Resource Manager UI でジョブの状態と進行ログが確認できます。
Spark History Server UI でジョブの実行履歴をより詳しく確認できます。
ジョブが完了すると、以下のように指定した HDFSパスの下位に movie_ratingディレクトリが作成され、その下位に結果 Parquetファイルが作成されたことが確認できます。
2. Hiveクライアントの実行
Data Forestで提供する HiveServer2はアカウントとパスワード認証を使用します。そのため、Kerberos認証を行った場合でも別途認証が必要です。ここでのパスワードは、Data Forestアカウントの作成時に指定したパスワードです。
Hiveクライアントを実行する方法は、次のとおりです。
Beelineクライアントを実行します。
- パスワードに特殊文字がある場合、単一引用符(' ')で囲みます。
$ beeline -u "jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n ${USERNAME} -p ${PASSWORD}
Externalデータベースを作成します。
- 見やすいように、outputの形式を!set outputformat verticalに変更しました。
- LOCATIONキーワードでパスを設定します。
CREATE DATABASE example__db_movie_rating COMMENT 'MovieLens 20M Dataset' LOCATION '/user/example/warehouse/example__db_movie_rating' WITH DBPROPERTIES ('creater'='suewoon', 'date'='2021-07-01'); DESCRIBE DATABASE example__db_movie_rating; db_name example__db_movie_rating comment MovieLens 20M Dataset location hdfs://koya/user/example/warehouse/example__db_movie_rating owner_name example owner_type USER parameters
Spark Jobで作成した Parquetファイルからテーブルを作成します。
USE example__db_movie_rating; DROP TABLE IF EXISTS movie_rating; CREATE EXTERNAL TABLE movie_rating (movieId int, title string, genre string, releasedYear int, userId int, rating double) partitioned by (year int, month int) stored as parquet location '/user/example/movie_rating'; MSCK REPAIR TABLE movie_rating;
テーブルの内容を確認します。
SHOW TABLES; tab_name movie_rating SHOW CREATE TABLE movie_rating; createtab_stmt CREATE EXTERNAL TABLE `movie_rating`( createtab_stmt `movieid` int, createtab_stmt `title` string, createtab_stmt `genre` string, createtab_stmt `releasedyear` int, createtab_stmt `userid` int, createtab_stmt `rating` double) createtab_stmt PARTITIONED BY ( createtab_stmt `year` int, createtab_stmt `month` int) createtab_stmt ROW FORMAT SERDE createtab_stmt 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' createtab_stmt STORED AS INPUTFORMAT createtab_stmt 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' createtab_stmt OUTPUTFORMAT createtab_stmt 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' createtab_stmt LOCATION createtab_stmt 'hdfs://koya/user/example/movie_rating' createtab_stmt TBLPROPERTIES ( createtab_stmt 'bucketing_version'='2', createtab_stmt 'transient_lastDdlTime'='1626253592') SELECT * FROM movie_rating limit 1; movie_rating.movieid 1176 movie_rating.title Double Life of Veronique, The (Double Vie de Vronique, La) (1991) movie_rating.genre Romance movie_rating.releasedyear 1991 movie_rating.userid 28507 movie_rating.rating 4.0 movie_rating.year 1995 movie_rating.month 1
作成したテーブルを対象にクエリを実行します。
- 特定年度に公開した映画の中で評点 Top3を照会するクエリです。
SELECT releasedYear, title, avgRating, row_num FROM (SELECT releasedYear, title, avg(rating) as avgRating , rank() over (partition by releasedYear order by avg(rating) desc) as row_num FROM movie_rating GROUP BY releasedYear, title ) T WHERE row_num <= 3 ORDER BY releasedYear DESC, row_num LIMIT 6; releasedyear 2015 title Louis C.K.: Live at The Comedy Store (2015) avgrating 3.8 row_num 1 releasedyear 2015 title Kingsman: The Secret Service (2015) avgrating 3.6320754716981134 row_num 2 releasedyear 2015 title The Second Best Exotic Marigold Hotel (2015) avgrating 3.5714285714285716 row_num 3 releasedyear 2014 title Zero Motivation (Efes beyahasei enosh) (2014) avgrating 4.5 row_num 1 releasedyear 2014 title Whiplash (2014) avgrating 4.074750830564784 row_num 2 releasedyear 2014 title Interstellar (2014) avgrating 4.023864289821737 row_num 3
- 年度別にユーザーに最も人気があった映画(評価が高かった映画)を検索するクエリです。
SELECT year, title, popularity, row_num FROM (SELECT year, title, count(distinct userId) as popularity, rank() over (partition by year order by count(distinct userId) desc) as row_num FROM movie_rating GROUP BY year, title) T WHERE row_num = 1 ORDER BY year desc, popularity desc LIMIT 5; year 2015 title Matrix, The (1999) popularity 1207 row_num 1 year 2014 title Shawshank Redemption, The (1994) popularity 2405 row_num 1 year 2013 title Shawshank Redemption, The (1994) popularity 2549 row_num 1 year 2012 title Inception (2010) popularity 2411 row_num 1 year 2011 title Inception (2010) popularity 3235 row_num 1
- 特定年度に公開した映画の中で評点 Top3を照会するクエリです。