Iceberg table management with Cloud Hadoop Spark


Available in VPC

This guide describes how to manage Iceberg tables using Spark on Cloud Hadoop.

The following scenario shows how to integrate with Data Catalog Iceberg tables using Spark on Cloud Hadoop:

  1. Create an Iceberg table in Data Catalog.
  2. Add the source data (cargo_flight_schedule_summer_arrival.csv) to the Iceberg table with Spark.
  3. Add another source data file (cargo_flight_schedule_winter_arrival.csv) to the same Iceberg table with Spark.
  4. Compare query results across Iceberg snapshots.
  5. Perform Iceberg file compaction with Spark.
Note

Among the various ways to run Apache Spark, this guide focuses on running Python code in the PySpark environment.

Preparations

This section describes the preparations for integrating with Data Catalog Iceberg tables using Spark on Cloud Hadoop.

Set Cloud Hadoop

To set up Cloud Hadoop and install Cloud Hadoop Spark3, follow these steps:

  1. Set the Cloud Hadoop cluster's Hive Metastore repository to Data Catalog.
  2. Access Ambari Web UI provided by Cloud Hadoop.
  3. Select Services > Add Service > Spark3 from Ambari Web UI and install it.
    • For more information on Ambari Web Console access, see Ambari UI.
  4. Access the edge node of the Cloud Hadoop cluster through SSH.
  5. Set python3 to default for running Spark3.
    # Select python3
    [sshuser@{hadoop cluster edge node} ~]$ sudo alternatives --config python
    # Check the python3 version
    [sshuser@{hadoop cluster edge node} ~]$ python --version
    
  6. Install PySpark.
    [sshuser@{hadoop cluster edge node} ~]$ sudo pip3 install pyspark
    
  7. Run the pyspark command to check whether the Spark version is 3.x.
    [sshuser@{hadoop cluster edge node} ~]$ pyspark
    
  8. Download the libraries required to run Iceberg.
    mkdir /home1/sshuser/jars
    cd /home1/sshuser/jars
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.901/aws-java-sdk-bundle-1.11.901.jar
    wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.2_2.12/1.3.1/iceberg-spark-runtime-3.2_2.12-1.3.1.jar
    wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/1.3.1/iceberg-hive-runtime-1.3.1.jar    
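
The four jars downloaded above are later passed to Spark as a single comma-separated spark.jars value. Below is a small sketch that builds that value and flags any failed downloads; it assumes the /home1/sshuser/jars directory from the step above:

```python
import os

# Jars downloaded in the step above
JAR_DIR = "/home1/sshuser/jars"
JAR_NAMES = [
    "hadoop-aws-3.3.1.jar",
    "aws-java-sdk-bundle-1.11.901.jar",
    "iceberg-spark-runtime-3.2_2.12-1.3.1.jar",
    "iceberg-hive-runtime-1.3.1.jar",
]

def build_spark_jars(jar_dir, jar_names):
    """Return the comma-separated value expected by spark.jars."""
    return ",".join(os.path.join(jar_dir, name) for name in jar_names)

def missing_jars(jar_dir, jar_names):
    """Return the names of any jars that are not present on disk."""
    return [n for n in jar_names if not os.path.isfile(os.path.join(jar_dir, n))]

spark_jars = build_spark_jars(JAR_DIR, JAR_NAMES)
```

If missing_jars returns a non-empty list, re-run the corresponding wget commands before starting Spark.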
    

Check data structure

This section describes the source data, folder structure, file structure, and samples needed to integrate with the Data Catalog Iceberg table.

Source data

The source data stored in Object Storage contains summer/winter air cargo flight schedules, with information such as flight date, flight number, planned time of arrival, operation start/end period, and flight operation year.

Note

The source data is based on the data provided by public data portals.

Folder structure

/cargo_flight_schedule
 |--- cargo_flight_schedule_summer_arrival.csv
 |--- cargo_flight_schedule_winter_arrival.csv

File structure and samples

flight date | flight no. | planned time of arrival | period (start) | period (end) | airline | origin | year
MONDAY | PO237 | 0:30 | March 30, 2014 | October 25, 2014 | Polar Air Cargo | CINCINNATI CIN N. KNTY | 2014
MONDAY | OZ394 | 1:45 | March 31, 2014 | October 25, 2014 | ASIANA AIRLINES | BANGKOK Suvarnabhumi international airport | 2014
MONDAY | CK257 | 4:40 | March 30, 2014 | October 25, 2014 | CHINA CARGO AIRLINES | PUDONG | 2014
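
Before loading the files with Spark, the structure can be previewed with Python's standard csv module. The following is a minimal sketch; the header names are assumptions that mirror the sample columns, and the real files may name them differently:

```python
import csv
import io

# Inline sample mirroring one of the rows above; header names are assumed,
# since the actual CSV headers depend on the source files.
sample = (
    "flight_date,flight_no,planned_arrival,period_start,period_end,airline,origin,year\n"
    'MONDAY,PO237,0:30,"March 30, 2014","October 25, 2014",'
    "Polar Air Cargo,CINCINNATI CIN N. KNTY,2014\n"
)

# DictReader handles the quoted date fields that contain commas
rows = list(csv.DictReader(io.StringIO(sample)))
```

Note that the date columns are quoted because they contain commas; Spark's CSV reader handles this quoting by default as well.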

1. Create Iceberg table in Data Catalog

Create the Iceberg table in Data Catalog according to the schema of the data you want to load into Spark.

Note

Set the table type to Apache Iceberg.
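
The table is created in the Data Catalog console, so no DDL needs to be written. Purely as an illustration, an equivalent CREATE TABLE statement for the sample schema could be sketched as follows; the column names and types are assumptions inferred from the sample rows, not values taken from the console:

```python
# Hypothetical column definitions inferred from the sample data;
# the actual names/types are whatever you define in the console.
columns = [
    ("flight_date", "string"),
    ("flight_no", "string"),
    ("planned_arrival", "string"),
    ("period_start", "string"),
    ("period_end", "string"),
    ("airline", "string"),
    ("origin", "string"),
    ("year", "int"),
]

# Compose the DDL text; USING iceberg marks the table format
col_defs = ",\n    ".join(f"{name} {dtype}" for name, dtype in columns)
ddl = f"CREATE TABLE default.dct_iceberg (\n    {col_defs}\n) USING iceberg"
```

The schema defined here must match the columns of the CSV files loaded in the next steps, or the append will fail.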

2. Add the source data to the Iceberg table with Spark

To add the source data to the Iceberg table with Spark, follow these steps:

  1. Write the Python code.

    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("DCT_SPARK") \
        .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
        .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
        .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
        .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.sql.hive.metastore.jars", "maven") \
        .config("spark.sql.hive.metastore.version", "3.1.3") \
        .enableHiveSupport() \
        .getOrCreate()
    
    spark.sql("show databases").show()
    
    # S3 CSV file path
    csv_file_path = "s3a://{bucketName}/cargo_flight_schedule/cargo_flight_schedule_summer_arrival.csv"
    
    # Iceberg table path
    iceberg_table = "default.dct_iceberg"
    
    # Read CSV file
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(csv_file_path)
    
    # Insert DataFrame to Iceberg table
    df.write \
        .format("iceberg") \
        .mode("append") \
        .save(iceberg_table)
    
    # End SparkSession
    spark.stop()
    
    
    Note

    For hive.metastore.uris, enter thrift://{data catalog metastore uri}:{data catalog metastore port}, using the value found in Ambari Web UI > Services > Hive > Configs > ADVANCED > GENERAL > hive.metastore.uris.

  2. Run the Python code.

    python dct_iceberg.py
    
    • You can check in the Data Catalog console that a new schema version of the Iceberg table has been added.
    • You can check the current-snapshot-id of the Iceberg table in the Data Catalog console.
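
If the session fails to reach the metastore, a quick sanity check is to confirm that the hive.metastore.uris value splits into the expected host and port. The sketch below uses only the standard library; the host and port are placeholders, not real service values:

```python
from urllib.parse import urlparse

def parse_metastore_uri(uri):
    """Split a hive.metastore.uris value such as 'thrift://host:9083'
    into (host, port) for a quick sanity check."""
    parsed = urlparse(uri)
    return parsed.hostname, parsed.port

# Placeholder value; substitute the URI taken from Ambari
host, port = parse_metastore_uri("thrift://metastore.example.com:9083")
```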

3. Add another source data file with Spark

To add another source data file to the same Iceberg table with Spark, follow these steps:

  1. Write the Python code.

    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("DCT_SPARK") \
        .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
        .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
        .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
        .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.sql.hive.metastore.jars", "maven") \
        .config("spark.sql.hive.metastore.version", "3.1.3") \
        .enableHiveSupport() \
        .getOrCreate()
    
    spark.sql("show databases").show()
    
    csv_file_path = "s3a://{bucketName}/cargo_flight_schedule/cargo_flight_schedule_winter_arrival.csv"
    
    iceberg_table = "default.dct_iceberg"
    
    # Read CSV file
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(csv_file_path)
    
    # Insert DataFrame to Iceberg table
    df.write \
        .format("iceberg") \
        .mode("append") \
        .save(iceberg_table)
    
    # End SparkSession
    spark.stop()
    
    
  2. Run the Python code.

    python dct_iceberg.py
    
    • You can check in the Data Catalog console that a new schema version of the Iceberg table has been added.
    • You can check in the Data Catalog console that the current-snapshot-id of the Iceberg table has changed.
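
As an alternative to the console, Iceberg also exposes a snapshots metadata table that can be queried from Spark itself. Below is a sketch, assuming a SparkSession configured as in the script above is available as spark:

```python
# Iceberg's snapshots metadata table lists one row per table snapshot,
# including its snapshot_id and commit timestamp.
SNAPSHOT_QUERY = (
    "SELECT snapshot_id, committed_at, operation "
    "FROM default.dct_iceberg.snapshots"
)

def show_snapshots(spark):
    # Returns a DataFrame with one row per snapshot of the table
    return spark.sql(SNAPSHOT_QUERY)
```

The snapshot ids listed here are the values used in the next step to read the table as of a specific snapshot.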

4. Compare query results across Iceberg snapshots

To compare query results across Iceberg snapshots, follow these steps:

  1. Write the Python code as follows.
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("DCT_SPARK") \
        .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
        .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
        .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
        .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.sql.hive.metastore.jars", "maven") \
        .config("spark.sql.hive.metastore.version", "3.1.3") \
        .enableHiveSupport() \
        .getOrCreate()
    
    iceberg_table = "default.dct_iceberg"
    
    # View the table with only cargo_flight_schedule_summer_arrival.csv added
    spark.read.format("iceberg").option("snapshot-id", "{snapshot-id in step 2}").load(iceberg_table).show()
    
    # View the table with cargo_flight_schedule_winter_arrival.csv also added
    spark.read.format("iceberg").option("snapshot-id", "{snapshot-id in step 3}").load(iceberg_table).show()
    
    # End SparkSession
    spark.stop()
    
    
  2. Run the Python code.
    python dct_iceberg.py
    

5. Perform compaction on Iceberg files with Spark

To perform compaction on Iceberg files with Spark, follow these steps:

  1. Check that there are two snapshot files in Object Storage.

    /{bucketName}/{table location}/metadata/
     |--- snap-{step 2-snapshot-id}-{UUID}.avro
     |--- snap-{step 3-snapshot-id}-{UUID}.avro
    
  2. Check whether snapshot-count is 2 in the Iceberg table's properties information on the Data Catalog console.

  3. Write the Python code for performing compaction on Iceberg metadata files.

    • Call Iceberg's expire_snapshots procedure to clean up old snapshot files.
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("DCT_SPARK") \
        .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \
        .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \
        .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \
        .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.sql.hive.metastore.jars", "maven") \
        .config("spark.sql.hive.metastore.version", "3.1.3") \
        .enableHiveSupport() \
        .getOrCreate()
    
    # Call the expire_snapshots procedure
    spark.sql("""
        CALL system.expire_snapshots('default.dct_iceberg', TIMESTAMP '{base time}', {the number of snapshots to keep})
    """)
    
    # End SparkSession
    spark.stop()
    
    
  4. Run the Python code.

    python dct_iceberg.py
    
    • You can check in the Data Catalog console that snapshot-count in the Iceberg table's properties has become {the number of snapshots to keep}.
    • You can check that the snapshot files have been cleaned up in Object Storage.
      # if {the number of snapshots to keep} = 1
      /{bucketName}/{table location}/metadata/
       |--- snap-{step 3-snapshot-id}-{UUID}.avro
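
The CALL statement in step 3 above takes a timestamp cutoff ({base time}) and the number of snapshots to retain: snapshots older than the cutoff are removed, while at least the given number of most recent snapshots are kept. Below is a small helper for composing both values; the 5-day window and retain count of 1 are example choices, not requirements:

```python
from datetime import datetime, timedelta

def expire_cutoff(days_to_keep):
    """Format a cutoff timestamp for the {base time} placeholder:
    snapshots older than this become candidates for removal."""
    cutoff = datetime.now() - timedelta(days=days_to_keep)
    return cutoff.strftime("%Y-%m-%d %H:%M:%S")

def expire_snapshots_sql(table, cutoff, retain_last):
    # Builds the CALL statement used in the script above
    return (f"CALL system.expire_snapshots"
            f"('{table}', TIMESTAMP '{cutoff}', {retain_last})")

# Example: expire snapshots older than 5 days, keeping at least 1
sql = expire_snapshots_sql("default.dct_iceberg", expire_cutoff(5), 1)
```

Passing a cutoff in the future together with retain_last = 1, as in this walkthrough, reduces the table to its single most recent snapshot.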