- 인쇄
- PDF
Spark, Hive로 데이터 처리
- 인쇄
- PDF
VPC 환경에서 이용 가능합니다.
Data Forest에서 제공하는 Apache Hadoop, Apache Spark, Apache Hive를 이용하여 데이터를 처리하고 저장하는 방법을 설명합니다.
Spark, Hive로 데이터 처리: Zeppelin 앱 사용 예제
Zeppelin은 사용자가 API 코드를 쉽게 작성하고 제출할 수 있는 환경을 제공합니다. 다양한 인터프리터가 있지만 이 가이드에서는 사용 빈도가 높은 Spark와 Hive를 사용하는 방법을 예제로 설명합니다. 이 예제를 응용하면 Data Forest 플랫폼 위에서 스트리밍 데이터 소스에 Spark Streaming을 사용하여, HDFS에 데이터를 저장하고 주기적으로 Hive 쿼리 배치를 수행하는 작업을 만들 수 있습니다.
- 이 가이드에서 사용하는 MovieLens 데이터 셋은 MovieLens 20M Dataset을 참조해 주십시오.
- Data Forest > App에서 생성한 Zeppelin 노트북에서 진행하는 것을 기준으로 설명합니다.
Step 1. Zeppelin 앱 생성
Zeppelin 앱을 생성하는 방법은 다음과 같습니다.
- Zeppelin 앱을 생성해 주십시오.
- 앱 생성 방법은 앱 생성 및 관리 참조
- 앱 생성 방법은 앱 생성 및 관리 참조
- 앱 타입은 ZEPPELIN-0.10.1을 선택해 주십시오.
- 앱 상세 정보의 Status가 Stable이 되었는지 확인해 주십시오.
Step 2. 데이터 처리
데이터 처리를 위해 원본 데이터 셋을 업로드해야 합니다. 실제 서비스에서는 Kafka, Flume 등을 통해서 원본 로그 데이터 등을 HDFS에 저장하거나 RDBMS로부터 Sqoop을 사용하여 원본 데이터를 HDFS에 보관할 수 있습니다.
MovieLens 데이터셋의 .csv 파일을 HDFS에 직접 업로드하는 방법은 다음과 같습니다.
- HDFS NameNode UI에 접속해 Utilities > Browse the file system 메뉴를 클릭해 주십시오.
/user/${USER}
디렉터리로 이동해 주십시오.
- [Upload] 아이콘을 클릭해 주십시오.
- rating.csv , movie.csv 파일을 업로드해 주십시오.
파일을 클릭한 뒤 Head the file을 클릭하면 데이터가 어떻게 생겼는지 확인할 수 있습니다.
1. Zeppelin 앱 접속
Zeppelin 앱에 접속하는 방법은 다음과 같습니다.
Zeppelin에 접속해 주십시오.
- 접속 URL은 Zeppelin 앱의 Quick links > zeppelin 참조
- Zeppelin 앱 사용에 대한 자세한 내용은 Zeppelin 사용 참조
우측 상단의 [login] 을 클릭한 후, 계정 생성 시 설정한 계정 이름과 패스워드를 사용하여 로그인해 주십시오.
2. Spark Job 수행
- [Notebook] > Create new note를 클릭하여 새로운 노트북을 생성해 주십시오.
- Default Interpreter는 Spark로 지정하고 Spark 코드에는
%spark
를 붙여서 인터프리터를 실행시켜 주십시오. - Spark로 두 개의 .csv 파일로부터 데이터 프레임을 생성하고, 두 개의 데이터 프레임을 조인하여 Parquet 파일로 HDFS에 저장해 주십시오.
- movie.csv, rating.csv를 그대로 쓰지 않고, 새로운 컬럼을 만들고 변경하는 등 기본적인 transformation 작업을 수행해 주십시오.
- movie.csv로 부터 dataframe 생성
%spark
import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection
case class Movie(movieId: Int,title: String, genres: String)
val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType] // Create a schema from case class
val movie_df_temp = spark.read
.option("header", "true")
.schema(movie_schema)
.csv("hdfs://koya/user/example/movie.csv") // This is where we put our sample data. You should change the path.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
val movie_df = movie_df_temp
.select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) // Explode the column genre as it contains multiple values
.withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1)) // Extract released date (year) from title column
.withColumn("releasedYear", $"releasedYear".cast(IntegerType)) // Cast string type to integer
movie_df.createOrReplaceTempView("movie") // Create dataframe temp view
spark.sql("select * from movie limit 3").show()"
- rating.csv로 부터 dataframe 생성
%spark
import spark.implicits._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection
import java.sql.Timestamp
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
val rating_schema = ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
val rating_df_temp = spark.read
.option("header", "true")
.schema(rating_schema)
.csv("hdfs://koya/user/example/rating.csv")
import org.apache.spark.sql.functions._
val rating_df_filtered = rating_df_temp.groupBy($"movieId")
.agg(count("*").as("count")).filter($"count" > 10) // Filter out records where only few people rate the movie.
val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))
.select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month")) // We will use year, month as a partiion key later
rating_df.createOrReplaceTempView("rating")
spark.sql("select * from rating limit 3").show()
Default Interpreter로 사용하고 있는 Spark가 버전 3.0부터 Self join으로 인해 발생하는 모호한 열 참조가 포함된 경우에 쿼리 실패가 발생합니다. 따라서 위 과정을 수행하기 전에 반드시 아래와 같이 Zeppelin > Interpreter에서 spark를 검색하고 [edit] 버튼을 눌러 spark.sql.analyzer.failAmbiguousSelfJoin 값을 false로 설정해 주십시오.
- 위에서 등록한 view에 대해서 spark-sql을 실행해 주십시오.
아래 쿼리는 예시입니다.- 생성한 view에 spark-sql 실행
- 생성한 view에 spark-sql 실행
- 두 개의 데이터 프레임으로부터 새로운 데이터 프레임을 생성해 주십시오.
- 두 개의 dataframe을 join해서 새로운 dataframe 생성
- 두 개의 dataframe을 join해서 새로운 dataframe 생성
- 만들어진 최종 데이터 프레임을 Parquet 형식으로 저장해 주십시오.
- year, month을 파티션으로 지정
- year, month을 파티션으로 지정
- 쓰기 작업이 완료되었는지 확인해 주십시오.
- HDFS 해당 경로에 아래처럼 파티션이 생성되고 데이터가 작성된 것을 확인할 수 있습니다.
- 진행이 끝난 작업은 Spark History Server에서 확인할 수 있습니다.
- 진행 중인 작업을 확인하려면 좌측 하단의 [Show Incomplete Applications] 버튼을 클릭해 주십시오.
3. Hive 쿼리 수행
JDBC 인터프리터를 사용하기 전에 비밀번호 설정이 필요합니다. Data Forest에서 제공하는 HiveServer2는 계정명과 비밀번호로 인증을 해야 접근할 수 있는데, 기본 인터프리터에는 비밀번호가 설정되어 있지 않으므로 사용자가 직접 추가해야 합니다.
비밀번호를 설정하지 않고 쿼리를 수행하면 org.apache.zeppelin.interpreter.InterpreterException: Error in doAs
가 발생하므로 주의해 주십시오.
JDBC 인터프리터의 비밀번호를 설정하는 방법은 다음과 같습니다.
- Zeppelin 화면 우측 상단의 계정명 > Interpreter를 클릭해 주십시오.
- JDBC interpreter를 검색해 주십시오.
- [edit] 버튼을 클릭해 주십시오.
- 다음과 같이 Properties에 hive.user, hive.password 항목을 추가해 주십시오.
- Data Forest 계정 생성 시 설정한 패스워드입니다.
- Data Forest 계정 생성 시 설정한 패스워드입니다.
- [Save] 버튼을 클릭해 주십시오.
- 변경사항이 저장된 인터프리터가 재시작됩니다.
4. Hive 테이블 생성 및 쿼리 실행
Spark Job에서 쓴 Parquet 파일로부터 Hive 테이블을 만들고, 쿼리를 실행해 보겠습니다. 해당 Zeppelin 노트북의 전체 내용은 여기에서 다운로드하여 Import 할 수 있습니다.
Zeppelin에서 Hive 쿼리를 실행하기 전에 반드시 Zeppelin에서 Hive 사용 가이드 내용에 맞게 설정되어 있는지 확인해 주십시오.
- 새로운 노트북을 생성하기 위해 [Notebook] > Create new note를 클릭해 주십시오.
- Default Interpreter는 JDBC로 지정하고 Hive 쿼리에는
%jdbc(hive)
를 붙여서 인터프리터를 실행시켜 주십시오.
Database 확인
주의- External 데이터베이스 생성 시 LOCATION 키워드로 경로를 항상 지정해 주십시오. Data Forest 사용자는 Hive의 기본값 데이터베이스에 접근할 수 없기 때문에, 항상 자신의 HDFS 홈 디렉터리 ( /user/${USER} ) 아래 경로를 사용해야 합니다. 사전 작업으로 데이터베이스의 생성 경로에 필요한 디렉토리가 미리 생성되어 있어야 합니다. (아래 예제에서는 warehouse폴더가 미리 생성되어 있어야 합니다.)
- 데이터베이스와 달리 테이블 생성 시 LOCATION 키워드는 옵션입니다.
- 데이터베이스 생성 시 반드시
${USER}__db_${ANY_NAME_YOU_WANT}
형식으로 이름을 설정해 주십시오. 그렇지 않을 경우 오류가 발생합니다. - External Database 생성
- 기존에 있던 파일로 테이블을 생성할 것이므로 LOCATION 뒤에 경로로 Parquet 파일이 저장된 디렉터리를 설정해 주십시오.
- Parquet 파일로부터 테이블 생성
- Parquet 파일로부터 테이블 생성
- MSCK REPAIR TABLE ${TABLENAME} 으로 파티션 메타데이터를 추가해 주십시오.
- Hive 테이블을 사용할 준비가 완료됩니다.
- 파티션 추가 및 메타데이터 업데이트
- 사용자가 원하는 대로 쿼리를 수행해 주십시오.
- 아래 쿼리는 예시입니다.
- 쿼리 예제 1(각 연도에 개봉한 영화 평점 Top3)
- 쿼리 예제 2(각 연도 별로 사용자에게 가장 인기 많았던 영화)
- 수행한 Hive 쿼리에 대한 작업 진행사항은 Resource Manager UI에서 확인할 수 있습니다. [Log] 버튼을 클릭해서 작업 로그를 확인해 주십시오.
Spark, Hive로 데이터 처리: Dev 앱 사용 예제
Dev 앱을 생성한 후 Beeline, spark-submit과 같은 클라이언트를 사용하여, Spark, Hive로 데이터 처리: Zeppelin 앱 사용 예제와 같은 작업을 수행하는 방법을 설명합니다.
Dev 앱에는 Data Forest에서 제공하는 모든 서비스에 대한 개발 환경이 구성되어 있으므로 스크립트를 실행하기 위한 사전 준비가 따로 필요하지 않습니다. Data Forest 앱이 아닌 VPC VM에서도 개발 환경을 구성하면 이 가이드의 과정을 동일하게 진행할 수 있습니다.
사용자 필요에 따라 실행 방법을 다르게 해서 사용할 수 있습니다. 데이터를 가지고 여러 테스트를 해볼 목적이라면 Zeppelin을, 운영환경에 주기적으로 실행할 작업이라면 실행 작업을 스크립트로 만드는 것이 더 적합합니다.
Step 1. Dev 앱 생성
Dev 앱을 생성하는 방법은 다음과 같습니다.
- Dev 앱을 생성해 주십시오.
- 앱 생성 방법은 앱 생성 및 관리 참조
- 앱 타입은 DEV-1.0.0을 선택해 주십시오.
- 앱 상세 정보의 Status가 Stable이 되었는지 확인해 주십시오.
Step 2. 데이터 처리
데이터 처리를 위해 원본 데이터 셋을 업로드해야 합니다. 데이터 셋을 업로드하는 방법은 Spark, Hive로 데이터 처리하기: Zeppelin > 데이터 처리를 참조해 주십시오.
1. Spark 클라이언트 실행
Dev 앱에서 Spark 스크립트를 정상적으로 수행하려면 먼저 Kerberos 인증이 필요합니다. Kerberos 인증 방법은 Dev 사용 가이드를 참조해 주십시오.
인증 이외에 별도 설정은 필요하지 않습니다. 하지만 인증을 완료하지 않을 경우 org.apache.hadoop.security.AccessControlException
이 발생하므로 주의해 주십시오.
인증이 정상적으로 완료되면 다음과 같이 확인할 수 있습니다.
$ klist
Ticket cache: FILE:/tmp/krb5cc_p26244
Default principal: example@KR.DF.NAVERNCP.COM
Valid starting Expires Service principal
07/14/2021 17:38:02 07/15/2021 17:38:02 krbtgt/KR.DF.BETA.NAVERNCP.COM@KR.DF.NAVERNCP.COM
renew until 07/21/2021 17:38:02
HDFS 클라이언트 명령어로 데이터 셋을 확인할 수 있습니다.
$ hadoop fs -ls | grep .csv
-rw------- 3 example services 1493648 2021-07-12 15:25 movie.csv
-rw-r--r-- 3 example services 690353377 2021-07-09 18:11 rating.csv
spark-shell 을 실행할 수 있습니다.
spark-shell 에서는 SparkContext , SparkSession 이미 생성되어 있습니다. Zeppelin 노트북에서 사용했던 코드를 그대로 실행할 수 있습니다.
$ spark-shell
아래 코드는 Zeppelin 노트북에서 실행한 것과 동일합니다. :paste
로 paste 모드에 진입하여 코드를 쉽게 붙여넣기할 수 있습니다. 해당 모드에서 빠져 나오려면 [Ctrl]+[D] 키를 눌러 주십시오.
import org.apache.spark.sql.types.{StructType,IntegerType}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions._
import java.sql.Timestamp
case class Movie(movieId: Int,title: String, genres: String)
case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp
)
val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType]
val rating_schema = ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
import spark.implicits._
val movie_df_temp = spark.read
.option("header", "true")
.schema(movie_schema)
.csv("hdfs://koya/user/example/movie.csv")
val movie_df = movie_df_temp
.select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre"))
.withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1))
.withColumn("releasedYear", $"releasedYear".cast(IntegerType))
val rating_df_temp = spark.read
.option("header", "true")
.schema(rating_schema)
.csv("hdfs://koya/user/example/rating.csv")
val rating_df_filtered = rating_df_temp.groupBy($"movieId")
.agg(count("*").as("count")).filter($"count" > 10)
val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))
.select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month"))
val movie_rating_df = movie_df.join(rating_df, Seq("movieId"), "inner")
import org.apache.spark.sql.SaveMode
movie_rating_df.repartition($"year", $"month")
.write.partitionBy("year", "month")
.mode(SaveMode.Append)
.parquet("/user/example/movie_rating")
REPL 형태가 아니라, 소스코드를 jar로 빌드하여 제출(spark-submit)할 수 있습니다. 해당 작업은 1개의 argument만 필요합니다. 데이터 셋이 위치한 HDFS 경로를 지정하면 됩니다. 이후에 결과 파일도 해당 경로 아래 생성됩니다.
소스 코드는 Github에서 확인할 수 있습니다.
$ wget http://dist.kr.df.naverncp.com/repos/release/df-env/media/movie-rating-assembly-0.1.jar
$ spark-submit --class com.dataforest.example.MovieRatingWriter \
--master yarn --deploy-mode cluster --queue longlived \
--num-executors 20 ./movie-rating-assembly-0.1.jar \
'hdfs://koya/user/example'
작업을 제출하면 아래와 같이 진행 상황이 콘솔에 출력되는 것을 확인할 수 있습니다.
Spark Job을 제출한 후에 Resource Manager UI에서 작업 상태와 진행 로그를 확인할 수 있습니다.
Spark History Server UI에서 작업의 실행 내역을 더 상세하게 확인할 수 있습니다.
작업이 완료되면 아래처럼 지정한 HDFS 경로 아래 movie_rating 디렉터리가 생기고, 그 아래로 결과 Parquet 파일이 생성되었음을 확인할 수 있습니다.
2. Hive 클라이언트 실행
Data Forest에서 제공하는 HiveServer2는 계정 및 비밀번호 인증을 사용합니다. 따라서 Kerberos 인증을 수행했더라도 별도 인증이 필요합니다. 여기서 비밀번호는 Data Forest 계정 생성 시 지정한 패스워드입니다.
Hive 클라이언트를 실행하는 방법은 다음과 같습니다.
Beeline 클라이언트를 실행해 주십시오.
- 비밀번호에 특수문자가 있을 경우 작은따옴표(' ')로 감싸줘야 합니다.
$ beeline -u "jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n ${USERNAME} -p ${PASSWORD}
External 데이터베이스를 생성해 주십시오.
- 보기 쉽도록 output 형식을 !set outputformat vertical로 변경했습니다.
- LOCATION 키워드로 경로 설정해 주십시오.
CREATE DATABASE example__db_movie_rating COMMENT 'MovieLens 20M Dataset' LOCATION '/user/example/warehouse/example__db_movie_rating' WITH DBPROPERTIES ('creater'='suewoon', 'date'='2021-07-01'); DESCRIBE DATABASE example__db_movie_rating; db_name example__db_movie_rating comment MovieLens 20M Dataset location hdfs://koya/user/example/warehouse/example__db_movie_rating owner_name example owner_type USER parameters
Spark Job에서 생성한 Parquet 파일로부터 테이블을 생성해 주십시오.
USE example__db_movie_rating; DROP TABLE IF EXISTS movie_rating; CREATE EXTERNAL TABLE movie_rating (movieId int, title string, genre string, releasedYear int, userId int, rating double) partitioned by (year int, month int) stored as parquet location '/user/example/movie_rating'; MSCK REPAIR TABLE movie_rating;
테이블 내용을 확인해 주십시오.
SHOW TABLES; tab_name movie_rating SHOW CREATE TABLE movie_rating; createtab_stmt CREATE EXTERNAL TABLE `movie_rating`( createtab_stmt `movieid` int, createtab_stmt `title` string, createtab_stmt `genre` string, createtab_stmt `releasedyear` int, createtab_stmt `userid` int, createtab_stmt `rating` double) createtab_stmt PARTITIONED BY ( createtab_stmt `year` int, createtab_stmt `month` int) createtab_stmt ROW FORMAT SERDE createtab_stmt 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' createtab_stmt STORED AS INPUTFORMAT createtab_stmt 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' createtab_stmt OUTPUTFORMAT createtab_stmt 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' createtab_stmt LOCATION createtab_stmt 'hdfs://koya/user/example/movie_rating' createtab_stmt TBLPROPERTIES ( createtab_stmt 'bucketing_version'='2', createtab_stmt 'transient_lastDdlTime'='1626253592') SELECT * FROM movie_rating limit 1; movie_rating.movieid 1176 movie_rating.title Double Life of Veronique, The (Double Vie de Veronique, La) (1991) movie_rating.genre Romance movie_rating.releasedyear 1991 movie_rating.userid 28507 movie_rating.rating 4.0 movie_rating.year 1995 movie_rating.month 1
생성한 테이블을 대상으로 쿼리를 수행해 주십시오.
- 특정 연도에 개봉한 영화 중에서 평점 Top3 영화를 조회하는 쿼리입니다.
SELECT releasedYear, title, avgRating, row_num FROM (SELECT releasedYear, title, avg(rating) as avgRating , rank() over (partition by releasedYear order by avg(rating) desc) as row_num FROM movie_rating GROUP BY releasedYear, title ) T WHERE row_num <= 3 ORDER BY releasedYear DESC, row_num LIMIT 6; releasedyear 2015 title Louis C.K.: Live at The Comedy Store (2015) avgrating 3.8 row_num 1 releasedyear 2015 title Kingsman: The Secret Service (2015) avgrating 3.6320754716981134 row_num 2 releasedyear 2015 title The Second Best Exotic Marigold Hotel (2015) avgrating 3.5714285714285716 row_num 3 releasedyear 2014 title Zero Motivation (Efes beyahasei enosh) (2014) avgrating 4.5 row_num 1 releasedyear 2014 title Whiplash (2014) avgrating 4.074750830564784 row_num 2 releasedyear 2014 title Interstellar (2014) avgrating 4.023864289821737 row_num 3
- 연도별로 사용자에게 가장 인기 많았던 영화(평점 수가 많았던 영화)를 찾아보는 쿼리입니다.
SELECT year, title, popularity, row_num FROM (SELECT year, title, count(distinct userId) as popularity, rank() over (partition by year order by count(distinct userId) desc) as row_num FROM movie_rating GROUP BY year, title) T WHERE row_num = 1 ORDER BY year desc, popularity desc LIMIT 5; year 2015 title Matrix, The (1999) popularity 1207 row_num 1 year 2014 title Shawshank Redemption, The (1994) popularity 2405 row_num 1 year 2013 title Shawshank Redemption, The (1994) popularity 2549 row_num 1 year 2012 title Inception (2010) popularity 2411 row_num 1 year 2011 title Inception (2010) popularity 3235 row_num 1
- 특정 연도에 개봉한 영화 중에서 평점 Top3 영화를 조회하는 쿼리입니다.