Pipelines の仕様

Prev Next

VPC環境で利用できます。

Kubeflow Pipelineは Kubeflowの主要構成要素として、コンテナベースで workflowを管理する機能を提供します。また、workflowの作成/リリース/実行/管理のために Python SDKと Web UIを提供します。Web UIは ML expert Platform画面にある Pipelinesのメニューからアクセスでき、Python SDKは pipでインストールできます。 Kubeflow Pipelineの詳細については、公式ドキュメントをご参照ください。

開発環境の構成と SDKのインストール

開発環境の構成方法と Python SDKのインストール方法について説明します。

Pipeline root設定

Pipeline rootとは、Pipelineを実行する際に発生する input/output情報、log、datasetなどの情報を保存する Artifactリポジトリを指します。Pipelineを実行する前に、Project Namespaceにいくつかの設定を追加し、Project Namespace単位で分離された Artifactリポジトリを使用する必要があります。

複数の Project Namespaceを使用する ML expert Platform Member間で Artifactリポジトリの内容が共有されても問題ない場合、1つの Artifactリポジトリを複数の Project Namespaceにまたがって使用できます。ただし、同じ Artifactリポジトリであっても、Project Namespaceごとに同じ Pipeline root設定を行う必要があります。そうすることで、Pipelineを正常に実行できます。

この Artifactリポジトリのために、AWS S3と互換性のある object storageを準備してください。

注意

ML expert Platformでは、Kubeflow Pipelineでデフォルトでサポートする minio Artifactリポジトリは使用できません。そのため、Project Namespaceに Pipeline rootを設定しないと、Pipelineを正常に実行できません。

Object storageの使用準備が整ったら、以下の yaml templateで ${} に該当する領域を目的の値で埋めて yamlファイルを作成した後、 kubectl apply -f {yaml 파일명}を実行して設定を完了します。

項目 説明
${NAMESPACE} pipelineを実行する namespace名
${BUCKET} Object storageの bucket名
${PREFIX} bucketに保存されるパス名
${STORAGE_ENDPOINT} Object storageの endpoint(ex. kr.ncloudstorage.com)
${ACCESS_KEY} Object storage認証のために発行された access key
${SECRET_ACCESS_KEY} Object storage認証のために発行された secret key
${REGION} Object storageの region(ex. kr)
# argo workflow logを保存するための configmap
apiVersion: v1
kind: ConfigMap
metadata:
  name: artifact-repositories # この configmapの nameは変更しないでください。
  namespace: ${NAMESPACE}
  annotations:
    workflows.argoproj.io/default-artifact-repository: default-v1
data:
  default-v1: |
    archiveLogs: true
    s3:
      bucket: ${BUCKET}
      endpoint: ${STORAGE_ENDPOINT}
      insecure: false
      region: kr-standard
      keyFormat: "${PREFIX}/artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}"
      accessKeySecret:
        name: artifact-secret
        key: AWS_ACCESS_KEY_ID
      secretKeySecret:
        name: artifact-secret
        key: AWS_SECRET_ACCESS_KEY
---
# kfp artifact設定のための Configmap
apiVersion: v1
kind: ConfigMap
metadata:
  name: kfp-launcher  # この configmapの nameは変更しないでください。
  namespace: ${NAMESPACE}
data:
  defaultPipelineRoot: "s3://${BUCKET}/${PREFIX}?region=${REGION}&endpoint=${STORAGE_ENDPOINT}"
---
apiVersion: v1
kind: Secret
metadata:
  name: artifact-secret
  namespace: ${NAMESPACE}
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
  AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
---
# PodDefault to inject NCloud Object Storage environment variables into KFP v2 pods
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: kfp-poddefault
  namespace: ${NAMESPACE}
spec:
  selector:
    matchLabels:
      pipelines.kubeflow.org/v2_component: "true"
  env:
    - name: AWS_ACCESS_KEY_ID
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_ACCESS_KEY_ID
    - name: AWS_SECRET_ACCESS_KEY
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_SECRET_ACCESS_KEY
    - name: AWS_DEFAULT_REGION
      value: ${REGION}
    - name: AWS_REGION
      value: ${REGION}
    - name: AWS_S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_USE_HTTPS
      value: "true"
    - name: S3_VERIFY_SSL
      value: "true"

Pipeline SDKのインストール

Pipelineは Python SDKを提供します。Python SDKは Componentを作成し、Pipelineを構成・ビルドする機能を提供します。また、Pipelineの実行および Experimentの管理機能も提供します。

このドキュメントでは、簡単なユースケースを通じて Pipelineを作成し実行する方法について説明します。SDKの詳細な使用方法については、Kubeflow Pipelines SDK APIをご参照ください。

注意

現在、SDKを通じて Pipelineを実行し Experimentを管理する機能は、ML expert Platformで実行した Jupyter Notebook環境でのみ動作します。

ML expert Platformクラスタと互換性のある kfpの最新バージョンについては、以下の項目をご参照ください。

サービス バージョン 公式ドキュメント
Pipeline Backend 2.2.0
Pipeline SDK(kfp) 2.7.0 https://kubeflow-pipelines.readthedocs.io/en/sdk-2.7.0/
kfp-kubernetes 1.2.0 https://kfp-kubernetes.readthedocs.io/en/kfp-kubernetes-1.2.0/

Pipelineの実行環境は、ML expert Platformが提供する Jupyter Notebook環境をお勧めします。
Jupyter Notebook実行のために基本的に提供するイメージの環境設定が異なるため、以下のように Pipeline SDKの必須パッケージインストールを行います。

Jupyter Notebookにアクセス後、Launcherページで Other > Terminalを選択し、ターミナルを起動します。
Terminalに以下のようなコマンドを入力し、パッケージをインストールします。

(base) irteam@test-pipeline-1-0:~$ pip install kfp kfp-kubernetes

Jupyter Notebookにインストールされた kfpバージョンの確認

Jupyter Notebookにインストールされた kfpのバージョンを確認する方法は、次の通りです。

  1. Jupyter Notebookの Launcherページで Other > Terminalを選択し、ターミナルを起動します。
  2. Terminalに以下のコマンドを入力し、インストールされた kfpのバージョンを確認します。
    (base) irteam@test-pipeline-1-0:~$ pip show kfp
    Name: kfp
    Version: 2.7.0
    Summary: Kubeflow Pipelines SDK
    Home-page: https://github.com/kubeflow/pipelines
    Author: The Kubeflow Authors
    Author-email: None
    License: UNKNOWN
    Location: /opt/conda/lib/python3.8/site-packages
    Requires: tabulate, protobuf, google-api-core, kfp-pipeline-spec, kfp-server-api, urllib3, kubernetes, google-cloud-storage, PyYAML, requests-toolbelt, docstring-parser, click, google-auth, typing-extensions
    Required-by: 
    
  3. Version出力結果が1.x.x形式で表示される場合は、以下の方法でバージョンアップグレードを実行するか、最新の Jupyter Notebookイメージを使用します。
    pip install --upgrade kfp=={MLX対応 kfpバージョン}
    
注意

Jupyter Notebook環境に低バージョンの SDK(kfp)がインストールされている場合、パッケージのバージョンアップグレードが必須です。SDK(kfp) 1.xバージョンは ML expert Platformでサポートしていないため、使用前に必ずパッケージバージョンをご確認ください。

Pipelineのユースケース実行

参考
  • ML expert Platformの Jupyter Notebook以外のローカル開発環境では、コンパイルまでのみ実行できます。コンパイルして作成された yamlファイルを ML expert Platformの Pipeline画面で uploadし、Pipelineを作成して実行できます。
  • Pipelineで GPUリソースを使用する場合、ユーザーの Workspaceが割り当てられた GPU Zoneを定義する nodeSelectorを追加します。GPU Zone情報は、使用できる GPU Zone情報の照会をご参照ください。

次のサンプルコードは、ML expert Platformで実行した Jupyter Notebook環境で行います。
Python 3(ipykernel) Notebookを作成し、以下のコードを入力して実行してください。

import kfp
from kfp import dsl
from kfp.dsl import component
from kfp import kubernetes

@component
def add_op(a:float, b:float) -> float:
    print("a + b =", a + b)
    return a + b

@component
def log_op(msg:str):
    print(msg)


@dsl.pipeline(name='A + B', description="pipeline for input parameter")
def pipeline(a: float = 10.0, b: float = 20.0):
    add_task = add_op(a=a, b=b)
    log_task = log_op(msg=f"{add_task.output}")
    
    # GPUリソース使用時の設定(Private Zone情報が ai-infraの場合)
    kubernetes.add_node_selector(add_task, 'mlx.navercorp.com/zone', 'ai-infra')
    add_task.container_spec.image = '{gpuの使用が可能な image}'    
    add_task.set_accelerator_type('nvidia.com/gpu')
    add_task.set_accelerator_limit(4)

2つの数値を受け取って加算する add_opコンポーネントと、1つの値を入力として受け取って画面に出力する log_opコンポーネントを作成しました。このプロセスでは、Python SDKが提供する kfp.dsl.component を活用します。

Pipeline関数では、パイプライン実行時に入力として受け取れる引数を定義できます。受け取った引数を add_opコンポーネントの入力として使用し、add_opコンポーネントが返した値を log_opコンポーネントの入力として使用します。log_opコンポーネントが add_opコンポーネントの結果を使用するため、コンポーネントの実行順序は自動的に決定されます。

上記で定義したパイプラインを実行するために、以下のコードを使用します。

if __name__ == "__main__":
    kfp.compiler.Compiler().compile(pipeline, 'test_pipeline.yaml')
    client = kfp.Client()
    my_experiment = client.create_experiment(name="Test Experiment")
    my_run = client.run_pipeline(
        experiment_id=my_experiment.experiment_id,
        job_name="test",
        pipeline_package_path="test_pipeline.yaml",
        params={'b': 30.0}
    )

上記コードの動作を確認してみましょう。まず pipeline 関数をコンパイルして test_pipeline.yaml というファイルを作成します。作成されたファイルには、Pipelineの Workflow全体を YAML形式で定義したファイルが含まれています。続いて Experimentを作成し、Pipelineを実行します。

Jupyter Notebookで上記のコードを実行すると、ML expert Platform画面で Experimentや Runを確認できるリンクが作成されます。Run details リンクをクリックすると、Runの詳細画面を確認できます。

Jupyter Notebook環境で Pipelineを実行するために、以下のようにより簡略化されたコードを使用することもできます。

if __name__ == "__main__":
    client = kfp.Client()
    my_run = client.create_run_from_pipeline_func(
        pipeline, {'a': 30.0, 'b': 40.0},
        experiment_name="Test Experiment"
    )

Pipelineのクリーンアップ

Pipelineをクリーンアップする方法について説明します。

Pipeline Runのクリーンアップ

Recurring Run機能により Runが定期的に実行される場合、多くの Runが Activeステータスで蓄積される可能性があります。不要になった、または重要でない Runは Archivedステータスに変更することで、Active Runの数を減らすことができます。Runの削除は Archivedステータスの場合にのみ可能です。

注意

Runを削除すると Run画面で表示されなくなり、Runのために作成された Project Namespaceの Workflowと Podリソースが一緒に削除されます。

Pipeline Runリソースのクリーンアップ

Pipelineによって作成された Kubernetes workflowと Podリソースは、以下のポリシーにより削除されます。

  • 正常に実行された Succeeded/Completed STATUS: 実行完了後、24時間(1日)後に削除
  • 実行中にエラーが発生した STATUS: エラー発生後、72時間(3日)後に削除

1つの Pipelineを実行すると workflowリソースが作成され、複数の Podが作成・実行される可能性があります。したがって、上記のポリシーに従ってリソースが削除されても過剰なリソースが蓄積されている場合、様々な問題が発生する可能性があります。例えば、削除されていない Podによって Podの検索が不便になることがあり、該当する Podが Volumeにバインドされている場合、Volumeが適切に削除されないなどの問題が発生することがあります。

必要な場合には、以下の内容を参照して Pipelineで作成されたリソースを削除してください。

注意

Workflowリソースと Podが削除されても、Pipeline Runは削除されません。したがって、実行履歴などの metadataや Artifactは残っているため、Runの詳細画面で当該内容を確認できます。ただし、Podを削除した場合は Runの詳細画面で logを閲覧できません。

Kubectlコマンドを使用した Pipelineのクリーンアップ

現在実行中の Pipelineリストを表示するには kubectl get workflow コマンドを実行します。

$ kubectl get workflow -n <namespace>
NAME             STATUS      AGE
pipeline-d85m9   Succeeded   7m45s

当該 Pipelineの詳細な実行履歴は kubectl describe workflow で確認できます。

$ kubectl describe workflow pipeline-d85m9 -n <namespace>
...
Events:
Type    Reason                 Age    From                 Message
----    ------                 ----   ----                 -------
Normal  WorkflowRunning        9m18s  workflow-controller  Workflow Running
Normal  WorkflowNodeSucceeded  9m8s   workflow-controller  Succeeded node pipeline-d85m9.add
Normal  WorkflowNodeSucceeded  8m58s  workflow-controller  Succeeded node pipeline-d85m9.log
Normal  WorkflowNodeSucceeded  8m58s  workflow-controller  Succeeded node pipeline-d85m9
Normal  WorkflowSucceeded      8m58s  workflow-controller  Workflow completed

現在、Pipelineによって作成された Podが Completed ステータスのまま残っています。

$ kubectl get pods -n <namespace> | grep pipeline-d85m9
pipeline-d85m9-1301794499                          0/2     Completed   0          11m
pipeline-d85m9-2790487078                          0/2     Completed   0          11m

kubectl delete workflow コマンドを使用して該当する Pipelineをクリーンアップします。Workflowリソースを削除すると、Pipelineのために実行されていた Podも一緒に削除されます。

$ kubectl delete workflow pipeline-d85m9 -n <namespace>
workflow.argoproj.io "pipeline-d85m9" deleted

$ kubectl get pods -n <namespace> | grep pipeline-d85m9
# Empty response