VPC環境で利用できます。
Cloud Hadoopサービスの Sparkで Icebergテーブルを管理する方法を紹介します。
Spark with Cloud Hadoopを使用して Data Catalogの icebergテーブルと連携するシナリオは、次の通りです。
- Data Catalogで iceberg tableを作成します。
- sparkでソースデータ(cargo flight schedule summer arrival.csv)を iceberg tableに追加します。
- sparkで他のソースデータ(cargo flight schedule winter arrival.csv)を同じ iceberg tableに追加します。
- iceberg snapshotによる照会結果を比較します。
- sparkで icebergファイルの Compactionを実行します。
このガイドでは Apache Sparkを実行する様々な方法のうち、Pythonコードを PySpark環境で実行する方法を基準にご案内します。
事前タスク
Spark with Cloud Hadoopを使用して Data Catalogの icebergテーブルと連携するために準備すべきタスクを説明します。
Cloud Hadoop設定
Cloud Hadoopを設定し、Cloud Hadoop spark3をインストールする方法は、次の通りです。
- Cloud Hadoopクラスタの Hive Metastore保存場所を Data Catalogに設定します。
- 保存場所を Data Catalogに設定する方法に関する詳細は、Data Catalogに設定をご参照ください。
- Cloud Hadoopが提供する Ambari Web UIにアクセスします。
- Ambari Web UIで Services > Add Service > Spark3 を選択してインストールします。
- Ambari Web Consoleへのアクセスに関する詳細は、Ambari UIをご参照ください。
- Cloud Hadoopクラスタエッジノードに SSH方式でアクセスします。
- SSH方式でエッジノードにアクセスする方法に関する詳細は、SSHによるクラスタノードアクセスをご参照ください。
- spark3を実行するために python3をデフォルトで設定します。
# python3を選択 [sshuser@{hadoop cluster edge node} ~]$ sudo alternatives --config python # python3のバージョンを確認 [sshuser@{hadoop cluster edge node} ~]$ python --version - pyspark3をインストールします。
[sshuser@{hadoop cluster edge node} ~]$ sudo sudo pip3 install pyspark - pysparkコマンドを実行して、sparkのバージョンが3.xであるか確認します。
[sshuser@{hadoop cluster edge node} ~]$ pyspark - icebergの実行に必要な libraryをダウンロードします。
mkdir /home1/sshuser/jars cd /home1/sshuser/jars wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.901/aws-java-sdk-bundle-1.11.901.jar wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.2_2.12/1.3.1/iceberg-spark-runtime-3.2_2.12-1.3.1.jar wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/1.3.1/iceberg-hive-runtime-1.3.1.jar
データ構造確認
Data Catalog Icebergテーブルの連携時に必要なソースデータ、フォルダ構造、ファイル構造、サンプルについて説明します。
ソースデータ
Object Storageに保存されたソースデータは、夏季/冬季の貨物便スケジュールに関する情報を含むデータであり、フライト日、フライト番号、到着予定時刻、運航開始/終了期間、運航年などの情報を有しています。
ソースデータは、公共データポータルで提供されるデータを基に作成しました。
フォルダ構造
/cargo_flight_schedule
|--- cargo_flight_schedule_summer_arrival.csv
|--- cargo_flight_schedule_winter_arrival.csv
ファイル構造とサンプル
| flight date | flight no. | planned time of arrival | period (start) | period (end) | airline | origin | year |
|---|---|---|---|---|---|---|---|
| MONDAY | PO237 | 0:30 | 2014.3.30 | 2014.10.25 | Polar Air Cargo | CINCINNATI CIN N. KNTY | 2014 |
| MONDAY | OZ394 | 1:45 | 2014.3.31 | 2014.10.25 | ASIANA AIRLINES | BANGKOK Suvarnabhumi international airport | 2014 |
| MONDAY | CK257 | 4:40 | 2014.3.30 | 2014.10.25 | CHINA CARGO AIRLINES | PUDONG | 2014 |
1. Data Catalogで iceberg tableを作成
sparkで Loadするデータのスキーマに合わせて、Data Catalogで iceberg tableを作成します。
テーブルタイプは Apache Icebergで作成します。
2. sparkでソースデータを iceberg tableに追加
sparkでソースデータを iceberg tableに追加する方法は、次の通りです。
-
pythonコードを作成します。
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("DCT_SPARK") \ .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \ .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \ .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \ .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \ .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ .config("spark.sql.hive.metastore.jars", "maven") \ .config("spark.sql.hive.metastore.version", "3.1.3") \ .enableHiveSupport() \ .getOrCreate(); spark.sql("show databases").show() # S3 CSVのファイルパス csv_file_path = "s3a://{bucketName}/cargo_flight_schedule/cargo_flight_schedule_summer_arrival.csv" # Icebergのテーブルパス iceberg_table = "default.dct_iceberg" # CSVファイルの読み取り df = spark.read \ .format("csv") \ .option("header", "true") \ .option("inferSchema", "true") \ .load(csv_file_path) # DataFrameを Icebergテーブルに Insert df.write \ .format("iceberg") \ .mode("append") \ .save(iceberg_table) #SparkSession終了 spark.stop()参考hive.metastore.urisは、Ambari Web Consoleアクセス用 > Services > Hive > Configs > ADVANCED > GENERAL > hive.metastore.urisの値である
thrift://{data catalog metastore uri}:{data catalog metastore port}を入力してください。 -
pythonコードを実行します。
python dct_iceberg.py- Data Catalogコンソールで icebergテーブルのスキーマバージョンが追加されたことが確認できます。
- Data Catalogコンソールで icebergテーブルの current-snapshot-idを確認できます。
3. sparkで他のソースデータを追加
sparkで他のソースデータを追加する方法は、次の通りです。
-
pythonコードを作成します。
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("DCT_SPARK") \ .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \ .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \ .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \ .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \ .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ .config("spark.sql.hive.metastore.jars", "maven") \ .config("spark.sql.hive.metastore.version", "3.1.3") \ .enableHiveSupport() \ .getOrCreate(); spark.sql("show databases").show() csv_file_path = "s3a://{bucketName}/cargo_flight_schedule/cargo_flight_schedule_winter_arrival.csv" iceberg_table = "default.dct_iceberg" # CSVファイルの読み取り df = spark.read \ .format("csv") \ .option("header", "true") \ .option("inferSchema", "true") \ .load(csv_file_path) # DataFrameを Icebergテーブルに Insert df.write \ .format("iceberg") \ .mode("append") \ .save(iceberg_table) #SparkSession終了 spark.stop() -
pythonコードを実行します。
python dct_iceberg.py- Data Catalogコンソールで icebergテーブルのスキーマバージョンが新しく追加されたことが確認できます。
- Data Catalogコンソールで icebergテーブルの current-snapshot-idが変更されたことが確認できます。
4. iceberg snapshotによる照会結果の比較
iceberg snapshotによる照会結果を比較する方法は、次の通りです。
- pythonコードを以下のように作成します。
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("DCT_SPARK") \ .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \ .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \ .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \ .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \ .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ .config("spark.sql.hive.metastore.jars", "maven") \ .config("spark.sql.hive.metastore.version", "3.1.3") \ .enableHiveSupport() \ .getOrCreate(); iceberg_table = "default.dct_iceberg" # cargo_flight_schedule_summer_arrival.csvのみ追加した状態のテーブルを照会 spark.read.format("iceberg").option("snapshot-id", "{2番での snapshot-id}").load(iceberg_table).show() # cargo_flight_schedule_winter_arrival.csvも追加した状態のテーブルを照会 spark.read.format("iceberg").option("snapshot-id", "{3番での snapshot-id}").load(iceberg_table).show() #SparkSession終了 spark.stop() - pythonコードを実行します。
python dct_iceberg.py
5.sparkで icebergファイルの Compactionを実行
sparkで icebergファイルの Compactionを実行する方法は、次の通りです。
-
ObjectStorageで snapshot fileが2つあることを確認します。
/{bucketName}/{table location}/metadata/ |--- snap-{2番プロセス-snapshot-id}-{UUID}.avro |--- snap-{3番プロセス-snapshot-id}-{UUID}.avro -
Data Catalogコンソールの icebergテーブルのプロパティ情報で
snapshot-countが2つあることを確認します。 -
icebergメタデータファイルの Compactionを実行するための pythonコードを作成します。
- icebergの
expire_snapshotsプロシージャを呼び出して古い snapshotsファイルを整理します。
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("DCT_SPARK") \ .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \ .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \ .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \ .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \ .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ .config("spark.sql.hive.metastore.jars", "maven") \ .config("spark.sql.hive.metastore.version", "3.1.3") \ .enableHiveSupport() \ .getOrCreate(); # call expire_snapshots procedure ## iceberg_table = "default.dct_iceberg" spark.sql(""" CALL system.expire_snapshots('default.dct_iceberg', TIMESTAMP '{基準時間}', {残す snapshot数}) """) #SparkSession終了 spark.stop() - icebergの
-
pythonコードを実行します。
python dct_iceberg.py- Data Catalogコンソールで icebergテーブルのプロパティ情報の
snapshot-countが{남길 snapshot 개수}になったことが確認できます。 - ObjectStorageで snapshot fileが整理されたことが確認できます。
# {残す snapshot数} = 1の場合 /{bucketName}/{table location}/metadata/ |--- snap-{3番プロセス-snapshot-id}-{UUID}.avro
- Data Catalogコンソールで icebergテーブルのプロパティ情報の