VPC 환경에서 이용 가능합니다.
Cloud Hadoop 서비스의 Spark로 Iceberg 테이블을 관리하는 방법을 소개합니다.
Spark with Cloud Hadoop을 사용하여 Data Catalog의 iceberg 테이블과 연동하는 시나리오는 다음과 같습니다.
- Data Catalog에서 iceberg table을 생성합니다.
- spark로 소스 데이터(cargo_flight_schedule_summer_arrival.csv)를 iceberg table에 추가합니다.
- spark로 다른 소스 데이터(cargo_flight_schedule_winter_arrival.csv)를 동일한 iceberg table에 추가합니다.
- iceberg snapshot에 따른 조회 결과 비교합니다.
- spark로 iceberg 파일의 Compaction을 수행합니다.
이 가이드에서는 Apache Spark를 실행할 수 있는 다양한 방식 중 Python 코드를 PySpark 환경에서 실행하는 방법을 기준으로 안내합니다.
사전 작업
Spark with Cloud Hadoop을 사용하여 Data Catalog의 iceberg 테이블과 연동하기 위해 준비해야 하는 작업을 설명합니다.
Cloud Hadoop 설정
Cloud Hadoop을 설정하고,Cloud Hadoop spark3 설치하는 방법은 다음과 같습니다.
- Cloud Hadoop 클러스터의 Hive Metastore 저장소를 Data Catalog로 설정해 주십시오.
- 저장소를 Data Catalog로 설정하는 방법에 관련한 자세한 내용은 Data Catalog로 설정을 참조해 주십시오.
- Cloud Hadoop에서 제공하는 Ambari Web UI에 접속해 주십시오.
- Ambari Web UI에서 Services > Add Service > Spark3를 선택한 후 설치해 주십시오.
- Ambari Web Console 접속과 관련한 자세한 내용은 Ambari UI를 참조해 주십시오.
- Cloud Hadoop 클러스터의 엣지 노드에 SSH 방식으로 접속해 주십시오.
- SSH 방식으로 엣지 노드에 접속하는 방법과 관련한 자세한 내용은 SSH로 클러스터 노드 접속을 참조해 주십시오.
- spark3 실행을 위해 python3를 기본으로 설정해 주십시오.
# python3 선택 [sshuser@{hadoop cluster edge node} ~]$ sudo alternatives --config python # python3 버전확인 [sshuser@{hadoop cluster edge node} ~]$ python --version - pyspark3를 설치해 주십시오.
[sshuser@{hadoop cluster edge node} ~]$ sudo sudo pip3 install pyspark - pyspark 명령어를 실행해 spark 버전이 3.x 인 것을 확인해 주십시오
[sshuser@{hadoop cluster edge node} ~]$ pyspark - iceberg 실행에 필요한 library를 다운로드해 주십시오.
mkdir /home1/sshuser/jars cd /home1/sshuser/jars wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.901/aws-java-sdk-bundle-1.11.901.jar wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.2_2.12/1.3.1/iceberg-spark-runtime-3.2_2.12-1.3.1.jar wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/1.3.1/iceberg-hive-runtime-1.3.1.jar
데이터 구조 확인
Data Catalog Iceberg 테이블 연동 시 필요한 소스 데이터, 폴더 구조, 파일 구조 및 샘플에 대해 설명합니다.
소스 데이터
Object Storage에 저장된 소스 데이터는 여름철/겨울철 화물 항공편 스케줄에 대한 정보를 담고 있는 데이터로 항공편 날짜, 항공편 번호, 예상 도착 시간, 운항 시작/종료 기간, 운항 연도 등의 정보를 갖고 있습니다.
소스 데이터는 공공 데이터 포털에서 제공하는 데이터를 기반으로 하였습니다.
폴더 구조
/cargo_flight_schedule
|--- cargo_flight_schedule_summer_arrival.csv
|--- cargo_flight_schedule_winter_arrival.csv
파일 구조 및 샘플
| flight date | flight no. | planned time of arrival | period (start) | period (end) | airline | origin | year |
|---|---|---|---|---|---|---|---|
| MONDAY | PO237 | 0:30 | 2014.3.30 | 2014.10.25 | Polar Air Cargo | CINCINNATI CIN N. KNTY | 2014 |
| MONDAY | OZ394 | 1:45 | 2014.3.31 | 2014.10.25 | ASIANA AIRLINES | BANGKOK Suvarnabhumi international airport | 2014 |
| MONDAY | CK257 | 4:40 | 2014.3.30 | 2014.10.25 | CHINA CARGO AIRLINES | PUDONG | 2014 |
1. Data Catalog에서 iceberg table 생성
spark로 Load하려는 데이터의 스키마에 맞게 Data Catalog에서 iceberg table 생성해 주십시오.
테이블 유형은 Apache Iceberg로 생성합니다.
2. spark로 소스 데이터를 iceberg table에 추가
spark로 소스 데이터를 iceberg table에 추가하는 방법은 다음과 같습니다.
-
python 코드를 작성해 주십시오.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("DCT_SPARK") \ .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \ .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \ .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \ .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \ .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ .config("spark.sql.hive.metastore.jars", "maven") \ .config("spark.sql.hive.metastore.version", "3.1.3") \ .enableHiveSupport() \ .getOrCreate(); spark.sql("show databases").show() # S3 CSV 파일 경로 csv_file_path = "s3a://{bucketName}/cargo_flight_schedule/cargo_flight_schedule_summer_arrival.csv" # Iceberg 테이블 경로 iceberg_table = "default.dct_iceberg" # CSV 파일 읽기 df = spark.read \ .format("csv") \ .option("header", "true") \ .option("inferSchema", "true") \ .load(csv_file_path) # DataFrame을 Iceberg 테이블에 Insert df.write \ .format("iceberg") \ .mode("append") \ .save(iceberg_table) #SparkSession 종료 spark.stop()참고hive.metastore.uris는 Ambari Web Console 접속용 > Services > Hive > Configs > ADVANCED > GENERAL > hive.metastore.uris의 값인
thrift://{data catalog metastore uri}:{data catalog metastore port}을 입력해 주십시오. -
python 코드를 실행해 주십시오.
python dct_iceberg.py- Data Catalog 콘솔에서 iceberg 테이블의 스키마 버전이 추가된 것을 확인할 수 있습니다.
- Data Catalog 콘솔에서 iceberg 테이블의 current-snapshot-id를 확인할 수 있습니다.
3. spark로 다른 소스 데이터 추가
spark로 다른 소스 데이터를 추가하는 방법은 다음과 같습니다.
-
python 코드를 작성해 주십시오.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("DCT_SPARK") \ .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \ .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \ .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \ .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \ .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ .config("spark.sql.hive.metastore.jars", "maven") \ .config("spark.sql.hive.metastore.version", "3.1.3") \ .enableHiveSupport() \ .getOrCreate(); spark.sql("show databases").show() csv_file_path = "s3a://{bucketName}/cargo_flight_schedule/cargo_flight_schedule_winter_arrival.csv" iceberg_table = "default.dct_iceberg" # CSV 파일 읽기 df = spark.read \ .format("csv") \ .option("header", "true") \ .option("inferSchema", "true") \ .load(csv_file_path) # DataFrame을 Iceberg 테이블에 Insert df.write \ .format("iceberg") \ .mode("append") \ .save(iceberg_table) #SparkSession 종료 spark.stop() -
python 코드를 실행해 주십시오.
python dct_iceberg.py- Data Catalog 콘솔에서 iceberg 테이블의 스키마 버전이 새로 추가된 것을 확인할 수 있습니다.
- Data Catalog 콘솔에서 iceberg 테이블의 current-snapshot-id가 바뀐 것을 확인할 수 있습니다.
4. iceberg snapshot에 따른 조회 결과 비교
iceberg snapshot에 따른 조회 결과를 비교하는 방법은 다음과 같습니다.
- python 코드를 다음과 같이 작성해 주십시오.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("DCT_SPARK") \ .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \ .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \ .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \ .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \ .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ .config("spark.sql.hive.metastore.jars", "maven") \ .config("spark.sql.hive.metastore.version", "3.1.3") \ .enableHiveSupport() \ .getOrCreate(); iceberg_table = "default.dct_iceberg" # cargo_flight_schedule_summer_arrival.csv만 추가한 상태의 테이블 조회 spark.read.format("iceberg").option("snapshot-id", "{2번에서의 snapshot-id}").load(iceberg_table).show() # cargo_flight_schedule_winter_arrival.csv도 추가한 상태의 테이블 조회 spark.read.format("iceberg").option("snapshot-id", "{3번에서의 snapshot-id}").load(iceberg_table).show() #SparkSession 종료 spark.stop() - python 코드를 실행해 주십시오.
python dct_iceberg.py
5. spark로 iceberg 파일의 Compaction 수행
spark로 iceberg 파일의 Compaction를 수행하는 방법은 다음과 같습니다.
- ObjectStorage에서 snapshot file이 2개인 것을 확인해 주십시오.
/{bucketName}/{table location}/metadata/ |--- snap-{2번 과정-snapshot-id}-{UUID}.avro |--- snap-{3번 과정-snapshot-id}-{UUID}.avro - Data Catalog 콘솔의 iceberg 테이블의 속성 정보에서
snapshot-count가 2개인 것을 확인해 주십시오. - iceberg 메타데이터 파일의 Compaction 수행을 위한 python 코드를 작성해 주십시오.
- iceberg의
expire_snapshots프로시저를 호출하여 오래된 snapshots 파일을 정리합니다.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("DCT_SPARK") \ .config("spark.jars", "/home1/sshuser/jars/hadoop-aws-3.3.1.jar,/home1/sshuser/jars/aws-java-sdk-bundle-1.11.901.jar,/home1/sshuser/jars/iceberg-spark-runtime-3.2_2.12-1.3.1.jar,/home1/sshuser/jars/iceberg-hive-runtime-1.3.1.jar") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \ .config("hive.metastore.uris", "thrift://{data catalog metastore uri}:{data catalog metastore port}") \ .config("spark.hadoop.fs.s3a.access.key", "{ncp access key}") \ .config("spark.hadoop.fs.s3a.secret.key", "{ncp secret key}") \ .config("spark.hadoop.fs.s3a.endpoint", "https://kr.object.ncloudstorage.com") \ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ .config("spark.sql.hive.metastore.jars", "maven") \ .config("spark.sql.hive.metastore.version", "3.1.3") \ .enableHiveSupport() \ .getOrCreate(); # call expire_snapshots procedure ## iceberg_table = "default.dct_iceberg" spark.sql(""" CALL system.expire_snapshots('default.dct_iceberg', TIMESTAMP '{기준 시간}', {남길 snapshot 개수}) """) #SparkSession 종료 spark.stop() - iceberg의
- python 코드를 실행해 주십시오.
python dct_iceberg.py- Data Catalog 콘솔에서 iceberg 테이블 속성 정보의
snapshot-count가{남길 snapshot 개수}가 된 것을 확인할 수 있습니다. - ObjectStorage에서 snapshot file이 정리된 것을 확인할 수 있습니다.
# {남길 snapshot 개수} = 1인 경우 /{bucketName}/{table location}/metadata/ |--- snap-{3번 과정-snapshot-id}-{UUID}.avro
- Data Catalog 콘솔에서 iceberg 테이블 속성 정보의