Spark, Hive로 데이터 처리
    • PDF

    Spark, Hive로 데이터 처리

    • PDF

    Article Summary

    VPC 환경에서 이용 가능합니다.

    Data Forest에서 제공하는 Apache Hadoop, Apache Spark, Apache Hive를 이용하여 데이터를 처리하고 저장하는 방법을 설명합니다.

    Spark, Hive로 데이터 처리: Zeppelin 앱 사용 예제

    Zeppelin은 사용자가 API 코드를 쉽게 작성하고 제출할 수 있는 환경을 제공합니다. 다양한 인터프리터가 있지만 이 가이드에서는 사용 빈도가 높은 Spark와 Hive를 사용하는 방법을 예제로 설명합니다. 이 예제를 응용하면 Data Forest 플랫폼 위에서 스트리밍 데이터 소스에 Spark Streaming을 사용하여, HDFS에 데이터를 저장하고 주기적으로 Hive 쿼리 배치를 수행하는 작업을 만들 수 있습니다.

    참고
    • 이 가이드에서 사용하는 MovieLens 데이터 셋은 MovieLens 20M Dataset을 참조해 주십시오.
    • Data Forest > App에서 생성한 Zeppelin 노트북에서 진행하는 것을 기준으로 설명합니다.

    Step 1. Zeppelin 앱 생성

    Zeppelin 앱을 생성하는 방법은 다음과 같습니다.

    1. Zeppelin 앱을 생성해 주십시오.
    2. 앱 타입은 ZEPPELIN-0.10.1을 선택해 주십시오.
      df-use-ex2_2-2_ko
    3. 앱 상세 정보의 StatusStable이 되었는지 확인해 주십시오.

    Step 2. 데이터 처리

    데이터 처리를 위해 원본 데이터 셋을 업로드해야 합니다. 실제 서비스에서는 Kafka, Flume 등을 통해서 원본 로그 데이터 등을 HDFS에 저장하거나 RDBMS로부터 Sqoop을 사용하여 원본 데이터를 HDFS에 보관할 수 있습니다.
    MovieLens 데이터셋의 .csv 파일을 HDFS에 직접 업로드하는 방법은 다음과 같습니다.

    1. HDFS NameNode UI에 접속해 Utilities > Browse the file system 메뉴를 클릭해 주십시오.
    2. /user/${USER}디렉터리로 이동해 주십시오.
      df-use-ex2_1-3_ko
    3. [Upload] 아이콘을 클릭해 주십시오.
    4. rating.csv , movie.csv 파일을 업로드해 주십시오.
      df-use-ex2_1-4_ko
    참고

    파일을 클릭한 뒤 Head the file을 클릭하면 데이터가 어떻게 생겼는지 확인할 수 있습니다.
    df-use-ex2_1-5_ko

    1. Zeppelin 앱 접속

    Zeppelin 앱에 접속하는 방법은 다음과 같습니다.

    1. Zeppelin에 접속해 주십시오.

      • 접속 URL은 Zeppelin 앱의 Quick links > zeppelin 참조
      • Zeppelin 앱 사용에 대한 자세한 내용은 Zeppelin 사용 참조
        df-use-ex2_3-6_ko
    2. 우측 상단의 [login] 을 클릭한 후, 계정 생성 시 설정한 계정 이름과 패스워드를 사용하여 로그인해 주십시오.
      df-use-ex2_1-7_ko

      df-use-ex2_1-8_ko

    2. Spark Job 수행

    1. [Notebook] > Create new note를 클릭하여 새로운 노트북을 생성해 주십시오.
      df-use-ex2_note_ko
    2. Default Interpreter는 Spark로 지정하고 Spark 코드에는 %spark를 붙여서 인터프리터를 실행시켜 주십시오.
    3. Spark로 두 개의 .csv 파일로부터 데이터 프레임을 생성하고, 두 개의 데이터 프레임을 조인하여 Parquet 파일로 HDFS에 저장해 주십시오.
    4. movie.csv , rating.csv를 그대로 쓰지 않고, 새로운 컬럼을 만들고 변경하는 등 기본적인 transformation 작업을 수행해 주십시오.
    • movie.csv로 부터 dataframe 생성
      df-use-ex2_1-9_ko
    %spark
    import spark.implicits._
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.catalyst.ScalaReflection
    
    case class Movie(movieId: Int,title: String, genres: String)
    val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType] // Create a schema from case class
    
    val movie_df_temp = spark.read
            .option("header", "true")
            .schema(movie_schema)
            .csv("hdfs://koya/user/example/movie.csv") // This is where we put our sample data. You should change the path. 
    
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.IntegerType
    
    val movie_df = movie_df_temp
                    .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) // Explode the column genre as it contains multiple values
                    .withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1))  // Extract released date (year) from title column
                    .withColumn("releasedYear", $"releasedYear".cast(IntegerType)) // Cast string type to integer
    
    movie_df.createOrReplaceTempView("movie") // Create dataframe temp view 
    spark.sql("select * from movie limit 3").show()"
    
    • rating.csv로 부터 dataframe 생성
      df-use-ex2_1-10_ko
    %spark
    
    import spark.implicits._
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.catalyst.ScalaReflection
    import java.sql.Timestamp
    
    case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp)
    val rating_schema =  ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
    val rating_df_temp = spark.read
                    .option("header", "true")
                    .schema(rating_schema)
                    .csv("hdfs://koya/user/example/rating.csv") 
                           
    import org.apache.spark.sql.functions._
    
    val rating_df_filtered = rating_df_temp.groupBy($"movieId")
        .agg(count("*").as("count")).filter($"count" > 10) // Filter out records where only few people rate the movie. 
    val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))  
            .select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month")) // We will use year, month as a partiion key later 
    
    rating_df.createOrReplaceTempView("rating")
    spark.sql("select * from rating limit 3").show()
    
    주의

    Default Interpreter로 사용하고 있는 Spark가 버전 3.0부터 Self join으로 인해 발생하는 모호한 열 참조가 포함된 경우에 쿼리 실패가 발생합니다. 따라서 위 과정을 수행하기 전에 반드시 아래와 같이 Zeppelin > Interpreter에서 spark를 검색하고 [edit] 버튼을 눌러 spark.sql.analyzer.failAmbiguousSelfJoin 값을 false로 설정해 주십시오.
    df-use-ex2_1-28

    df-use-ex2_1-29

    1. 위에서 등록한 view에 대해서 spark-sql을 실행해 주십시오.
      아래 쿼리는 예시입니다.
      • 생성한 view에 spark-sql 실행
        df-use-ex2_1-11_ko
    2. 두 개의 데이터 프레임으로부터 새로운 데이터 프레임을 생성해 주십시오.
      • 두 개의 dataframe을 join해서 새로운 dataframe 생성
        df-use-ex2_1-12_ko
    3. 만들어진 최종 데이터 프레임을 Parquet 형식으로 저장해 주십시오.
      • year, month을 파티션으로 지정
        df-use-ex2_1-13_ko
    4. 쓰기 작업이 완료되었는지 확인해 주십시오.
      • HDFS 해당 경로에 아래처럼 파티션이 생성되고 데이터가 작성된 것을 확인할 수 있습니다.
      • 진행이 끝난 작업은 Spark History Server에서 확인할 수 있습니다.
        df-use-ex2_1-14_ko
    5. 진행 중인 작업을 확인하려면 좌측 하단의 [Show Incomplete Applications] 버튼을 클릭해 주십시오.
      df-use-ex2_1-15_ko

    3. Hive 쿼리 수행

    JDBC 인터프리터를 사용하기 전에 비밀번호 설정이 필요합니다. Data Forest에서 제공하는 HiveServer2는 계정명과 비밀번호로 인증을 해야 접근할 수 있는데, 기본 인터프리터에는 비밀번호가 설정되어 있지 않으므로 사용자가 직접 추가해야 합니다.

    주의

    비밀번호를 설정하지 않고 쿼리를 수행하면 org.apache.zeppelin.interpreter.InterpreterException: Error in doAs가 발생하므로 주의해 주십시오.

    JDBC 인터프리터의 비밀번호를 설정하는 방법은 다음과 같습니다.

    1. Zeppelin 화면 우측 상단의 계정명 > Interpreter를 클릭해 주십시오.
      df-hive_12_vpc_ko
    2. JDBC interpreter를 검색해 주십시오.
      df-quick-start_zeppelin03_ko
    3. [edit] 버튼을 클릭해 주십시오.
      df-quick-start_zeppelin04_ko
    4. 다음과 같이 Properties에 hive.user, hive.password 항목을 추가해 주십시오.
      • Data Forest 계정 생성 시 설정한 패스워드입니다.
        df-quick-start_zeppelin05-1
    5. [Save] 버튼을 클릭해 주십시오.
      • 변경사항이 저장된 인터프리터가 재시작됩니다.

    4. Hive 테이블 생성 및 쿼리 실행

    Spark Job에서 쓴 Parquet 파일로부터 Hive 테이블을 만들고, 쿼리를 실행해 보겠습니다. 해당 Zeppelin 노트북의 전체 내용은 여기에서 다운로드하여 Import 할 수 있습니다.

    주의

    Zeppelin에서 Hive 쿼리를 실행하기 전에 반드시 Zeppelin에서 Hive 사용 가이드 내용에 맞게 설정되어 있는지 확인해 주십시오.

    1. 새로운 노트북을 생성하기 위해 [Notebook] > Create new note를 클릭해 주십시오.
    2. Default Interpreter는 JDBC로 지정하고 Hive 쿼리에는 %jdbc(hive)를 붙여서 인터프리터를 실행시켜 주십시오.
    • Database 확인
      df-use-ex2_1-17_ko

      주의
      • External 데이터베이스 생성 시 LOCATION 키워드로 경로를 항상 지정해 주십시오. Data Forest 사용자는 Hive의 기본값 데이터베이스에 접근할 수 없기 때문에, 항상 자신의 HDFS 홈 디렉터리 ( /user/${USER} ) 아래 경로를 사용해야 합니다. 사전 작업으로 데이터베이스의 생성 경로에 필요한 디렉토리가 미리 생성되어 있어야 합니다. (아래 예제에서는 warehouse폴더가 미리 생성되어 있어야 합니다.)
      • 데이터베이스와 달리 테이블 생성 시 LOCATION 키워드는 옵션입니다.
      • 데이터베이스 생성 시 반드시 ${USER}__db_${ANY_NAME_YOU_WANT} 형식으로 이름을 설정해 주십시오. 그렇지 않을 경우 오류가 발생합니다.
      • External Database 생성
        df-use-ex2_1-18_ko
    1. 기존에 있던 파일로 테이블을 생성할 것이므로 LOCATION 뒤에 경로로 Parquet 파일이 저장된 디렉터리를 설정해 주십시오.
      • Parquet 파일로부터 테이블 생성
        df-use-ex2_1-19_ko
    2. MSCK REPAIR TABLE ${TABLENAME} 으로 파티션 메타데이터를 추가해 주십시오.
      • Hive 테이블을 사용할 준비가 완료됩니다.
      • 파티션 추가 및 메타데이터 업데이트
        df-use-ex2_1-20_ko
    3. 사용자가 원하는 대로 쿼리를 수행해 주십시오.
      • 아래 쿼리는 예시입니다.
      • 쿼리 예제 1(각 연도에 개봉한 영화 평점 Top3)
        df-use-ex2_1-21_ko
      • 쿼리 예제 2(각 연도 별로 사용자에게 가장 인기 많았던 영화)
        df-use-ex2_1-22_ko
    4. 수행한 Hive 쿼리에 대한 작업 진행사항은 Resource Manager UI에서 확인할 수 있습니다. [Log] 버튼을 클릭해서 작업 로그를 확인해 주십시오.
      df-use-ex2_1-23_ko

    Spark, Hive로 데이터 처리: Dev 앱 사용 예제

    Dev 앱을 생성한 후 Beeline, spark-submit과 같은 클라이언트를 사용하여, Spark, Hive로 데이터 처리: Zeppelin 앱 사용 예제와 같은 작업을 수행하는 방법을 설명합니다.
    Dev 앱에는 Data Forest에서 제공하는 모든 서비스에 대한 개발 환경이 구성되어 있으므로 스크립트를 실행하기 위한 사전 준비가 따로 필요하지 않습니다. Data Forest 앱이 아닌 VPC VM에서도 개발 환경을 구성하면 이 가이드의 과정을 동일하게 진행할 수 있습니다.

    참고

    사용자 필요에 따라 실행 방법을 다르게 해서 사용할 수 있습니다. 데이터를 가지고 여러 테스트를 해볼 목적이라면 Zeppelin을, 운영환경에 주기적으로 실행할 작업이라면 실행 작업을 스크립트로 만드는 것이 더 적합합니다.

    Step 1. Dev 앱 생성

    Dev 앱을 생성하는 방법은 다음과 같습니다.

    1. Dev 앱을 생성해 주십시오.
    2. 앱 타입은 DEV-1.0.0을 선택해 주십시오.
      df-use-dev-ex2_2-2_ko
    3. 앱 상세 정보의 StatusStable이 되었는지 확인해 주십시오.

    Step 2. 데이터 처리

    데이터 처리를 위해 원본 데이터 셋을 업로드해야 합니다. 데이터 셋을 업로드하는 방법은 Spark, Hive로 데이터 처리하기: Zeppelin > 데이터 처리를 참조해 주십시오.

    1. Spark 클라이언트 실행

    Dev 앱에서 Spark 스크립트를 정상적으로 수행하려면 먼저 Kerberos 인증이 필요합니다. Kerberos 인증 방법은 Dev 사용 가이드를 참조해 주십시오.

    주의

    인증 이외에 별도 설정은 필요하지 않습니다. 하지만 인증을 완료하지 않을 경우 org.apache.hadoop.security.AccessControlException이 발생하므로 주의해 주십시오.

    인증이 정상적으로 완료되면 다음과 같이 확인할 수 있습니다.

    $ klist 
    Ticket cache: FILE:/tmp/krb5cc_p26244
    Default principal: example@KR.DF.NAVERNCP.COM
    Valid starting       Expires              Service principal
    07/14/2021 17:38:02  07/15/2021 17:38:02  krbtgt/KR.DF.BETA.NAVERNCP.COM@KR.DF.NAVERNCP.COM
            renew until 07/21/2021 17:38:02
    

    df-use-ex3_1-3_vpc_ko

    HDFS 클라이언트 명령어로 데이터 셋을 확인할 수 있습니다.

    $ hadoop fs -ls | grep .csv
    -rw-------   3 example services    1493648 2021-07-12 15:25 movie.csv
    -rw-r--r--   3 example services  690353377 2021-07-09 18:11 rating.csv
    

    spark-shell 을 실행할 수 있습니다.
    spark-shell 에서는 SparkContext , SparkSession 이미 생성되어 있습니다. Zeppelin 노트북에서 사용했던 코드를 그대로 실행할 수 있습니다.

    $ spark-shell
    

    df-use-ex3_1-4_vpc_ko

    아래 코드는 Zeppelin 노트북에서 실행한 것과 동일합니다. :paste로 paste 모드에 진입하여 코드를 쉽게 붙여넣기할 수 있습니다. 해당 모드에서 빠져 나오려면 [Ctrl]+[D] 키를 눌러 주십시오.

    import org.apache.spark.sql.types.{StructType,IntegerType}
    import org.apache.spark.sql.catalyst.ScalaReflection
    import org.apache.spark.sql.functions._
    import java.sql.Timestamp
    case class Movie(movieId: Int,title: String, genres: String)
    case class Rating(userId: Int, movieId: Int, rating: Double, timestamp: Timestamp
    )
    val movie_schema = ScalaReflection.schemaFor[Movie].dataType.asInstanceOf[StructType]
    val rating_schema =  ScalaReflection.schemaFor[Rating].dataType.asInstanceOf[StructType]
    import spark.implicits._
    val movie_df_temp = spark.read 
            .option("header", "true")
            .schema(movie_schema)
            .csv("hdfs://koya/user/example/movie.csv")
    val movie_df = movie_df_temp
                    .select($"movieId", $"title", explode(split($"genres", "\\|")).as("genre")) 
                    .withColumn("releasedYear", regexp_extract($"title","\\((\\d{4})\\)", 1)) 
                    .withColumn("releasedYear", $"releasedYear".cast(IntegerType))
    
    val rating_df_temp = spark.read
                    .option("header", "true")
                    .schema(rating_schema)
                    .csv("hdfs://koya/user/example/rating.csv")
    val rating_df_filtered = rating_df_temp.groupBy($"movieId")
        .agg(count("*").as("count")).filter($"count" > 10) 
    val rating_df = rating_df_temp.join(rating_df_filtered, rating_df_temp("movieId") === rating_df_filtered("movieId"))  
            .select($"userId", rating_df_temp("movieId"), $"rating", year($"timestamp").as("year"), month($"timestamp").as("month"))
    val movie_rating_df = movie_df.join(rating_df, Seq("movieId"), "inner")
    import org.apache.spark.sql.SaveMode
    movie_rating_df.repartition($"year", $"month")
    .write.partitionBy("year", "month")
    .mode(SaveMode.Append)
    .parquet("/user/example/movie_rating")
    

    REPL 형태가 아니라, 소스코드를 jar로 빌드하여 제출(spark-submit)할 수 있습니다. 해당 작업은 1개의 argument만 필요합니다. 데이터 셋이 위치한 HDFS 경로를 지정하면 됩니다. 이후에 결과 파일도 해당 경로 아래 생성됩니다.

    소스 코드는 Github에서 확인할 수 있습니다.

    $ wget http://dist.kr.df.naverncp.com/repos/release/df-env/media/movie-rating-assembly-0.1.jar 
    $ spark-submit --class com.dataforest.example.MovieRatingWriter \
    --master yarn --deploy-mode cluster --queue longlived \
    --num-executors 20 ./movie-rating-assembly-0.1.jar \
    'hdfs://koya/user/example'
    

    작업을 제출하면 아래와 같이 진행 상황이 콘솔에 출력되는 것을 확인할 수 있습니다.
    df-use-ex3_1-5_vpc_ko

    Spark Job을 제출한 후에 Resource Manager UI에서 작업 상태와 진행 로그를 확인할 수 있습니다.
    df-use-ex3_1-6_vpc_ko

    Spark History Server UI에서 작업의 실행 내역을 더 상세하게 확인할 수 있습니다.
    df-use-ex3_1-7_vpc_ko

    작업이 완료되면 아래처럼 지정한 HDFS 경로 아래 movie_rating 디렉터리가 생기고, 그 아래로 결과 Parquet 파일이 생성되었음을 확인할 수 있습니다.
    df-use-ex3_1-8_vpc_ko

    2. Hive 클라이언트 실행

    Data Forest에서 제공하는 HiveServer2는 계정 및 비밀번호 인증을 사용합니다. 따라서 Kerberos 인증을 수행했더라도 별도 인증이 필요합니다. 여기서 비밀번호는 Data Forest 계정 생성 시 지정한 패스워드입니다.

    Hive 클라이언트를 실행하는 방법은 다음과 같습니다.

    1. Beeline 클라이언트를 실행해 주십시오.

      • 비밀번호에 특수문자가 있을 경우 작은따옴표(' ')로 감싸줘야 합니다.
      $ beeline -u 
      "jdbc:hive2://zk1.kr.df.naverncp.com:2181,zk2.kr.df.naverncp.com:2181,zk3.kr.df.naverncp.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n ${USERNAME} -p ${PASSWORD}
      

      df-use-ex3_1-9_vpc_ko

    2. External 데이터베이스를 생성해 주십시오.

      • 보기 쉽도록 output 형식을 !set outputformat vertical로 변경했습니다.
      • LOCATION 키워드로 경로 설정해 주십시오.
      CREATE DATABASE example__db_movie_rating
      COMMENT 'MovieLens 20M Dataset'
      LOCATION '/user/example/warehouse/example__db_movie_rating'
      WITH DBPROPERTIES ('creater'='suewoon', 'date'='2021-07-01');
      DESCRIBE DATABASE example__db_movie_rating;
      db_name     example__db_movie_rating
      comment     MovieLens 20M Dataset
      location    hdfs://koya/user/example/warehouse/example__db_movie_rating
      owner_name  example
      owner_type  USER
      parameters
      
    3. Spark Job에서 생성한 Parquet 파일로부터 테이블을 생성해 주십시오.

      USE example__db_movie_rating;
      DROP TABLE IF EXISTS movie_rating;
      CREATE EXTERNAL TABLE movie_rating (movieId int, title string, genre string, releasedYear int, userId int, rating double)
      partitioned by (year int, month int) stored as parquet 
      location '/user/example/movie_rating';
      MSCK REPAIR TABLE movie_rating;
      
    4. 테이블 내용을 확인해 주십시오.

      SHOW TABLES;
      tab_name  movie_rating
      SHOW CREATE TABLE movie_rating;
      createtab_stmt  CREATE EXTERNAL TABLE `movie_rating`(
      createtab_stmt    `movieid` int,
      createtab_stmt    `title` string,
      createtab_stmt    `genre` string,
      createtab_stmt    `releasedyear` int,
      createtab_stmt    `userid` int,
      createtab_stmt    `rating` double)
      createtab_stmt  PARTITIONED BY (
      createtab_stmt    `year` int,
      createtab_stmt    `month` int)
      createtab_stmt  ROW FORMAT SERDE
      createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
      createtab_stmt  STORED AS INPUTFORMAT
      createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
      createtab_stmt  OUTPUTFORMAT
      createtab_stmt    'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
      createtab_stmt  LOCATION
      createtab_stmt    'hdfs://koya/user/example/movie_rating'
      createtab_stmt  TBLPROPERTIES (
      createtab_stmt    'bucketing_version'='2',
      createtab_stmt    'transient_lastDdlTime'='1626253592')
      SELECT * FROM movie_rating limit 1;
      movie_rating.movieid       1176
      movie_rating.title         Double Life of Veronique, The (Double Vie de Veronique, La) (1991)
      movie_rating.genre         Romance
      movie_rating.releasedyear  1991
      movie_rating.userid        28507
      movie_rating.rating        4.0
      movie_rating.year          1995
      movie_rating.month         1
      
    5. 생성한 테이블을 대상으로 쿼리를 수행해 주십시오.

      • 특정 연도에 개봉한 영화 중에서 평점 Top3 영화를 조회하는 쿼리입니다.
        SELECT releasedYear, title, avgRating, row_num 
        FROM (SELECT releasedYear, title, avg(rating) as avgRating , rank() over (partition by releasedYear order by avg(rating) desc) as row_num 
        FROM movie_rating 
        GROUP BY releasedYear, title
        ) T
        WHERE row_num <= 3
        ORDER BY releasedYear DESC, row_num LIMIT 6;
        releasedyear  2015
        title         Louis C.K.: Live at The Comedy Store (2015)
        avgrating     3.8
        row_num       1
        releasedyear  2015
        title         Kingsman: The Secret Service (2015)
        avgrating     3.6320754716981134
        row_num       2
        releasedyear  2015
        title         The Second Best Exotic Marigold Hotel (2015)
        avgrating     3.5714285714285716
        row_num       3
        releasedyear  2014
        title         Zero Motivation (Efes beyahasei enosh) (2014)
        avgrating     4.5
        row_num       1
        releasedyear  2014
        title         Whiplash (2014)
        avgrating     4.074750830564784
        row_num       2
        releasedyear  2014
        title         Interstellar (2014)
        avgrating     4.023864289821737
        row_num       3
        
      • 연도별로 사용자에게 가장 인기 많았던 영화(평점 수가 많았던 영화)를 찾아보는 쿼리입니다.
        SELECT year, title, popularity, row_num FROM
        (SELECT year, title, count(distinct userId) as popularity, rank() over (partition by year order by count(distinct userId) desc) as row_num 
        FROM movie_rating GROUP BY year, title) T
        WHERE row_num = 1
        ORDER BY year desc, popularity desc LIMIT 5;
        year        2015
        title       Matrix, The (1999)
        popularity  1207
        row_num     1
        year        2014
        title       Shawshank Redemption, The (1994)
        popularity  2405
        row_num     1
        year        2013
        title       Shawshank Redemption, The (1994)
        popularity  2549
        row_num     1
        year        2012
        title       Inception (2010)
        popularity  2411
        row_num     1
        year        2011
        title       Inception (2010)
        popularity  3235
        row_num     1
        

    이 문서가 도움이 되었습니까?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.