VPC環境で利用できます。
Kubeflow Pipelineは Kubeflowの主要構成要素として、コンテナベースで workflowを管理する機能を提供します。また、workflowの作成/リリース/実行/管理のために Python SDKと Web UIを提供します。Web UIは MLXP画面にある Pipelinesのメニューからアクセスでき、Python SDKは pipでインストールできます。 Kubeflow Pipelineの詳細については、公式ドキュメントをご参照ください。
開発環境の構成と SDKのインストール
開発環境の構成方法と Python SDKのインストール方法について説明します。
Pipeline root設定
Pipeline rootとは、Pipelineを実行する際に発生する input/output情報、log、datasetなどの情報を保存する Artifactリポジトリを指します。Pipelineを実行する前に、namespaceにいくつかの設定を追加し、namespace単位で分離された Artifactリポジトリを使用する必要があります。
複数の namespaceを使用する MLXP Member間で Artifactリポジトリの内容が共有されても問題ない場合、1つの Artifactリポジトリを複数の namespaceにまたがって使用できます。ただし、同じ Artifactリポジトリであっても、namespaceごとに同じ Pipeline root設定を行う必要があります。そうすることで、Pipelineを正常に実行できます。
この Artifactリポジトリのために、AWS S3と互換性のある object storageを準備してください。
MLXPでは、Kubeflow Pipelineでデフォルトでサポートする minio Artifactリポジトリは使用できません。そのため、namespaceに Pipeline rootを設定しないと、Pipelineを正常に実行できません。
Object storageの使用準備が整ったら、以下の yaml templateで ${} に該当する領域を目的の値で埋めて yamlファイルを作成した後、 kubectl apply -f {yaml 파일명}を実行して設定を完了します。
| 項目 | 説明 |
|---|---|
| ${NAMESPACE} | pipelineを実行する namespace名 |
| ${BUCKET} | Object storageの bucket名 |
| ${PREFIX} | bucketに保存されるパス名 |
| ${STORAGE_ENDPOINT} | Object storageの endpoint(ex.kr.object.ncloudstorage.com) |
| ${ACCESS_KEY} | Object storage認証のために発行された access key |
| ${SECRET_ACCESS_KEY} | Object storage認証のために発行された secret key |
| ${REGION} | Object storageの region(ex.kr-standard) |
# argo workflow logを保存するための configmap
apiVersion: v1
kind: ConfigMap
metadata:
name: artifact-repositories # この configmapの nameは変更しないでください。
namespace: ${NAMESPACE}
annotations:
workflows.argoproj.io/default-artifact-repository: default-v1
data:
default-v1: |
archiveLogs: true
s3:
bucket: ${BUCKET}
endpoint: ${STORAGE_ENDPOINT}
insecure: false
region: kr-standard
keyFormat: "${PREFIX}/artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}"
accessKeySecret:
name: artifact-secret
key: AWS_ACCESS_KEY_ID
secretKeySecret:
name: artifact-secret
key: AWS_SECRET_ACCESS_KEY
---
# kfp artifact設定のための Configmap
apiVersion: v1
kind: ConfigMap
metadata:
name: kfp-launcher # この configmapの nameは変更しないでください。
namespace: ${NAMESPACE}
data:
defaultPipelineRoot: "s3://${BUCKET}/${PREFIX}?region=${REGION}&endpoint=${STORAGE_ENDPOINT}"
---
apiVersion: v1
kind: Secret
metadata:
name: artifact-secret
namespace: ${NAMESPACE}
type: Opaque
stringData:
AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
---
# PodDefault to inject NCloud Object Storage environment variables into KFP v2 pods
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
name: kfp-poddefault
namespace: ${NAMESPACE}
spec:
selector:
matchLabels:
pipelines.kubeflow.org/v2_component: "true"
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: artifact-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: artifact-secret
key: AWS_SECRET_ACCESS_KEY
- name: AWS_DEFAULT_REGION
value: ${REGION}
- name: AWS_REGION
value: ${REGION}
- name: AWS_S3_ENDPOINT
value: ${STORAGE_ENDPOINT}
- name: S3_ENDPOINT
value: ${STORAGE_ENDPOINT}
- name: S3_USE_HTTPS
value: "true"
- name: S3_VERIFY_SSL
value: "true"
Pipeline SDKのインストール
Pipelineは Python SDKを提供します。Python SDKは Componentを作成し、Pipelineを構成・ビルドする機能を提供します。また、Pipelineの実行および Experimentの管理機能も提供します。
このドキュメントでは、簡単なユースケースを通じて Pipelineを作成し実行する方法について説明します。SDKの詳細な使用方法については、Kubeflow Pipelines SDK APIをご参照ください。
現在、SDKを通じて Pipelineを実行し Experimentを管理する機能は、MLXPで実行した Jupyter Notebook環境でのみ動作します。
MLXPクラスタと互換性のある kfpの最新バージョンについては、以下の項目をご参照ください。
| サービス | バージョン | 公式ドキュメント |
|---|---|---|
| Pipeline Backend | 2.2.0 | |
| Pipeline SDK(kfp) | 2.7.0 | https://kubeflow-pipelines.readthedocs.io/en/sdk-2.7.0/ |
| kfp-kubernetes | 1.2.0 | https://kfp-kubernetes.readthedocs.io/en/kfp-kubernetes-1.2.0/ |
Pipeline SDKは以下のコマンドでインストールできます。MLXPで実行する Jupyter Notebook環境には既に kfpがインストールされているため、別途インストールせずに SDKを使用できます。そのため、MLXPの Jupyter Notebook環境の使用をお勧めします。
pip install kfp
Jupyter Notebookにインストールされた kfpバージョンの確認
Jupyter Notebookにインストールされた kfpのバージョンを確認する方法は、次の通りです。
- Jupyter Notebookにアクセス後、Launcherページで Other > Terminalを選択し、ターミナルを起動します。
- Terminalに以下のコマンドを入力し、インストールされた kfpのバージョンを確認します。
(base) irteam@test-pipeline-1-0:~$ pip show kfp Name: kfp Version: 2.7.0 Summary: Kubeflow Pipelines SDK Home-page: https://github.com/kubeflow/pipelines Author: The Kubeflow Authors Author-email: None License: UNKNOWN Location: /opt/conda/lib/python3.8/site-packages Requires: tabulate, protobuf, google-api-core, kfp-pipeline-spec, kfp-server-api, urllib3, kubernetes, google-cloud-storage, PyYAML, requests-toolbelt, docstring-parser, click, google-auth, typing-extensions Required-by: - Version出力結果が1.x.x形式で表示される場合は、以下の方法でバージョンアップグレードを実行するか、最新の Jupyter Notebookイメージを使用します。
pip install --upgrade kfp=={MLX対応 kfpバージョン}
Jupyter Notebook環境に低バージョンの SDK(kfp)がインストールされている場合、パッケージのバージョンアップグレードが必須です。SDK(kfp) 1.xバージョンは MLXPでサポートしていないため、使用前に必ずパッケージバージョンをご確認ください。
Pipelineのユースケース実行
MLXPの Jupyter Notebook以外のローカル開発環境では、コンパイルまでのみ実行できます。コンパイルして作成された yamlファイルを MLXPの Pipeline画面で uploadし、Pipelineを作成して実行できます。
次のサンプルコードは、MLXPで実行した Jupyter Notebook環境で行います。
Python 3(ipykernel) Notebookを作成し、以下のコードを入力して実行してください。
import kfp
from kfp import dsl
from kfp.dsl import component
@component
def add_op(a:float, b:float) -> float:
print("a + b =", a + b)
return a + b
@component
def log_op(msg:str):
print(msg)
@dsl.pipeline(name='A + B', description="pipeline for input parameter")
def pipeline(a: float = 10.0, b: float = 20.0):
add_task = add_op(a=a, b=b)
log_task = log_op(msg=f"{add_task.output}")
2つの数値を受け取って加算する add_opコンポーネントと、1つの値を入力として受け取って画面に出力する log_opコンポーネントを作成しました。このプロセスでは、Python SDKが提供する kfp.dsl.component を活用します。
Pipeline関数では、パイプライン実行時に入力として受け取れる引数を定義できます。受け取った引数を add_opコンポーネントの入力として使用し、add_opコンポーネントが返した値を log_opコンポーネントの入力として使用します。log_opコンポーネントが add_opコンポーネントの結果を使用するため、コンポーネントの実行順序は自動的に決定されます。
上記で定義したパイプラインを実行するために、以下のコードを使用します。
if __name__ == "__main__":
kfp.compiler.Compiler().compile(pipeline, 'test_pipeline.yaml')
client = kfp.Client()
my_experiment = client.create_experiment(name="Test Experiment")
my_run = client.run_pipeline(
experiment_id=my_experiment.experiment_id,
job_name="test",
pipeline_package_path="test_pipeline.yaml",
params={'b': 30.0}
)
上記コードの動作を確認してみましょう。まず pipeline 関数をコンパイルして test_pipeline.yaml というファイルを作成します。作成されたファイルには、Pipelineの Workflow全体を YAML形式で定義したファイルが含まれています。続いて Experimentを作成し、Pipelineを実行します。
Jupyter Notebookで上記のコードを実行すると、MLXP画面で Experimentや Runを確認できるリンクが作成されます。Run details リンクをクリックすると、Runの詳細画面を確認できます。
Jupyter Notebook環境で Pipelineを実行するために、以下のようにより簡略化されたコードを使用することもできます。
if __name__ == "__main__":
client = kfp.Client()
my_run = client.create_run_from_pipeline_func(
pipeline, {'a': 30.0, 'b': 40.0},
experiment_name="Test Experiment"
)
Pipelineのクリーンアップ
Pipelineをクリーンアップする方法について説明します。
Pipeline Runのクリーンアップ
Recurring Run機能により Runが定期的に実行される場合、多くの Runが Activeステータスで蓄積される可能性があります。不要になった、または重要でない Runは Archivedステータスに変更することで、Active Runの数を減らすことができます。Runの削除は Archivedステータスの場合にのみ可能です。
Runを削除すると Run画面で表示されなくなり、Runのために作成された namespaceの workflowと Podリソースが一緒に削除されます。
Pipeline Runリソースのクリーンアップ
Pipelineによって作成された Kubernetes workflowと Podリソースは、以下のポリシーにより削除されます。
- 正常に実行された Succeeded/Completed STATUS: 実行完了後、24時間(1日)後に削除
- 実行中にエラーが発生した STATUS: エラー発生後、72時間(3日)後に削除
1つの Pipelineを実行すると workflowリソースが作成され、複数の Podが作成・実行される可能性があります。したがって、上記のポリシーに従ってリソースが削除されても過剰なリソースが蓄積されている場合、様々な問題が発生する可能性があります。例えば、削除されていない Podによって Podの検索が不便になることがあり、該当する Podが PersistentVolume(PV)にバインドされている場合、PVが適切に削除されないなどの問題が発生することがあります。
必要な場合には、以下の内容を参照して Pipelineで作成されたリソースを削除してください。
Workflowリソースと Podが削除されても、Pipeline Runは削除されません。したがって、実行履歴などの metadataや Artifactは残っているため、Runの詳細画面で当該内容を確認できます。ただし、Podを削除した場合は Runの詳細画面で logを閲覧できません。
Kubectlコマンドを使用した Pipelineのクリーンアップ
現在実行中の Pipelineリストを表示するには kubectl get workflow コマンドを実行します。
$ kubectl get workflow -n <namespace>
NAME STATUS AGE
pipeline-d85m9 Succeeded 7m45s
当該 Pipelineの詳細な実行履歴は kubectl describe workflow で確認できます。
$ kubectl describe workflow pipeline-d85m9 -n <namespace>
...
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal WorkflowRunning 9m18s workflow-controller Workflow Running
Normal WorkflowNodeSucceeded 9m8s workflow-controller Succeeded node pipeline-d85m9.add
Normal WorkflowNodeSucceeded 8m58s workflow-controller Succeeded node pipeline-d85m9.log
Normal WorkflowNodeSucceeded 8m58s workflow-controller Succeeded node pipeline-d85m9
Normal WorkflowSucceeded 8m58s workflow-controller Workflow completed
現在、Pipelineによって作成された Podが Completed ステータスのまま残っています。
$ kubectl get pods -n <namespace> | grep pipeline-d85m9
pipeline-d85m9-1301794499 0/2 Completed 0 11m
pipeline-d85m9-2790487078 0/2 Completed 0 11m
kubectl delete workflow コマンドを使用して該当する Pipelineをクリーンアップします。Workflowリソースを削除すると、Pipelineのために実行されていた Podも一緒に削除されます。
$ kubectl delete workflow pipeline-d85m9 -n <namespace>
workflow.argoproj.io "pipeline-d85m9" deleted
$ kubectl get pods -n <namespace> | grep pipeline-d85m9
# Empty response