Pipelines の事前準備

Prev Next

VPC環境で利用できます。

Kubeflow Pipelineは Kubeflowの主要構成要素として、コンテナベースで workflowを管理する機能を提供します。また、workflowの作成/リリース/実行/管理のために Python SDKと Web UIを提供します。Web UIは MLXP画面にある Pipelinesのメニューからアクセスでき、Python SDKは pipでインストールできます。 Kubeflow Pipelineの詳細については、公式ドキュメントをご参照ください。

開発環境の構成と SDKのインストール

開発環境の構成方法と Python SDKのインストール方法について説明します。

Pipeline root設定

Pipeline rootとは、Pipelineを実行する際に発生する input/output情報、log、datasetなどの情報を保存する Artifactリポジトリを指します。Pipelineを実行する前に、namespaceにいくつかの設定を追加し、namespace単位で分離された Artifactリポジトリを使用する必要があります。

複数の namespaceを使用する MLXP Member間で Artifactリポジトリの内容が共有されても問題ない場合、1つの Artifactリポジトリを複数の namespaceにまたがって使用できます。ただし、同じ Artifactリポジトリであっても、namespaceごとに同じ Pipeline root設定を行う必要があります。そうすることで、Pipelineを正常に実行できます。

この Artifactリポジトリのために、AWS S3と互換性のある object storageを準備してください。

注意

MLXPでは、Kubeflow Pipelineでデフォルトでサポートする minio Artifactリポジトリは使用できません。そのため、namespaceに Pipeline rootを設定しないと、Pipelineを正常に実行できません。

Object storageの使用準備が整ったら、以下の yaml templateで ${} に該当する領域を目的の値で埋めて yamlファイルを作成した後、 kubectl apply -f {yaml 파일명}を実行して設定を完了します。

項目 説明
${NAMESPACE} pipelineを実行する namespace名
${BUCKET} Object storageの bucket名
${PREFIX} bucketに保存されるパス名
${STORAGE_ENDPOINT} Object storageの endpoint(ex.kr.object.ncloudstorage.com)
${ACCESS_KEY} Object storage認証のために発行された access key
${SECRET_ACCESS_KEY} Object storage認証のために発行された secret key
${REGION} Object storageの region(ex.kr-standard)
# argo workflow logを保存するための configmap
apiVersion: v1
kind: ConfigMap
metadata:
  name: artifact-repositories # この configmapの nameは変更しないでください。
  namespace: ${NAMESPACE}
  annotations:
    workflows.argoproj.io/default-artifact-repository: default-v1
data:
  default-v1: |
    archiveLogs: true
    s3:
      bucket: ${BUCKET}
      endpoint: ${STORAGE_ENDPOINT}
      insecure: false
      region: kr-standard
      keyFormat: "${PREFIX}/artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}"
      accessKeySecret:
        name: artifact-secret
        key: AWS_ACCESS_KEY_ID
      secretKeySecret:
        name: artifact-secret
        key: AWS_SECRET_ACCESS_KEY
---
# kfp artifact設定のための Configmap
apiVersion: v1
kind: ConfigMap
metadata:
  name: kfp-launcher  # この configmapの nameは変更しないでください。
  namespace: ${NAMESPACE}
data:
  defaultPipelineRoot: "s3://${BUCKET}/${PREFIX}?region=${REGION}&endpoint=${STORAGE_ENDPOINT}"
---
apiVersion: v1
kind: Secret
metadata:
  name: artifact-secret
  namespace: ${NAMESPACE}
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
  AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
---
# PodDefault to inject NCloud Object Storage environment variables into KFP v2 pods
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: kfp-poddefault
  namespace: ${NAMESPACE}
spec:
  selector:
    matchLabels:
      pipelines.kubeflow.org/v2_component: "true"
  env:
    - name: AWS_ACCESS_KEY_ID
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_ACCESS_KEY_ID
    - name: AWS_SECRET_ACCESS_KEY
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_SECRET_ACCESS_KEY
    - name: AWS_DEFAULT_REGION
      value: ${REGION}
    - name: AWS_REGION
      value: ${REGION}
    - name: AWS_S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_USE_HTTPS
      value: "true"
    - name: S3_VERIFY_SSL
      value: "true"

Pipeline SDKのインストール

Pipelineは Python SDKを提供します。Python SDKは Componentを作成し、Pipelineを構成・ビルドする機能を提供します。また、Pipelineの実行および Experimentの管理機能も提供します。

このドキュメントでは、簡単なユースケースを通じて Pipelineを作成し実行する方法について説明します。SDKの詳細な使用方法については、Kubeflow Pipelines SDK APIをご参照ください。

注意

現在、SDKを通じて Pipelineを実行し Experimentを管理する機能は、MLXPで実行した Jupyter Notebook環境でのみ動作します。

MLXPクラスタと互換性のある kfpの最新バージョンについては、以下の項目をご参照ください。

サービス バージョン 公式ドキュメント
Pipeline Backend 2.2.0
Pipeline SDK(kfp) 2.7.0 https://kubeflow-pipelines.readthedocs.io/en/sdk-2.7.0/
kfp-kubernetes 1.2.0 https://kfp-kubernetes.readthedocs.io/en/kfp-kubernetes-1.2.0/

Pipeline SDKは以下のコマンドでインストールできます。MLXPで実行する Jupyter Notebook環境には既に kfpがインストールされているため、別途インストールせずに SDKを使用できます。そのため、MLXPの Jupyter Notebook環境の使用をお勧めします。

pip install kfp

Jupyter Notebookにインストールされた kfpバージョンの確認

Jupyter Notebookにインストールされた kfpのバージョンを確認する方法は、次の通りです。

  1. Jupyter Notebookにアクセス後、Launcherページで Other > Terminalを選択し、ターミナルを起動します。
  2. Terminalに以下のコマンドを入力し、インストールされた kfpのバージョンを確認します。
    (base) irteam@test-pipeline-1-0:~$ pip show kfp
    Name: kfp
    Version: 2.7.0
    Summary: Kubeflow Pipelines SDK
    Home-page: https://github.com/kubeflow/pipelines
    Author: The Kubeflow Authors
    Author-email: None
    License: UNKNOWN
    Location: /opt/conda/lib/python3.8/site-packages
    Requires: tabulate, protobuf, google-api-core, kfp-pipeline-spec, kfp-server-api, urllib3, kubernetes, google-cloud-storage, PyYAML, requests-toolbelt, docstring-parser, click, google-auth, typing-extensions
    Required-by: 
    
  3. Version出力結果が1.x.x形式で表示される場合は、以下の方法でバージョンアップグレードを実行するか、最新の Jupyter Notebookイメージを使用します。
    pip install --upgrade kfp=={MLX対応 kfpバージョン}
    
注意

Jupyter Notebook環境に低バージョンの SDK(kfp)がインストールされている場合、パッケージのバージョンアップグレードが必須です。SDK(kfp) 1.xバージョンは MLXPでサポートしていないため、使用前に必ずパッケージバージョンをご確認ください。

Pipelineのユースケース実行

参考

MLXPの Jupyter Notebook以外のローカル開発環境では、コンパイルまでのみ実行できます。コンパイルして作成された yamlファイルを MLXPの Pipeline画面で uploadし、Pipelineを作成して実行できます。

次のサンプルコードは、MLXPで実行した Jupyter Notebook環境で行います。

Python 3(ipykernel) Notebookを作成し、以下のコードを入力して実行してください。

import kfp
from kfp import dsl
from kfp.dsl import component

@component
def add_op(a:float, b:float) -> float:
    print("a + b =", a + b)
    return a + b

@component
def log_op(msg:str):
    print(msg)


@dsl.pipeline(name='A + B', description="pipeline for input parameter")
def pipeline(a: float = 10.0, b: float = 20.0):
    add_task = add_op(a=a, b=b)
    log_task = log_op(msg=f"{add_task.output}")

2つの数値を受け取って加算する add_opコンポーネントと、1つの値を入力として受け取って画面に出力する log_opコンポーネントを作成しました。このプロセスでは、Python SDKが提供する kfp.dsl.component を活用します。

Pipeline関数では、パイプライン実行時に入力として受け取れる引数を定義できます。受け取った引数を add_opコンポーネントの入力として使用し、add_opコンポーネントが返した値を log_opコンポーネントの入力として使用します。log_opコンポーネントが add_opコンポーネントの結果を使用するため、コンポーネントの実行順序は自動的に決定されます。

上記で定義したパイプラインを実行するために、以下のコードを使用します。

if __name__ == "__main__":
    kfp.compiler.Compiler().compile(pipeline, 'test_pipeline.yaml')
    client = kfp.Client()
    my_experiment = client.create_experiment(name="Test Experiment")
    my_run = client.run_pipeline(
        experiment_id=my_experiment.experiment_id,
        job_name="test",
        pipeline_package_path="test_pipeline.yaml",
        params={'b': 30.0}
    )

上記コードの動作を確認してみましょう。まず pipeline 関数をコンパイルして test_pipeline.yaml というファイルを作成します。作成されたファイルには、Pipelineの Workflow全体を YAML形式で定義したファイルが含まれています。続いて Experimentを作成し、Pipelineを実行します。

Jupyter Notebookで上記のコードを実行すると、MLXP画面で Experimentや Runを確認できるリンクが作成されます。Run details リンクをクリックすると、Runの詳細画面を確認できます。

Jupyter Notebook環境で Pipelineを実行するために、以下のようにより簡略化されたコードを使用することもできます。

if __name__ == "__main__":
    client = kfp.Client()
    my_run = client.create_run_from_pipeline_func(
        pipeline, {'a': 30.0, 'b': 40.0},
        experiment_name="Test Experiment"
    )

Pipelineのクリーンアップ

Pipelineをクリーンアップする方法について説明します。

Pipeline Runのクリーンアップ

Recurring Run機能により Runが定期的に実行される場合、多くの Runが Activeステータスで蓄積される可能性があります。不要になった、または重要でない Runは Archivedステータスに変更することで、Active Runの数を減らすことができます。Runの削除は Archivedステータスの場合にのみ可能です。

注意

Runを削除すると Run画面で表示されなくなり、Runのために作成された namespaceの workflowと Podリソースが一緒に削除されます。

Pipeline Runリソースのクリーンアップ

Pipelineによって作成された Kubernetes workflowと Podリソースは、以下のポリシーにより削除されます。

  • 正常に実行された Succeeded/Completed STATUS: 実行完了後、24時間(1日)後に削除
  • 実行中にエラーが発生した STATUS: エラー発生後、72時間(3日)後に削除

1つの Pipelineを実行すると workflowリソースが作成され、複数の Podが作成・実行される可能性があります。したがって、上記のポリシーに従ってリソースが削除されても過剰なリソースが蓄積されている場合、様々な問題が発生する可能性があります。例えば、削除されていない Podによって Podの検索が不便になることがあり、該当する Podが PersistentVolume(PV)にバインドされている場合、PVが適切に削除されないなどの問題が発生することがあります。

必要な場合には、以下の内容を参照して Pipelineで作成されたリソースを削除してください。

注意

Workflowリソースと Podが削除されても、Pipeline Runは削除されません。したがって、実行履歴などの metadataや Artifactは残っているため、Runの詳細画面で当該内容を確認できます。ただし、Podを削除した場合は Runの詳細画面で logを閲覧できません。

Kubectlコマンドを使用した Pipelineのクリーンアップ

現在実行中の Pipelineリストを表示するには kubectl get workflow コマンドを実行します。

$ kubectl get workflow -n <namespace>
NAME             STATUS      AGE
pipeline-d85m9   Succeeded   7m45s

当該 Pipelineの詳細な実行履歴は kubectl describe workflow で確認できます。

$ kubectl describe workflow pipeline-d85m9 -n <namespace>
...
Events:
Type    Reason                 Age    From                 Message
----    ------                 ----   ----                 -------
Normal  WorkflowRunning        9m18s  workflow-controller  Workflow Running
Normal  WorkflowNodeSucceeded  9m8s   workflow-controller  Succeeded node pipeline-d85m9.add
Normal  WorkflowNodeSucceeded  8m58s  workflow-controller  Succeeded node pipeline-d85m9.log
Normal  WorkflowNodeSucceeded  8m58s  workflow-controller  Succeeded node pipeline-d85m9
Normal  WorkflowSucceeded      8m58s  workflow-controller  Workflow completed

現在、Pipelineによって作成された Podが Completed ステータスのまま残っています。

$ kubectl get pods -n <namespace> | grep pipeline-d85m9
pipeline-d85m9-1301794499                          0/2     Completed   0          11m
pipeline-d85m9-2790487078                          0/2     Completed   0          11m

kubectl delete workflow コマンドを使用して該当する Pipelineをクリーンアップします。Workflowリソースを削除すると、Pipelineのために実行されていた Podも一緒に削除されます。

$ kubectl delete workflow pipeline-d85m9 -n <namespace>
workflow.argoproj.io "pipeline-d85m9" deleted

$ kubectl get pods -n <namespace> | grep pipeline-d85m9
# Empty response