Available in VPC
Kubeflow Pipelines is a core component of Kubeflow that manages workflows using a container-based architecture. It provides a Python SDK and a web UI for creating, deploying, running, and managing workflows. You can access the web UI from the Pipelines menus in the ML expert Platform interface and install the Python SDK using pip. For more information on Kubeflow Pipelines, refer to the official documentation.
Configuring the development environment and installing the SDK
This section describes how to configure the development environment and how to install the Python SDK.
Setting the pipeline root
The pipeline root is the artifact storage where input/output data, logs, datasets, and other information generated when running a pipeline are stored. Before running a pipeline, you must add several settings to the Project Namespaces to ensure that each Project Namespace uses its own isolated artifact storage.
If ML expert Platform members use multiple Project Namespaces and can share artifact storage contents among them without issue, you can use a single artifact storage bucket across multiple Project Namespaces. However, even when using the same artifact storage, each Project Namespace must have identical pipeline root settings for pipelines to run correctly.
For the artifact storage, prepare an object storage service that is compatible with AWS S3.
ML expert Platform does not support the default MinIO artifact storage provided by Kubeflow Pipelines. Therefore, pipelines cannot run correctly unless the pipeline root is configured for the Project Namespace.
Once the object storage is ready, replace the ${} fields in the YAML template below with your values, save the result as a YAML file, and then run kubectl apply -f {YAML file name} to complete the configuration.
| Item | Description |
|---|---|
| ${NAMESPACE} | Name of the namespace where the pipeline will run |
| ${BUCKET} | Name of the object storage bucket |
| ${PREFIX} | Path prefix where artifacts will be stored |
| ${STORAGE_ENDPOINT} | Endpoint of the object storage service (e.g., kr.ncloudstorage.com) |
| ${ACCESS_KEY} | Access key issued for object storage authentication |
| ${SECRET_ACCESS_KEY} | Secret key issued for object storage authentication |
| ${REGION} | Region of the object storage service (e.g., kr) |
# ConfigMap for storing Argo workflow logs
apiVersion: v1
kind: ConfigMap
metadata:
  name: artifact-repositories # Do not change the name of this ConfigMap.
  namespace: ${NAMESPACE}
  annotations:
    workflows.argoproj.io/default-artifact-repository: default-v1
data:
  default-v1: |
    archiveLogs: true
    s3:
      bucket: ${BUCKET}
      endpoint: ${STORAGE_ENDPOINT}
      insecure: false
      region: kr-standard
      keyFormat: "${PREFIX}/artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}"
      accessKeySecret:
        name: artifact-secret
        key: AWS_ACCESS_KEY_ID
      secretKeySecret:
        name: artifact-secret
        key: AWS_SECRET_ACCESS_KEY
---
# ConfigMap for setting up KFP artifacts
apiVersion: v1
kind: ConfigMap
metadata:
  name: kfp-launcher # Do not change the name of this ConfigMap.
  namespace: ${NAMESPACE}
data:
  defaultPipelineRoot: "s3://${BUCKET}/${PREFIX}?region=${REGION}&endpoint=${STORAGE_ENDPOINT}"
---
apiVersion: v1
kind: Secret
metadata:
  name: artifact-secret
  namespace: ${NAMESPACE}
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
  AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
---
# PodDefault to inject NCloud Object Storage environment variables into KFP v2 pods
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: kfp-poddefault
  namespace: ${NAMESPACE}
spec:
  selector:
    matchLabels:
      pipelines.kubeflow.org/v2_component: "true"
  env:
    - name: AWS_ACCESS_KEY_ID
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_ACCESS_KEY_ID
    - name: AWS_SECRET_ACCESS_KEY
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_SECRET_ACCESS_KEY
    - name: AWS_DEFAULT_REGION
      value: ${REGION}
    - name: AWS_REGION
      value: ${REGION}
    - name: AWS_S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_USE_HTTPS
      value: "true"
    - name: S3_VERIFY_SSL
      value: "true"
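If you prefer to fill in the ${} fields with a script rather than by hand, the following is a minimal sketch using Python's string.Template, which uses the same ${NAME} placeholder syntax. The excerpt is shortened for illustration, and every value in example_values (namespace, bucket, prefix) is a hypothetical placeholder, not a real resource. Argo expressions such as {{workflow.name}} are left untouched because only the $ character is special to string.Template.

```python
from string import Template

# Shortened excerpt of the template; in practice, read the full YAML file instead.
TEMPLATE_TEXT = """\
apiVersion: v1
kind: ConfigMap
metadata:
  name: kfp-launcher
  namespace: ${NAMESPACE}
data:
  defaultPipelineRoot: "s3://${BUCKET}/${PREFIX}?region=${REGION}&endpoint=${STORAGE_ENDPOINT}"
"""


def render_template(template_text: str, values: dict) -> str:
    """Replace every ${NAME} field; raises KeyError if a value is missing."""
    return Template(template_text).substitute(values)


# Hypothetical example values (assumptions for illustration only).
example_values = {
    "NAMESPACE": "my-project",
    "BUCKET": "my-bucket",
    "PREFIX": "kfp",
    "STORAGE_ENDPOINT": "kr.ncloudstorage.com",
    "REGION": "kr",
}

rendered = render_template(TEMPLATE_TEXT, example_values)
print(rendered)
```

Because substitute() raises KeyError for a missing value, a forgotten field is caught before the file reaches kubectl apply.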
Installing the Pipeline SDK
Kubeflow Pipelines provides a Python SDK that lets you create components, configure and build pipelines, execute pipelines, and manage experiments.
This document explains how to create and run a pipeline through a simple example. For detailed usage of the SDK, refer to the Kubeflow Pipelines SDK API.
The features for executing pipelines and managing experiments through the SDK only work in the Jupyter Notebook environment running on ML expert Platform.
For the latest kfp version compatible with ML expert Platform clusters, see the table below.
| Services | Version | Official Document |
|---|---|---|
| Pipeline Backend | 2.2.0 | |
| Pipeline SDK(kfp) | 2.7.0 | https://kubeflow-pipelines.readthedocs.io/en/sdk-2.7.0/ |
| kfp-kubernetes | 1.2.0 | https://kfp-kubernetes.readthedocs.io/en/kfp-kubernetes-1.2.0/ |
For pipeline execution, we recommend the Jupyter Notebook environment provided by the ML expert Platform.
Because default environment settings vary across the provided images, you must install the mandatory Pipeline SDK packages as indicated below.
Access Jupyter Notebook. On the Launcher page, select Other > Terminal to open a terminal.
In the terminal, run the following command to install the package.
(base) irteam@test-pipeline-1-0:~$ pip install kfp kfp-kubernetes
Checking the installed kfp version in Jupyter Notebook
To check the kfp version installed in your Jupyter Notebook environment:
- On the Jupyter Notebook Launcher page, select Other > Terminal to open a terminal.
- In the terminal, run the following command to check the installed kfp version.
(base) irteam@test-pipeline-1-0:~$ pip show kfp
Name: kfp
Version: 2.7.0
Summary: Kubeflow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors
Author-email: None
License: UNKNOWN
Location: /opt/conda/lib/python3.8/site-packages
Requires: tabulate, protobuf, google-api-core, kfp-pipeline-spec, kfp-server-api, urllib3, kubernetes, google-cloud-storage, PyYAML, requests-toolbelt, docstring-parser, click, google-auth, typing-extensions
Required-by:
- If the output shows a version in the 1.x.x format, you must upgrade the package or use the latest Jupyter Notebook image:
pip install --upgrade kfp=={MLXP-supported kfp version}
If an older version of the SDK (kfp) is installed in your Jupyter Notebook environment, upgrading the package version is required. SDK (kfp) 1.x is not supported by ML expert Platform, so be sure to check the package version before using it.
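The same check can also be done from a notebook cell. The following is a minimal sketch that uses only the standard library; the helper names installed_version and is_supported are hypothetical, and the rule "major version 2 or later" is a simplification of the compatibility table above, not an official check.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str = "kfp") -> Optional[str]:
    """Return the installed version string, or None if the package is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def is_supported(version: Optional[str], minimum_major: int = 2) -> bool:
    """True when the major version is at least minimum_major (that is, not 1.x)."""
    if not version:
        return False
    major = version.split(".")[0]
    return major.isdigit() and int(major) >= minimum_major


version = installed_version()
if is_supported(version):
    print(f"kfp {version} is a 2.x or later release")
else:
    print(f"kfp {version} needs an upgrade (or is not installed)")
```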
Running a Pipeline Example
- In local development environments other than Jupyter Notebook on ML expert Platform, you can only compile pipelines. After compiling, you can upload the generated YAML file from the Pipeline interface in ML expert Platform to create and run the pipeline.
- When using GPU resources in a pipeline, you must add a nodeSelector that specifies the GPU zone assigned to your Workspace. For GPU Zone information, see View available GPU Zones.
The following example code should be run in the Jupyter Notebook environment on ML expert Platform.
Create a Python 3 (ipykernel) Notebook, enter the code below, and execute it.
import kfp
from kfp import dsl
from kfp.dsl import component
from kfp import kubernetes


@component
def add_op(a: float, b: float) -> float:
    print("a + b =", a + b)
    return a + b


@component
def log_op(msg: str):
    print(msg)


@dsl.pipeline(name='A + B', description="pipeline for input parameter")
def pipeline(a: float = 10.0, b: float = 20.0):
    add_task = add_op(a=a, b=b)
    log_task = log_op(msg=f"{add_task.output}")

    # Configuration for GPU resource usage when Private Zone info is ai-infra
    kubernetes.add_node_selector(add_task, 'mlx.navercorp.com/zone', 'ai-infra')
    add_task.container_spec.image = '{GPU-enabled image}'
    add_task.set_accelerator_type('nvidia.com/gpu')
    add_task.set_accelerator_limit(4)
The example defines two components: add_op, which takes two numbers and returns their sum, and log_op, which takes a single value as input and prints it. Both are defined with the kfp.dsl.component decorator provided by the Python SDK.
In the Pipeline function, you can define the parameters that will be received as inputs when the pipeline is executed. The received parameters are passed as inputs to the add_op component, and the value returned by add_op is then used as the input to the log_op component. Since the log_op component depends on the result produced by add_op, the execution order of the components is determined automatically.
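To illustrate how an execution order can follow from data dependencies alone, here is a conceptual sketch in plain Python. It is not how Kubeflow Pipelines or Argo implement scheduling; the execution_order function and the dependency dictionary are hypothetical and only model the add_op and log_op relationship described above.

```python
from collections import deque


def execution_order(dependencies: dict) -> list:
    """Return task names so that every task comes after the tasks it depends on."""
    remaining = {task: set(deps) for task, deps in dependencies.items()}
    # Tasks with no dependencies can start immediately.
    ready = deque(sorted(task for task, deps in remaining.items() if not deps))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        # Finishing a task may unblock the tasks that were waiting for it.
        for other in sorted(remaining):
            deps = remaining[other]
            if task in deps:
                deps.remove(task)
                if not deps:
                    ready.append(other)
    if len(order) != len(remaining):
        raise ValueError("dependency cycle or unknown dependency detected")
    return order


# log_op consumes the output of add_op, so it depends on add_op.
print(execution_order({"log_op": ["add_op"], "add_op": []}))
```

Even though log_op is listed first in the dictionary, the result places add_op before it, which mirrors why no explicit ordering is needed in the pipeline function.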
To run the pipeline defined above, use the following code.
if __name__ == "__main__":
    kfp.compiler.Compiler().compile(pipeline, 'test_pipeline.yaml')

    client = kfp.Client()
    my_experiment = client.create_experiment(name="Test Experiment")
    my_run = client.run_pipeline(
        experiment_id=my_experiment.experiment_id,
        job_name="test",
        pipeline_package_path="test_pipeline.yaml",
        params={'b': 30.0}
    )
The code first compiles the pipeline function, which generates a file named test_pipeline.yaml. This file contains the entire pipeline workflow defined in YAML format. Next, it creates an Experiment and runs the Pipeline in that Experiment.
When you run the code in Jupyter Notebook, a link appears that allows you to check the Experiment or Run in the ML expert Platform interface. Click the Run details link to view the detailed Run interface.
To execute a Pipeline from the Jupyter Notebook environment, you can also use a more simplified version of the code, as shown below.
if __name__ == "__main__":
    client = kfp.Client()
    my_run = client.create_run_from_pipeline_func(
        pipeline, {'a': 30.0, 'b': 40.0},
        experiment_name="Test Experiment"
    )
Pipeline Cleanup
This section describes how to clean up Pipelines.
Cleaning up Pipeline Runs
If a Run is scheduled using Recurring Run, many Runs may accumulate in the Active state. Runs that are no longer needed or are less important can be moved to the Archived state to reduce the number of Active Runs. A Run can only be deleted when it is in the Archived state.
Deleting a Run removes it from the Run interface, and the workflow and Pod resources created for that Run in the Project Namespace are also deleted.
Cleaning up Pipeline Run Resources
Kubernetes workflow and Pod resources created by Pipelines are deleted according to the following policy:
- STATUS: Succeeded / Completed: deleted 24 hours (1 day) after completion
- STATUS: Error: deleted 72 hours (3 days) after the error occurs
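As a worked example of the policy, the following sketch computes when resources become eligible for deletion. The RETENTION mapping only restates the two rules above; the function name and the sample timestamp are hypothetical, and the actual deletion is performed by the platform, not by this code.

```python
from datetime import datetime, timedelta

# Retention periods restated from the policy list.
RETENTION = {
    "Succeeded": timedelta(hours=24),
    "Completed": timedelta(hours=24),
    "Error": timedelta(hours=72),
}


def expected_deletion_time(status: str, finished_at: datetime) -> datetime:
    """Return when resources with this status become eligible for deletion."""
    if status not in RETENTION:
        raise ValueError(f"no retention rule for status {status!r}")
    return finished_at + RETENTION[status]


finished = datetime(2024, 1, 1, 9, 0)
print(expected_deletion_time("Succeeded", finished))  # 2024-01-02 09:00:00
print(expected_deletion_time("Error", finished))      # 2024-01-04 09:00:00
```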
When a Pipeline runs, a workflow resource is created, and multiple Pods may be created and executed. Therefore, even with the deletion policy above, resources that remain until their deletion time can accumulate and cause issues. For example, leftover Pods clutter the results of Pod queries, and volumes bound to those Pods may not be deleted properly.
If necessary, delete resources created by the Pipeline by following the instructions below.
Deleting workflow or Pod resources does not delete the Pipeline Run itself. The metadata and Artifacts remain available, so you can still view them in the Run detail interface. However, if the Pod has been deleted, logs will no longer be available in the Run detail interface.
Cleaning up Pipelines Using Kubectl
To view the list of workflow resources created by Pipelines, including completed ones, run the kubectl get workflow command:
$ kubectl get workflow -n <namespace>
NAME STATUS AGE
pipeline-d85m9 Succeeded 7m45s
You can check the detailed execution history of the Pipeline through kubectl describe workflow.
$ kubectl describe workflow pipeline-d85m9 -n <namespace>
...
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal WorkflowRunning 9m18s workflow-controller Workflow Running
Normal WorkflowNodeSucceeded 9m8s workflow-controller Succeeded node pipeline-d85m9.add
Normal WorkflowNodeSucceeded 8m58s workflow-controller Succeeded node pipeline-d85m9.log
Normal WorkflowNodeSucceeded 8m58s workflow-controller Succeeded node pipeline-d85m9
Normal WorkflowSucceeded 8m58s workflow-controller Workflow completed
The Pods created by the Pipeline remain in the Completed state:
$ kubectl get pods -n <namespace> | grep pipeline-d85m9
pipeline-d85m9-1301794499 0/2 Completed 0 11m
pipeline-d85m9-2790487078 0/2 Completed 0 11m
Use the kubectl delete workflow command to clean up the Pipeline. When you delete the Workflow resource, the Pods that were created and executed for the Pipeline are also deleted.
$ kubectl delete workflow pipeline-d85m9 -n <namespace>
workflow.argoproj.io "pipeline-d85m9" deleted
$ kubectl get pods -n <namespace> | grep pipeline-d85m9
# Empty response
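When many workflows have accumulated, the manual steps above can be prepared in bulk. The following sketch parses the text output of kubectl get workflow, assuming the default NAME, STATUS, AGE column layout shown earlier, and builds the matching delete commands without running them. The function names and the namespace my-namespace are hypothetical.

```python
def workflows_with_status(kubectl_output: str, status: str) -> list:
    """Return workflow names whose STATUS column equals the given status."""
    names = []
    lines = kubectl_output.strip().splitlines()
    for line in lines[1:]:  # skip the NAME STATUS AGE header line
        fields = line.split()
        if len(fields) >= 2 and fields[1] == status:
            names.append(fields[0])
    return names


def delete_commands(names: list, namespace: str) -> list:
    """Build one 'kubectl delete workflow' argument list per workflow name."""
    return [["kubectl", "delete", "workflow", name, "-n", namespace] for name in names]


sample_output = """\
NAME             STATUS      AGE
pipeline-d85m9   Succeeded   7m45s
"""

names = workflows_with_status(sample_output, "Succeeded")
for command in delete_commands(names, "my-namespace"):
    print(" ".join(command))
```

Each argument list can be reviewed first and then passed to subprocess.run if you decide to execute it.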