Available in VPC
This guide describes how to manage Iceberg tables using Spark on Cloud Hadoop.
The following describes a scenario for integrating with Data Catalog Iceberg tables using Spark on Cloud Hadoop:
- Create an Iceberg table in Data Catalog.
- Add the source data (cargo_flight_schedule_summer_arrival.csv) to the Iceberg table with Spark.
- Add a second source data file (cargo_flight_schedule_winter_arrival.csv) to the same Iceberg table with Spark.
- Compare query results across Iceberg snapshots.
- Perform Iceberg file compaction with Spark.
Among the various ways to run Apache Spark, this guide focuses on running Python code in the PySpark environment.
Preparations
This section describes the preparations for integrating with Data Catalog Iceberg tables using Spark on Cloud Hadoop.
Set up Cloud Hadoop
To set up Cloud Hadoop and install Cloud Hadoop Spark3, follow these steps:
- Set the Cloud Hadoop cluster's Hive Metastore repository to Data Catalog.
- For more information on how to set the storage to Data Catalog, see Set to Data Catalog.
- Access Ambari Web UI provided by Cloud Hadoop.
- Select Services > Add Service > Spark3 from Ambari Web UI and install it.
- For more information on Ambari Web Console access, see Ambari UI.
- Access the edge node of the Cloud Hadoop cluster through SSH.
- For more information on how to access the edge node through SSH, see Access a cluster node with SSH.
- Set python3 as the default for running Spark3.
```
# Select python3
[sshuser@{hadoop cluster edge node} ~]$ sudo alternatives --config python

# Check the python3 version
[sshuser@{hadoop cluster edge node} ~]$ python --version
```
- Install pyspark3.
```
[sshuser@{hadoop cluster edge node} ~]$ sudo pip3 install pyspark
```
- Run the pyspark command to check that the Spark version is 3.x.
```
[sshuser@{hadoop cluster edge node} ~]$ pyspark
```
- Download the libraries required to run Iceberg.
```
mkdir /home1/sshuser/jars
cd /home1/sshuser/jars
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.901/aws-java-sdk-bundle-1.11.901.jar
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.2_2.12/1.3.1/iceberg-spark-runtime-3.2_2.12-1.3.1.jar
wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/1.3.1/iceberg-hive-runtime-1.3.1.jar
```
Check data structure
This section describes the source data, folder structure, file structure, and samples required for integrating with the Data Catalog Iceberg table.
Source data
The source data stored in Object Storage contains summer and winter air cargo flight schedules, with information such as the flight date, flight number, planned time of arrival, operation start/end period, and flight operation year.
The source data is based on data provided by public data portals.
Folder structure
```
/cargo_flight_schedule
|--- cargo_flight_schedule_summer_arrival.csv
|--- cargo_flight_schedule_winter_arrival.csv
```
File structure and samples
| flight date | flight no. | planned time of arrival | period (start) | period (end) | airline | origin | year |
|---|---|---|---|---|---|---|---|
| MONDAY | PO237 | 0:30 | March 30, 2014 | October 25, 2014 | Polar Air Cargo | CINCINNATI CIN N. KNTY | 2014 |
| MONDAY | OZ394 | 1:45 | March 31, 2014 | October 25, 2014 | ASIANA AIRLINES | BANGKOK Suvarnabhumi international airport | 2014 |
| MONDAY | CK257 | 4:40 | March 30, 2014 | October 25, 2014 | CHINA CARGO AIRLINES | PUDONG | 2014 |
1. Create Iceberg table in Data Catalog
Create an Iceberg table in Data Catalog that matches the schema of the data you want to load with Spark.
Set the table type to Apache Iceberg.
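For reference, the following is a minimal Spark SQL sketch of an equivalent table definition. It is an assumption, not the console workflow: the column names mirror the sample file structure above and may differ from the schema you actually define in Data Catalog, and it assumes a SparkSession configured as in step 2.
```python
# Hypothetical DDL sketch only: in this scenario the table is created in the
# Data Catalog console. The column names below are assumptions based on the
# sample file structure above.
spark.sql("""
    CREATE TABLE IF NOT EXISTS default.dct_iceberg (
        flight_date             STRING,
        flight_no               STRING,
        planned_time_of_arrival STRING,
        period_start            STRING,
        period_end              STRING,
        airline                 STRING,
        origin                  STRING,
        year                    INT
    )
    USING iceberg
""")
```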
2. Add the source data to the Iceberg table with Spark
To add the source data to the Iceberg table with Spark, follow these steps:
- Write the Python code.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DCT_SPARK") \
    .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
    .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
    .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
    .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.sql.hive.metastore.jars", "maven") \
    .config("spark.sql.hive.metastore.version", "3.1.3") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("show databases").show()

# S3 CSV file path
csv_file_path = "s3a://{bucketName}/cargo_flight_schedule/cargo_flight_schedule_summer_arrival.csv"

# Iceberg table path
iceberg_table = "default.dct_iceberg"

# Read CSV file
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_file_path)

# Insert DataFrame into Iceberg table
df.write \
    .format("iceberg") \
    .mode("append") \
    .save(iceberg_table)

# End SparkSession
spark.stop()
```
Note
For hive.metastore.uris, enter `thrift://{data catalog metastore uri}:{data catalog metastore port}`, which is the value shown in the Ambari Web Console under Services > Hive > Configs > ADVANCED > GENERAL > hive.metastore.uris.
- Run the Python code.
```
python dct_iceberg.py
```
- You can view that a new schema version of the Iceberg table has been added in the Data Catalog console.
- You can view the current-snapshot-id of the Iceberg table in the Data Catalog console.
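If you also want to check the snapshot IDs from Spark rather than the console, the following is a minimal sketch that queries the table's snapshots metadata table. It assumes a SparkSession (`spark`) configured as in the script above; note the snapshot_id values, since step 4 references them.
```python
# Sketch: list the Iceberg table's snapshots via its snapshots metadata table.
# Assumes `spark` is a SparkSession configured as in the script above.
spark.sql("""
    SELECT snapshot_id, committed_at, operation
    FROM default.dct_iceberg.snapshots
    ORDER BY committed_at
""").show(truncate=False)
```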
3. Add a second source data file with Spark
To add a second source data file with Spark, follow these steps:
- Write the Python code.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DCT_SPARK") \
    .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
    .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
    .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
    .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.sql.hive.metastore.jars", "maven") \
    .config("spark.sql.hive.metastore.version", "3.1.3") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("show databases").show()

csv_file_path = "s3a://{bucketName}/cargo_flight_schedule/cargo_flight_schedule_winter_arrival.csv"
iceberg_table = "default.dct_iceberg"

# Read CSV file
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_file_path)

# Insert DataFrame into Iceberg table
df.write \
    .format("iceberg") \
    .mode("append") \
    .save(iceberg_table)

# End SparkSession
spark.stop()
```
- Run the Python code.
```
python dct_iceberg.py
```
- You can view that a new schema version of the Iceberg table has been added in the Data Catalog console.
- You can view that the current-snapshot-id of the Iceberg table has changed in the Data Catalog console.
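You can also confirm the snapshot lineage from Spark. The following is a minimal sketch, assuming the same SparkSession (`spark`) as in the script above; it queries the table's history metadata table to show both snapshots and which one is current.
```python
# Sketch: show the Iceberg table's snapshot lineage via its history metadata table.
# Assumes `spark` is a SparkSession configured as in the script above.
spark.sql("""
    SELECT made_current_at, snapshot_id, parent_id, is_current_ancestor
    FROM default.dct_iceberg.history
""").show(truncate=False)
```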
4. Compare query results across Iceberg snapshots
To compare query results across Iceberg snapshots, follow these steps:
- Write the Python code.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DCT_SPARK") \
    .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
    .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
    .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
    .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.sql.hive.metastore.jars", "maven") \
    .config("spark.sql.hive.metastore.version", "3.1.3") \
    .enableHiveSupport() \
    .getOrCreate()

iceberg_table = "default.dct_iceberg"

# View the table as of the snapshot with only cargo_flight_schedule_summer_arrival.csv added
spark.read.format("iceberg").option("snapshot-id", "{snapshot-id in step 2}").load(iceberg_table).show()

# View the table as of the snapshot with cargo_flight_schedule_winter_arrival.csv also added
spark.read.format("iceberg").option("snapshot-id", "{snapshot-id in step 3}").load(iceberg_table).show()

# End SparkSession
spark.stop()
```
- Run the Python code.
```
python dct_iceberg.py
```
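Iceberg can also time travel by commit time instead of snapshot ID. The following one-liner is a sketch under that assumption; `{epoch millis}` is a placeholder for a past commit time in milliseconds since the epoch, and `spark` is the SparkSession configured as above.
```python
# Sketch: time travel by timestamp instead of snapshot ID.
# {epoch millis} is a placeholder for a commit time in milliseconds since the epoch.
spark.read.format("iceberg") \
    .option("as-of-timestamp", "{epoch millis}") \
    .load("default.dct_iceberg") \
    .show()
```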
5. Perform compaction on Iceberg files with Spark
To perform compaction on Iceberg files with Spark, follow these steps:
- Check that there are two snapshot files in Object Storage.
```
/{bucketName}/{table location}/metadata/
|--- snap-{step 2-snapshot-id}-{UUID}.avro
|--- snap-{step 3-snapshot-id}-{UUID}.avro
```
- Check that `snapshot-count` is 2 in the Iceberg table's properties on the Data Catalog console.
- Write the Python code to perform compaction on the Iceberg metadata files. Call Iceberg's `expire_snapshots` procedure to clean up old snapshot files. (A sketch of compacting the data files themselves follows after this list.)
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DCT_SPARK") \
    .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
    .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
    .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
    .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.sql.hive.metastore.jars", "maven") \
    .config("spark.sql.hive.metastore.version", "3.1.3") \
    .enableHiveSupport() \
    .getOrCreate()

# Call the expire_snapshots procedure
## iceberg_table = "default.dct_iceberg"
spark.sql("""
    CALL system.expire_snapshots('default.dct_iceberg', TIMESTAMP '{base time}', {the number of snapshots to keep})
""")

# End SparkSession
spark.stop()
```
- Run the Python code.
```
python dct_iceberg.py
```
- You can view that `snapshot-count` in the Iceberg table's properties has become {the number of snapshots to keep} in the Data Catalog console.
- You can view that the snapshot files have been cleaned up in Object Storage.
```
# if {the number of snapshots to keep} = 1
/{bucketName}/{table location}/metadata/
|--- snap-{step 3-snapshot-id}-{UUID}.avro
```
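The expire_snapshots procedure removes old snapshot metadata and the files only they reference; to compact the data files themselves, Iceberg also provides a rewrite_data_files procedure. The following is a minimal sketch, not part of the original scenario, assuming the same SparkSession configuration as in the script above.
```python
# Sketch: compact small data files with Iceberg's rewrite_data_files procedure.
# Assumes `spark` is a SparkSession configured as in the script above.
spark.sql("""
    CALL system.rewrite_data_files(table => 'default.dct_iceberg')
""").show()
```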