Cloud Hadoop Sparkでの Icebergテーブル管理

Prev Next

VPC環境で利用できます。

Cloud Hadoopサービスの Sparkで Icebergテーブルを管理する方法を紹介します。

Spark with Cloud Hadoopを使用して Data Catalogの icebergテーブルと連携するシナリオは、次の通りです。

  1. Data Catalogで iceberg tableを作成します。
  2. sparkでソースデータ(cargo flight schedule summer arrival.csv)を iceberg tableに追加します。
  3. sparkで他のソースデータ(cargo flight schedule winter arrival.csv)を同じ iceberg tableに追加します。
  4. iceberg snapshotによる照会結果を比較します。
  5. sparkで icebergファイルの Compactionを実行します。
参考

このガイドでは Apache Sparkを実行する様々な方法のうち、Pythonコードを PySpark環境で実行する方法を基準にご案内します。

事前タスク

Spark with Cloud Hadoopを使用して Data Catalogの icebergテーブルと連携するために準備すべきタスクを説明します。

Cloud Hadoop設定

Cloud Hadoopを設定し、Cloud Hadoop spark3をインストールする方法は、次の通りです。

  1. Cloud Hadoopクラスタの Hive Metastore保存場所を Data Catalogに設定します。
    • 保存場所を Data Catalogに設定する方法に関する詳細は、Data Catalogに設定をご参照ください。
  2. Cloud Hadoopが提供する Ambari Web UIにアクセスします。
  3. Ambari Web UIで Services > Add Service > Spark3 を選択してインストールします。
    • Ambari Web Consoleへのアクセスに関する詳細は、Ambari UIをご参照ください。
  4. Cloud Hadoopクラスタエッジノードに SSH方式でアクセスします。
  5. spark3を実行するために python3をデフォルトで設定します。
    # python3を選択
    [sshuser@{hadoop cluster edge node} ~]$ sudo alternatives --config python
    # python3のバージョンを確認
    [sshuser@{hadoop cluster edge node} ~]$ python --version
    
  6. pyspark3をインストールします。
    [sshuser@{hadoop cluster edge node} ~]$ sudo sudo pip3 install pyspark
    
  7. pysparkコマンドを実行して、sparkのバージョンが3.xであるか確認します。
    [sshuser@{hadoop cluster edge node} ~]$ pyspark
    
  8. icebergの実行に必要な libraryをダウンロードします。
    mkdir /home1/sshuser/jars
    cd /home1/sshuser/jars
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.901/aws-java-sdk-bundle-1.11.901.jar
    wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.2_2.12/1.3.1/iceberg-spark-runtime-3.2_2.12-1.3.1.jar
    wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/1.3.1/iceberg-hive-runtime-1.3.1.jar    
    

データ構造確認

Data Catalog Icebergテーブルの連携時に必要なソースデータ、フォルダ構造、ファイル構造、サンプルについて説明します。

ソースデータ

Object Storageに保存されたソースデータは、夏季/冬季の貨物便スケジュールに関する情報を含むデータであり、フライト日、フライト番号、到着予定時刻、運航開始/終了期間、運航年などの情報を有しています。

参考

ソースデータは、公共データポータルで提供されるデータを基に作成しました。

フォルダ構造

/cargo_flight_schedule
 |--- cargo_flight_schedule_summer_arrival.csv
 |--- cargo_flight_schedule_winter_arrival.csv

ファイル構造とサンプル

flight date flight no. planned time of arrival period (start) period (end) airline origin year
MONDAY PO237 0:30 2014.3.30 2014.10.25 Polar Air Cargo CINCINNATI CIN N. KNTY 2014
MONDAY OZ394 1:45 2014.3.31 2014.10.25 ASIANA AIRLINES BANGKOK Suvarnabhumi international airport 2014
MONDAY CK257 4:40 2014.3.30 2014.10.25 CHINA CARGO AIRLINES PUDONG 2014

1. Data Catalogで iceberg tableを作成

sparkで Loadするデータのスキーマに合わせて、Data Catalogで iceberg tableを作成します。

参考

テーブルタイプは Apache Icebergで作成します。

2. sparkでソースデータを iceberg tableに追加

sparkでソースデータを iceberg tableに追加する方法は、次の通りです。

  1. pythonコードを作成します。

    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("DCT_SPARK") \
        .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
        .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
        .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
        .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.sql.hive.metastore.jars", "maven") \
        .config("spark.sql.hive.metastore.version", "3.1.3") \
        .enableHiveSupport() \
        .getOrCreate();
    
    spark.sql("show databases").show()
    
    # S3 CSVのファイルパス
    csv_file_path = "s3a://{bucketName}/cargo_flight_schedule/cargo_flight_schedule_summer_arrival.csv"
    
    # Icebergのテーブルパス
    iceberg_table = "default.dct_iceberg"
    
    # CSVファイルの読み取り
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(csv_file_path)
    
    # DataFrameを Icebergテーブルに Insert
    df.write \
        .format("iceberg") \
        .mode("append") \
        .save(iceberg_table)
    
    #SparkSession終了
    spark.stop()
    
    
    参考

    hive.metastore.urisは、Ambari Web Consoleアクセス用 > Services > Hive > Configs > ADVANCED > GENERAL > hive.metastore.urisの値である thrift://{data catalog metastore uri}:{data catalog metastore port}を入力してください。

  2. pythonコードを実行します。

    python dct_iceberg.py
    
    • Data Catalogコンソールで icebergテーブルのスキーマバージョンが追加されたことが確認できます。
    • Data Catalogコンソールで icebergテーブルの current-snapshot-idを確認できます。

3. sparkで他のソースデータを追加

sparkで他のソースデータを追加する方法は、次の通りです。

  1. pythonコードを作成します。

    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("DCT_SPARK") \
        .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
        .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
        .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
        .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.sql.hive.metastore.jars", "maven") \
        .config("spark.sql.hive.metastore.version", "3.1.3") \
        .enableHiveSupport() \
        .getOrCreate();
    
    spark.sql("show databases").show()
    
    csv_file_path = "s3a://{bucketName}/cargo_flight_schedule/cargo_flight_schedule_winter_arrival.csv"
    
    iceberg_table = "default.dct_iceberg"
    
    # CSVファイルの読み取り
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(csv_file_path)
    
    # DataFrameを Icebergテーブルに Insert
    df.write \
        .format("iceberg") \
        .mode("append") \
        .save(iceberg_table)
    
    #SparkSession終了
    spark.stop()
    
    
  2. pythonコードを実行します。

    python dct_iceberg.py
    
    • Data Catalogコンソールで icebergテーブルのスキーマバージョンが新しく追加されたことが確認できます。
    • Data Catalogコンソールで icebergテーブルの current-snapshot-idが変更されたことが確認できます。

4. iceberg snapshotによる照会結果の比較

iceberg snapshotによる照会結果を比較する方法は、次の通りです。

  1. pythonコードを以下のように作成します。
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("DCT_SPARK") \
        .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
        .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
        .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
        .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.sql.hive.metastore.jars", "maven") \
        .config("spark.sql.hive.metastore.version", "3.1.3") \
        .enableHiveSupport() \
        .getOrCreate();
    
    iceberg_table = "default.dct_iceberg"
    
    # cargo_flight_schedule_summer_arrival.csvのみ追加した状態のテーブルを照会
    spark.read.format("iceberg").option("snapshot-id", "{2番での snapshot-id}").load(iceberg_table).show()
    
    # cargo_flight_schedule_winter_arrival.csvも追加した状態のテーブルを照会
    spark.read.format("iceberg").option("snapshot-id", "{3番での snapshot-id}").load(iceberg_table).show()
    
    #SparkSession終了
    spark.stop()
    
    
  2. pythonコードを実行します。
    python dct_iceberg.py
    

5.sparkで icebergファイルの Compactionを実行

sparkで icebergファイルの Compactionを実行する方法は、次の通りです。

  1. ObjectStorageで snapshot fileが2つあることを確認します。

    /{bucketName}/{table location}/metadata/
     |--- snap-{2番プロセス-snapshot-id}-{UUID}.avro
     |--- snap-{3番プロセス-snapshot-id}-{UUID}.avro
    
  2. Data Catalogコンソールの icebergテーブルのプロパティ情報で snapshot-countが2つあることを確認します。

  3. icebergメタデータファイルの Compactionを実行するための pythonコードを作成します。

    • icebergの expire_snapshots プロシージャを呼び出して古い snapshotsファイルを整理します。
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("DCT_SPARK") \
        .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
        .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
        .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
        .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.sql.hive.metastore.jars", "maven") \
        .config("spark.sql.hive.metastore.version", "3.1.3") \
        .enableHiveSupport() \
        .getOrCreate();
    
    # call expire_snapshots procedure
    ## iceberg_table = "default.dct_iceberg"
    spark.sql("""
        CALL system.expire_snapshots('default.dct_iceberg', TIMESTAMP '{基準時間}', {残す snapshot数})
    """)
    
    #SparkSession終了
    spark.stop()
    
    
  4. pythonコードを実行します。

    python dct_iceberg.py
    
    • Data Catalogコンソールで icebergテーブルのプロパティ情報の snapshot-count{남길 snapshot 개수}になったことが確認できます。
    • ObjectStorageで snapshot fileが整理されたことが確認できます。
      # {残す snapshot数} = 1の場合
      /{bucketName}/{table location}/metadata/
       |--- snap-{3番プロセス-snapshot-id}-{UUID}.avro