VPC環境で利用できます。
Jobメニューの画面構成とタスクエディタの画面構成、タスク作成手順とタスク実行オプションの設定手順を説明します。
タスクとは大規模データを抽出、変換、積載するデータ処理タスクです。
Data Flowがサポートするデータ変換にはプロパティの定義、プロパティの選択、列の結合、フィルタ、行の結合、集計、プロパティ名の変更、重複の削除、欠損値の穴埋めがあります。
ソースノードとターゲットノードとしては NAVERクラウドプラットフォームの Object Storageと Data Catalogを指定できます。今後 NAVERクラウドプラットフォームの Cloud DBと顧客企業の On-premiseデータベースの連携をサポートする計画です。
タスクエディタはコードを作成せずに ETLタスクを構成できる GUIインタフェースです。ソースノード、変換ノード、ターゲットノードをダイアグラムで構成します。
Job画面
Job画面は以下のように構成されています。

| 領域 | 説明 |
|---|---|
| ① メニュー名 | 現在確認中のメニュー名 |
| ② 基本機能 | Jobメニューに初回アクセスすると表示される機能
|
| ③ 作成後の機能 | タスク作成後に提供される機能
|
| ④ Jobリスト | 作成された Jobリスト。Job別に [詳細を見る] ボタンをクリックすると、タスクエディタ画面に移動。 |
| ⑤ 検索ウィンドウ | タスク名を使って作成されたタスクを検索 |
タスク情報の確認
作成したタスクの情報を確認する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、Menu > Services > Big Data & Analytics > Data Flowメニューを順にクリックします。
- Jobsメニューをクリックします。
- タスクリストが表示されたら、サマリー情報を確認します。
- タスク名: Jobの作成時にユーザーが入力した Job固有の名前
- 最近の実行日時: 最近の Job実行日時。トリガーによって予約実行されるか、オンデマンドが実行された最新日時。
- 最近の実行ステータス: 最近の Job実行ステータス。最後に実行されたタスクのステータス。
- READY: Jobの実行準備完了のステータス
- RUNNING: Jobが実行中のステータス
- COMPLETED: Jobの実行が完了したステータス
- FAILED: Jobの実行が失敗したステータス
- ステータス: Jobの実行ステータス
- RUNNABLE: Jobの実行が可能なステータス
- RUNNING: Jobが実行中のステータス
- DELETED: Jobを削除中または削除済みのステータス
- DRAFT: Job編集が完了していないステータス。エディタ画面で [一時保存] ボタンをクリックすると一時保存される。
- EDITING: Jobが変更中のステータス(有効性検証が必要)
- STOPPED: Jobが停止中のステータス
- アップデート日時: 最近の Jobアップデート日時。タスクエディタでタスク構成を変更した最新日。
- [詳細を見る] ボタン: Jobの詳細情報を照会
- タスク構成に関する詳細を照会するには [詳細を見る] ボタンをクリックします。
- タスクエディタの画面構成に移動し、そのタスクに対するノード構成と設定内容を確認できます。
タスク作成
Visual Modeや Script Modeでタスクを構成できます。
Visual Modeでは、ソースノード、変換ノード、ターゲットノードを追加・設定してタスクを構成できます。
Script Modeモードでは、直接 pysparkコードを作成して実行できます。
ソースノードとターゲットノードを指定するには Data Catalogと Object Storageを利用している必要があります。Data Catalogと Object Storageを利用しない場合は、当該サービスに対する申し込みを先に行います。
Visual Modeでのタスク作成
Visual Modeの新規タスクを作成する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、Menu > Services > Big Data & Analytics > Data Flowメニューを順にクリックします。
- Jobsメニューをクリックします。
- [タスク作成] ボタンをクリックします。
- Visual Modeを選択し、 [タスク作成] ボタンをクリックします。
- タスクエディタ画面が表示されたら、 [タスク構成] タブでソースノード、変換ノード、ターゲットノードを追加してタスク内容を設定します。
- エディタの画面構成については、タスクエディタの画面構成をご参照ください。
- タスクエディタ画面で [ソース] ボタンをクリックして表示されるメニューで Object Storageまたは Data Catalogを選択します。
- Object Storage: NAVERクラウドプラットフォームの Object Storageバケットをデータソースとして指定
- Data Catalog: NAVERクラウドプラットフォームの Data Catalogをデータソースとして指定
- JDBC: NAVERクラウドプラットフォームの Data Catalogの JDBCコネクションをデータソースとして指定
- Data Catalogに保存されたテーブルのうち、詳細タイプ別の可否は次の通りです
- Object Storageを位置とする Tableのうち、Parquet、Json、CSV、データ形式のテーブルをサポート
- JDBCタイプのコネクションでスキャンした MySQL、PostgreSQL、MongoDBデータ形式テーブルをサポート
- Cloud_db_forタイプのコネクションでスキャンしたテーブルを未サポート
- Icebergテーブルタイプは未サポート
- 6.で追加したソースノードを選択し、右画面でソースノードのプロパティ情報と詳細設定を入力します。
- タスクエディタ画面で [変換] ボタンをクリックして表示されるメニューで変換を選択します。
- プロパティの定義: ソースデータを使用してターゲットデータのスキーマを定義します。設定項目に関する詳細な説明は、プロパティの定義をご参照ください。
- プロパティの選択: ソースデータ集合のプロパティキーの中でターゲットデータの構成プロパティを選択します。設定項目に関する詳細な説明は、プロパティの選択をご参照ください。
- 列の結合: 2つのデータ集合を結合します。設定項目に関する詳細な説明は、列の結合をご参照ください。
- フィルタ: 入力データ集合をフィルタリングして新たなデータ集合を作成します。設定項目に関する詳細な説明は、フィルタをご参照ください。
- 行の結合: スキーマが同じ2つ以上のデータ集合の行を結合します。設定項目に関する詳細な説明は、行の結合をご参照ください。
- 集計: 選択されたフィールドと行で計算(平均、合計、最大、最小)を行い、結果値で新たなフィールドを作成します。設定項目に関する詳細な説明は、集計をご参照ください。
- プロパティ名の変更: データで特定のプロパティキーの名前を変えます。設定項目に関する詳細な説明は、プロパティ名の変更をご参照ください。
- 重複の削除: データソースで重複したデータ行を削除します。設定項目に関する詳細な説明は、重複の削除をご参照ください。
- 欠損値の穴埋め: データで欠損した列の値を設定した値で埋め尽くします。設定項目に関する詳細な説明は、欠損値の穴埋めをご参照ください。
- SQLクエリ: SQL Select文を利用して変換対象カラムを選択します。設定項目に関する詳細な説明は、SQLクエリをご参照ください。
- 8.で追加した変換ノードを選択し、右画面で変換ノードのプロパティ情報と詳細設定を入力します。
- 入力項目に関する詳細な説明は、変換ノードの構成をご参照ください。
- 追加できる変換ノード数はタスクごとに1つです。
- タスクエディタ画面で [ターゲット] ボタンをクリックして表示されるメニューで Object Storageまたは Data Catalogを選択します。
- Object Storage: NAVERクラウドプラットフォームの Object Storageバケットをデータの保存場所として指定
- Data Catalog: NAVERクラウドプラットフォームの Data Catalogをデータの保存場所として指定
- JDBC: NAVERクラウドプラットフォームの Data Catalogの JDBCコネクションをデータソースとして指定
- 10.で追加したターゲットノードを選択し、右画面でターゲットノードのプロパティ情報を入力します。
- 入力項目に関する詳細な説明は、ターゲットノードの構成をご参照ください。
- カラムのプレビューで設定したスキーマを確認します。
- タスクエディタ画面で [完了] ボタンをクリックします。
- タスクの作成が完了し、タスクリスト画面に切り替わります。
- タスクリストに上記で作成したタスクが追加されます。
- 作成したタスクは NAVERクラウドプラットフォームのリソースとして登録されます。詳細は、Resource Manager とはをご参照ください。
Script Modeでのタスク作成
Script Modeの新規タスクを作成する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、Menu > Services > Big Data & Analytics > Data Flowメニューを順にクリックします。
- Jobsメニューをクリックします。
- [タスク作成] ボタンをクリックします。
- Script Modeを選択し、 [タスク作成] ボタンをクリックします。
- エディタ画面が表示されたら、 [スクリプト] タブで pysparkコードを直接作成します。
- エディタ画面で [完了] ボタンをクリックします。
- タスクの作成が完了し、タスクリスト画面に切り替わります。
- タスクリストに上記で作成したタスクが追加されます。
- 作成したタスクは NAVERクラウドプラットフォームのリソースとして登録されます。詳細は、Resource Manager とはをご参照ください。
- タスクリストからタスクを選択し、 [実行] ボタンをクリックするか、 [詳細を見る] > [実行] ボタンをクリックするとタスクをオンデマンドで実行できます。
- タスクを予約実行するには、ワークフローを作成してトリガーを接続します。ワークフローの作成の詳細は、ワークフロー作成をご参照ください。
- タスクを作成すると Object Storageにバケットが自動で作成されます。このバケットには当該タスクの実行スクリプトファイルと実行ログファイルが保存されます。
Script Modeで Jobを作成する際、Sparkコードはユーザーが設定した Object Storageスクリプトパスに平文で保存されます。
JDBC URL/ID/パスワードなどは平文で保存されないよう、可能な限り Catalog Connectionをご利用ください。さらに暗号化バケットに保存すると、機密情報はより安全に管理できます。
Script Modeでの Data Catalog Connectionの活用
Script Modeでは Data Catalog JDBC Connectionを使用して DBMSデータの読み取り、書き込みができます。
- DMBS別の使用例
- MySQL
# mysql DB JDBC connectionの使用例 df = spark.read.format('jdbc') .option('catalog_connection_name', 'connection_name') .option('dbtable', 'table_name') .option('driver', 'com.mysql.cj.jdbc.Driver') .load()- PostgreSQL
# mysql DB JDBC connectionの使用例 df = spark.read.format('jdbc') .option('catalog_connection_name', 'data_catalog_connection_name') .option('dbtable', 'schema.table_name') .option('driver', 'org.postgresql.Driver') .load()- MongoDB
# mysql DB JDBC connectionの使用例 df = spark.read.format('jdbc') .option('catalog_connection_name', 'data_catalog_connection_name') .option('collection', 'collection_name') .load()
Script Modeでの Iceberg Table活用
Script Modeで Icebergテーブルタイプを使用できます。以下のサンプルコードをご参照ください
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.functions import col, lit, current_timestamp
spark = SparkSession.builder.appName("Iceberg Guide").enableHiveSupport().getOrCreate()
# Data Catalogにデータベースを作成します。
# S3バケットのパスを環境に合わせて変更します。
S3_BUCKET = "test-bucket"
S3_DB_PATH = "s3a://" + S3_BUCKET + "/iceberg_db"
spark.sql("CREATE DATABASE IF NOT EXISTS iceberg_db LOCATION '" + S3_DB_PATH + "'")
spark.sql("USE iceberg_db")
# Icebergテーブル作成
# SQL DDLで作成
S3_TABLE_PATH = "s3a://" + S3_BUCKET + "/iceberg_db/orders"
spark.sql("""
CREATE TABLE IF NOT EXISTS orders (
order_id INT,
customer STRING,
product STRING,
quantity INT,
price DOUBLE,
order_date TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(order_date))
LOCATION '""" + S3_TABLE_PATH + """'
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.metadata.delete-after-commit.enabled' = 'true',
'write.metadata.previous-versions-max' = '3'
)
""")
print("テーブル作成完了: orders")
# データ入力(INSERT)
# 方法 A: SQL INSERT
spark.sql("""
INSERT INTO orders VALUES
(1, 'キム・チョルス', 'ノートパソコン', 1, 1500000.0, timestamp '2026-04-01 10:00:00'),
(2, 'イ・ヨンヒ, 'キーボード', 2, 89000.0, timestamp '2026-04-01 11:30:00'),
(3, 'パク・ミンス', 'モニター', 1, 450000.0, timestamp '2026-04-02 09:00:00'),
(4, 'チェ・ジウン', 'マウス', 3, 35000.0, timestamp '2026-04-02 14:20:00'),
(5, 'ジョン・ハヌル', 'ヘッドセット', 1, 120000.0, timestamp '2026-04-03 16:45:00')
""")
# 方法 B: DataFrame APIで入力
new_data = [
(6, "ハン・ソヨン", "ウェブカメラ", 1, 85000.0, "2026-04-04 10:00:00"),
(7, "オ・ジュンヒョク", "USBハブ", 2, 25000.0, "2026-04-04 13:00:00"),
]
schema = StructType([
StructField("order_id", IntegerType(), False),
StructField("customer", StringType(), False),
StructField("product", StringType(), False),
StructField("quantity", IntegerType(), False),
StructField("price", DoubleType(), False),
StructField("order_date", StringType(), False),
])
df_new = spark.createDataFrame(new_data, schema)
df_new = df_new.withColumn("order_date", col("order_date").cast(TimestampType()))
df_new.writeTo("orders").append()
print(" データ入力完了: 7件")
# データ照会(SELECT)
# 全体照会
print("\n 全注文リスト:")
spark.sql("SELECT * FROM orders ORDER BY order_id").show(truncate=False)
# DataFrame API照会
print(" 10万ウォン以上のご注文")
spark.table("orders") \
.filter(col("price") >= 100000) \
.select("order_id", "customer", "product", "price") \
.orderBy("price", ascending=False) \
.show(truncate=False)
# 集計照会
print(" 顧客ごとの注文総額:")
spark.sql("""
SELECT customer,
COUNT(*) AS order_count,
SUM(price * quantity) AS total_amount
FROM orders
GROUP BY customer
ORDER BY total_amount DESC
""").show(truncate=False)
print(" スナップショット履歴:")
spark.sql("SELECT * FROM spark_catalog.iceberg_db.orders.snapshots").show(truncate=False)
# 特定時点のデータ照会(タイムトラベル)
# spark.sql("SELECT * FROM orders TIMESTAMP AS OF '2026-04-02 00:00:00'").show()
# 特定のスナップショット IDで照会
# spark.sql("SELECT * FROM orders VERSION AS OF <snapshot_id>").show()
# 特定の行を削除
spark.sql("DELETE FROM orders WHERE order_id = 7")
print("order_id=7 削除完了")
# ヒント: MERGE INTO(Upsert)
# 新しいデータが入力された際、既存の行は UPDATE、新規の行は INSERT
upsert_data = [
(1, "キム・チョルス", "ノートパソコン", 1, 1400000.0, "2026-04-01 10:00:00", "払い戻し"), # 既存 → UPDATE
(8, "ユン・ソジュン", "タブレット", 1, 680000.0, "2026-04-05 09:30:00", "準備中"), # 新規 → INSERT
]
df_upsert = spark.createDataFrame(upsert_data, schema.add("status", StringType()))
df_upsert = df_upsert.withColumn("order_date", col("order_date").cast(TimestampType()))
df_upsert.createOrReplaceTempView("incoming_orders")
spark.sql("""
MERGE INTO orders t
USING incoming_orders s
ON t.order_id = s.order_id
WHEN MATCHED THEN
UPDATE SET t.price = s.price, t.status = s.status
WHEN NOT MATCHED THEN
INSERT *
タスクエディタの画面構成
タスクエディタ画面は以下のように構成されています。
タスクエディタ画面は [タスク作成] ボタンをクリックするか、タスクリストで [詳細を見る] ボタンをクリックすると表示されます。

| 領域 | 説明 |
|---|---|
| ① 基本情報 | タスク名を入力 |
| ② 機能タブ | 使用する機能を選択
|
| ③ ノード表示領域 | ソースノード、変換ノード、ターゲットノードを追加。各ノードはボックスとして表現され、ボックスを繋ぐ接続線で上位ノードと下位ノードを表現。 |
| ④ 設定領域 | 各ノードのプロパティ設定。必要時に詳細を設定。ノード別設定項目に関する詳細な説明は、ソースノードの構成、変換ノードの構成、ターゲットノードの構成を参照。 |
| ⑤ トグルボタン | 編集ステータスによって [一時保存] ボタンと [実行] ボタンの間にトグル
|
タスクエディタの [タスク構成] タブのノード表示領域(③番領域)でタスクの構成要素(ソース/変換/ターゲット)ノードを追加した後、タスクエディタの [タスク構成] タブの設定領域(④番領域)でタスク構成要素のプロパティと詳細設定を入力します。
ソースノード、変換ノード、ターゲットノードを1つ以上追加すると、 [完了] ボタンが有効になります。追加できるソースノード数は変換ノードのタスクの種類によって異なります。
ソースノード構成
ソースノード構成を通じて変換するデータのソースノードを指定します。
タスクエディタで [ソース] ノードを追加した後、右画面でプロパティ情報と詳細設定を入力します。
選択できるソースノードには、Object Storageと Data Catalog、JDBC(mysql, postgresql, mongodb)があります。(2025年08月基準)
今後 NAVERクラウドプラットフォームの Cloud DBと顧客企業の On-premiseデータベースの連携をサポートする計画です。
ソースノードのプロパティ情報
ソースノードの種類によってプロパティ情報の入力項目が変わります。
- ソースノードが Object Storageの場合
- 名前: ソースノード名を入力します。
- データストア: Object Storageが選択されています。変更すると入力項目が変更されます。
- バケット: Object Storageの中でタスクを行う元データを含まれたバケットを選択します。
- パス: Object Storageバケットの特定パスを指定します。指定したパスの下位のデータを基準にデータが抽出され、入力しない場合はバケット下位の全パスにあるデータが抽出されます。
- データ形式: 元データのフォーマットを入力します。JSON(NDJSON)、CSV、Parquetの中から選択します。
- ソースノードが Data Catalogの場合
- 名前: ソースノード名を入力します。
- データストア: Data Catalogが選択されています。
- データベース: データベースを選択します。データベースはメタデータを定義したテーブルの集合です。
- テーブル選択: テーブルを選択します。テーブルはデータのスキーマを定義したメタデータを提供します。
- スキーマバージョン: スキーマバージョンを選択します。
- ソースノードが JDBCの場合
- 名前: ソースノードの名前を入力します。
- データストア: JDBCが選択されています。変更すると入力項目が変更されます。
- コネクション: Data Catalogの中で Connectionを選択します。
- テーブル: DBのテーブル名を入力します。
ソースノードの詳細設定
ソースノードの種類によって詳細設定内容が変わります。
- ソースノードが Object Storageの場合: ソースデータとして使用するスキーマテーブルを構成します。
- [追加] ボタンをクリックしてフィールドを追加し、フィールド名とデータタイプを指定します。
- データタイプに関する詳細な説明は、スキーマデータタイプをご参照ください。
- ソースノードが Data Catalogの場合: Data Catalogから読み取ったスキーマテーブルを表示します。
- スキーマテーブルの構成フィールドを追加、または変更できません。特定のプロパティキーを削除できます。
- ソースノードが JDBCの場合: ソースデータとして使用するスキーマテーブルを構成します。
- JDBCを選択した場合、アクセス先でアクセス IPアドレス項目に211.188.48.218が登録されている必要があります。
- アクセス可能な DBMSリスト: MySQL、PostgreSQL、MongoDB
- [追加] ボタンをクリックしてフィールドを追加し、フィールド名とデータタイプを指定します。
- データタイプに関する詳細な説明は、スキーマデータタイプをご参照ください。
変換ノード構成
タスクエディタで [変換] ノードを追加した後、右画面でプロパティ情報と詳細設定を入力してデータの変換を定義します。
変換タスクの種類によって変換設定項目が変わります。タスクタイプ別の設定項目を説明します。
プロパティの定義
ソースデータを使用してターゲットデータのスキーマを定義します。
- [プロパティ情報] タブ: 変換タスクのプロパティを定義します。
- 名前: 変換ノード名を入力します。
- 変換: 変換タスクの種類が選択されています。変更すると入力項目が変更されます。
- 上位ノード: 変換ノードと接続するソースノードを1つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
- [詳細設定] タブ: ソースノードとターゲットノードのスキーマをマッピングします。
- 上位ノードフィールドに表示されたソースノードのプロパティキーと下位ノードフィールドに表示された下位ノードのプロパティキーをマッピングします。
- 下位ノードフィールドはターゲットノードが追加されてからこそ、設定できます。ターゲットノードを追加しない場合、選択値が表示されません。
- データタイプは変更できます。ソースノードのデータタイプをターゲットノードで変更できます。
- データタイプに関する詳細な説明は、スキーマデータタイプをご参照ください。
プロパティ選択
ソースデータのプロパティキーの中でターゲットデータに構成するプロパティを選択します。選択されていないプロパティキーはターゲットデータから除外されます。
- [プロパティ情報] タブ: 変換タスクのプロパティを定義します。
- 名前: 変換ノード名を入力します。
- 変換: 変換タスクの種類が選択されています。変更すると入力項目が変更されます。
- 上位ノード: 変換ノードと接続するソースノードを1つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
- [詳細設定] タブ: 上位ノードのプロパティキーの中で下位ノードに送信するキーを1つ以上選択します。
列の結合
2つのデータ集合の列を結合します。上位ノードを2つまで選択できます。
列を結合した後はデータのスキーマが変更されます。
- [プロパティ情報] タブ: 変換タスクのプロパティを定義します。
- 名前: 変換ノード名を入力します。
- 変換: 変換タスクの種類が選択されています。変更すると入力項目が変更されます。
- 上位ノード: 変換ノードと接続するノードを2つ指定します。ソースノードが2つ作成される必要があります。
- [詳細設定] タブ: 列結合のルールを設定します。
- タイプ: 内部結合、左結合、右結合、外部結合の中で列の結合タイプを1つ選択します。
- 内部結合: 結合条件を満足する行に対して2つのデータ集合の列を結合。結合条件を満足しない行は結合不可。条件を追加しない場合、2つのデータ集合のすべての行に対して列を結合。
- 左結合: 左のデータ集合の行を基準に列を結合。左のデータ集合のすべての行と結合条件を満足する右のデータ集合の行を含めて列を結合。
- 右結合: 右のデータ集合の行を基準に列を結合。右のデータ集合のすべての行と結合条件を満足する左のデータ集合の行を含めて列を結合。
- 外部結合: 2つのデータ集合のすべての行を含めて列を結合
- 条件: 各データ集合で相互比較するプロパティキーを選択。条件を設定しないこともできます。
- [追加] ボタンをクリックすると左側ノードフィールド/比較演算子/右側ノードフィールドテーブルを作成
- 左側ノードフィールドで左のデータ集合のプロパティキーを選択
- 右側ノードフィールドで右のデータ集合のプロパティキーを入力
- 左側ノードフィールドのプロパティキーと右側ノードフィールドのプロパティキーが同じ場合、その行に対して列を結合
- プレフィックス: 左側ノードのフィールド名と右側ノードのフィールド名は重複できないため、右側ノードのフィールド名に自動でプレフィックスが追加される。この時に付けるプレフィックスの名前を変更。
- タイプ: 内部結合、左結合、右結合、外部結合の中で列の結合タイプを1つ選択します。
フィルタ
ソースデータをフィルタリングしてターゲットデータとして作成します。フィルタ条件を満足しない行はターゲットデータが削除されます。
- [プロパティ情報] タブ: 変換タスクのプロパティを定義します。
- 名前: 変換ノード名を入力します。
- 変換: 変換タスクの種類が選択されています。変更すると入力項目が変更されます。
- 上位ノード: 変換ノードと接続するソースノードを1つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
- [詳細設定] タブ: フィルタ条件を設定します。
- フィルタタイプ: ANDまたは ORを選択します。フィルタが複数の場合、フィルタが結合する方法を決めます。
- 条件: フィルタリング条件を設定します。
- [追加] ボタンをクリックすると、フィールド/条件/値テーブルを作成
- 例: value == 0.7: valueフィールドの値が数値型で0.7の場合、そのフィールドはターゲットデータに追加
- 例: value > Car: valueフィールドの値が文字型で、ASCIIコードの値が条件の先頭文字である「C」以上の場合、そのフィールドはターゲットデータに追加
行の結合
スキーマが同じ2つのソースデータを結合します。行を結合する前に2つのソースデータのスキーマ構造が同じか確認します。
スキーマが同じであるため、結合されたデータの列は結合前と同じで行が追加されます。
- [プロパティ情報] タブ: 変換タスクのプロパティを定義します。
- 名前: 変換ノード名を入力します。
- 変換: 変換タスクの種類が選択されています。変更すると入力項目が変更されます。
- 上位ノード: 変換ノードを接続するソースノードを2つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
- 詳細設定 > タイプ: 行の結合ルールを設定します。
- 全体結合: 重複した行を削除せずにすべての行を結合。重複した行なのか判断する際は大小文字を区分して判断します。
- 重複の削除後に結合: 重複した行を削除したすべての行を結合
集計
ソースデータで選択したフィールドと行で計算(平均、合計、最大、最小)を行い、新たなフィールドを追加して結果値を保存します。
- [プロパティ情報] タブ: 変換タスクのプロパティを定義します。
- 名前: 変換ノード名を入力します。
- 変換: 変換タスクの種類が選択されています。変更すると入力項目が変更されます。
- 上位ノード: 変換ノードと接続するソースノードを1つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
- [詳細設定] タブ: 集計するデータフィールドを選択し、当該行に適用する集計関数と結果フィールドを設定します。
- グループ化の基準: 集計範囲を示す基準フィールドを指定。例) valueフィールドが AAAのデータに対して集計
- 集計条件: 集計関数と結果フィールドを指定
- [追加] ボタンをクリックすると、フィールド/条件/結果フィールドテーブルを作成
- フィールド: 集計を適用するソースデータのプロパティキーを選択
- 条件: 選択した範囲のデータに適用する集計関数を選択。 AVG/SUM/MAX/MIN.
- 結果フィールド: 集計結果を保存する新規フィールド名を指定
プロパティ名の変更
データで特定のプロパティキーの名前を変更します。
- [プロパティ情報] タブ: 変換タスクのプロパティを定義します。
- 名前: 変換ノード名を入力します。
- 変換: 変換タスクの種類が選択されています。変更すると入力項目が変更されます。
- 上位ノード: 変換ノードと接続するソースノードを1つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
- [詳細設定] タブ: ソースノードスキーマから読み取った現在のキー名 / 変更するキー名テーブルからプロパティ名を変更したいプロパティキーの変更するキー名を変更します。
重複の削除
データソースで重複したデータ行を削除します。重複を判断する時は大小文字を区分します。行を削除するため、この変換によってスキーマが変わることはありません。
- [プロパティ情報] タブ: 変換タスクのプロパティを定義します。
- 名前: 変換ノード名を入力します。
- 変換: 変換タスクの種類が選択されています。変更すると入力項目が変更されます。
- 上位ノード: 変換ノードと接続するソースノードを1つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
- 詳細設定 > 重複タイプ: 重複の削除オプションを選択します。
- 全体の行が一致すると削除: 全フィールドの値が一致する場合にのみ行を削除。重複した行なのか判断する際は大小文字を区分して判断します。
- 特定フィールドが一致すると削除: 特定のフィールドの値が一致する場合にのみ削除、削除される対象は順序と関係なくランダムに削除します。
欠損値の穴埋め
データで欠損した列の値を設定した値で埋め尽くします。
- [プロパティ情報] タブ: 変換タスクのプロパティを定義します。
- 名前: 変換ノード名を入力します。
- 変換: 変換タスクの種類が選択されています。変更すると入力項目が変更されます。
- 上位ノード: 変換ノードと接続するソースノードを1つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
- 詳細設定: 欠損データが存在するプロパティキーを定義し、代替する値を設定します。
- 欠損データ対象キー: 欠損データを存在するプロパティキーだけを残して削除
- 代替値: 欠損データを代替する値を入力
SQLクエリ
SQL Select文を利用して変換対象カラムを選択します。
- [プロパティ情報] タブ: 変換タスクのプロパティを定義します。
- 名前: 変換ノード名を入力します。
- 変換: 変換タスクの種類が選択されています。変更すると入力項目が変更されます。
- 上位ノード: 変換ノードと接続するソースノードを1つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
- 詳細設定: SQLクエリ文を作成します。
- SQL変換: クエリ文で使用するテーブルのエイリアスを入力します。
ターゲットノード構成
ターゲットノード構成を通じて変換するデータのターゲットノードを指定します。
タスクエディタで [ターゲット] ノードを追加した後、右画面でプロパティ情報と詳細設定を入力します。
選択できるターゲットノードには Object Storage、Data Catalog、Cloud DB for MySQLがあります。(2024年01月基準)
今後 NAVERクラウドプラットフォームの Cloud DBと顧客企業の On-premiseデータベースの連携をサポートする計画です。
ターゲットノードのプロパティ情報
ターゲットノードの種類によってプロパティ情報の入力項目が変わります。
- ターゲットノードが Object Storageの場合
- 名前: ターゲットノードの名前を入力します。
- データストア: Object Storageが選択されています。変更すると入力項目が変更されます。
- バケット: Object Storageの中で変換データを保存するバケットを選択します。
- Prefix: Object Storageバケットの特定パスを指定します。指定したパス下位に結果データを保存します。
- データ形式: ターゲットデータのフォーマットを入力します。JSON、CSV、Parquetの中で選択します。
- 重複処理オプション: ターゲットパスにデータがある場合、処理方法を選択します。データ追加時に入力したパスにデータを追加し、上書き時にはパスを削除して入力します。無視(アップデートしない) の場合、パスが空の場合のみデータを入力します。
- 上位ノード: 変換ノードと接続するソースノードを1つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
- 出力ファイル数: 出力ファイルの数を指定できます。
- ターゲットノードが Data Catalogの場合
- 名前: ターゲットノードの名前を入力します。
- データストア: Data Catalogが選択されています。変更すると入力項目が変更されます。
- データベース: データベースを選択します。データベースはメタデータを定義したテーブルの集合です。
- テーブル選択: 変換ノードを通じて変更されたスキーマを保存するテーブルを選択します。
- スキーマバージョン: スキーマバージョンを選択します。
- 重複処理オプション: ターゲットパス(テーブル)にデータがある場合、処理方法を選択します。データ追加時は入力したパス(テーブル)にデータを追加し、上書き時はパス(テーブル)を削除して再入力します。無視(アップデートしない) の場合、パスが空またはパス(テーブル)がない場合のみデータを入力します。
- 上位ノード: ターゲットノードと接続する変換ノードを1つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
- 出力ファイル数: DataCatalogが ObjectStorageタイプの場合、出力ファイルの数を指定できます。
- ターゲットノードが JDBCの場合
- 名前: ターゲットノードの名前を入力します。
- データストア: JDBCが選択されています。変更すると入力項目が変更されます。
- コネクション: Data Catalogの中で Connectionを選択します。
- テーブル: 変換ノードを通じて変更されたスキーマを保存するテーブルを入力します。
- 重複処理オプション: ターゲットテーブルにデータがある場合、処理方法を選択します。データ追加時は入力したテーブルにデータを追加し、上書き時はテーブルを削除して入力します。無視(アップデートしない) の場合、テーブルがない場合のみデータを入力します。
- 上位ノード: 変換ノードと接続するソースノードを1つ指定します。データノードを選択するとソースノードの1つを選択でき、加工ノードを選択すると変換ノードから選択できます。
カラムのプレビュー
ターゲットノードに保存されるデータのスキーマのプレビューができます。
ソースおよびターゲットのサポートタイプは、次の通りです。(2025年8月基準)
String, Binary, Boolean, Date, Timestamp, Byte, Short, Int, Long, Float, Double, Decimal, Array, Map, Struct, Null
一部のタイプの場合、MySQL(Postgres)に移行すると以下のように固定されたタイプに変換されます。
Varchar -> varchar(250), Char -> char(64), Array, Map, Struct, String -> mediumtext
データのプレビュー
ソースと変換ノードで処理されるデータの一部をプレビューできます。
タスクパラメータ
タスクノードに一部の項目でタスク実行時に提供するグローバルパラメータを使用するか、直接パラメータを定義して使用できます。
対応する項目は以下の通りです。(2024年7月基準)
Object Storageソース: パス
Object Storageターゲット: Prefix
JDBCソース/ターゲット: テーブル
タスク実行オプションの設定
タスク作成後にタスク実行オプションを設定できます。タスク実行オプションを設定する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、Menu > Services > Big Data & Analytics > Data Flowメニューを順にクリックします。
- Jobsメニューをクリックします。
- タスクリストから特定のタスクを選択した後、 [実行] ボタンをクリックします。
- 実行オプションのポップアップが表示されたら、実行オプションを設定します。
- 実行コンテナ: 分散してタスクを行うコンテナを何台使用するか設定
- 再試行回数: タスク失敗時に何回再試行するか設定
- Timeout: タスクを一度実行する時にタスク結果を待つ時間を設定
- スクリプトパス: タスクコマンドスクリプトが保存されるパス。タスク作成時に自動で作成される Object Storageバケットのサブパスに自動で指定。
- 実行ログ: タスク実行履歴が保存されるパス。タスク作成時に自動で作成される Object Storageバケットのサブパスに自動で指定。
- ロール名: タスク実行のための SubAccountロール。
- [実行] ボタンまたは [実行せずにオプション保存] ボタンをクリックします。
- [実行] ボタンをクリックすると、タスクリストから当該タスクのステータスが実行中に変更されます。
ソースノードまたはターゲットノードで Cloud DBを利用する場合、DBサーバのネットワーク環境とユーザー設定が以下の DataFlowアクセス IPアドレスを通じたアクセスを許可するか確認します。
10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
- Server > ACG > ACG設定メニューで Inboundルールに追加
- VPC > Network ACL > ACL Rule > Rule設定メニューで Inboundルールに追加
- Cloud DB for MySQL > DB管理 > DB User管理メニューで DB Userに追加
- 10.%, 172.%, 192.168.%
タスク実行リストを照会
タスク実行履歴を照会する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、Menu > Services > Big Data & Analytics > Data Flowメニューを順にクリックします。
- Jobsメニューをクリックします。
- タスクリストから特定タスクに対する [詳細を見る] ボタンをクリックします。
- タスクエディタ画面が表示されると [実行リスト] タブをクリックします。
- 最近の1か月間のタスク実行リストを確認できます。タスク実行履歴は90日間保存されます。
- 実行リストで照会できる項目は、次の通りです。
- タスク名(ID) : Jobの作成時にユーザーが入力した Job固有の名前(Job ID)
- 実行ステータス: Jobの実行結果。成功、失敗、実行中、待機の中から1つの値が照会される。
- 実行ログ: [詳細を見る] ボタンをクリックすると、タスク実行履歴ファイルの場所に移動
- コンテナ: タスク実行オプションで設定したコンテナの台数
- トリガー: このタスクに接続されたトリガー(スケジュール)ファイルがある場合に紹介される
- 実行開始日: Jobの実行開始日時。トリガーによって予約実行されるか、オンデマンドが実行された日時。
- 実行終了日: Jobの実行終了日時。トリガーによって予約実行されるか、オンデマンドが実行されて終了した日時。
- 実行準備時間: Job実行が行われるまでの準備時間
- 実行時間: Job実行にかかった時間
- 再試行回数: Job実行の再試行回数
- ワークフローで構成せずにタスクをオンデマンドで単独実行した場合は、Job画面の実行リストでのみ実行履歴が照会されます。
- ワークフローで構成されたタスクは Workflow画面の実行リストでも照会され、Job画面の実行リストでも照会されます。
タスク削除
タスクを削除する方法は、次の通りです。
- NAVERクラウドプラットフォームコンソールの VPC環境で、Menu > Services > Big Data & Analytics > Data Flowメニューを順にクリックします。
- Jobsメニューをクリックします。
- タスクリストから特定のタスクを選択した後、 [削除] ボタンをクリックします。
- タスクリストから当該タスクが削除されます。
- 削除されたタスクを含むワークフローはトリガーによって予約されていても実行されません。
- データファイルは UTF-8エンコードタイプのみをサポートします。
- エンコードが異なる場合、Jobの実行が正常に動作しないことがあります。