Available in VPC.
Kubeflow Pipelines is a core component of Kubeflow that manages workflows using a container-based architecture. It provides a Python SDK and a web UI for creating, deploying, running, and managing workflows. You can access the web UI through the Pipelines menu in the MLXP interface, and you can install the Python SDK using pip. For more information on Kubeflow Pipelines, refer to the official documentation.
Configuring the development environment and installing the SDK
This section describes how to configure the development environment and how to install the Python SDK.
Setting the pipeline root
The pipeline root is the artifact storage where input/output data, logs, datasets, and other information generated when running a pipeline are stored. Before running a pipeline, you must add several settings to the namespace to ensure that each namespace uses its own isolated artifact storage.
If MLXP members use multiple namespaces and it is acceptable for artifact storage contents to be shared between them, a single artifact storage bucket can be used across namespaces. However, even when using the same artifact storage, each namespace must have identical pipeline root settings for pipelines to run correctly.
For the artifact storage, prepare an object storage service that is compatible with AWS S3.
MLXP does not support the default MinIO artifact storage provided by Kubeflow Pipelines. Pipelines cannot run correctly unless the pipeline root is configured for the namespace.
Once the object storage is ready, fill in the values for the ${} fields in the YAML template below, save it as a YAML file, and then run kubectl apply -f {YAML file name} to complete the configuration.
| Item | Description |
|---|---|
| ${NAMESPACE} | Name of the namespace where the pipeline will run |
| ${BUCKET} | Name of the object storage bucket |
| ${PREFIX} | Path prefix where artifacts will be stored |
| ${STORAGE_ENDPOINT} | Endpoint of the object storage service (e.g., kr.object.ncloudstorage.com) |
| ${ACCESS_KEY} | Access key issued for object storage authentication |
| ${SECRET_ACCESS_KEY} | Secret key issued for object storage authentication |
| ${REGION} | Region of the object storage service (e.g., kr-standard) |
```yaml
# ConfigMap for storing Argo workflow logs
apiVersion: v1
kind: ConfigMap
metadata:
  name: artifact-repositories # Do not change the name of this ConfigMap.
  namespace: ${NAMESPACE}
  annotations:
    workflows.argoproj.io/default-artifact-repository: default-v1
data:
  default-v1: |
    archiveLogs: true
    s3:
      bucket: ${BUCKET}
      endpoint: ${STORAGE_ENDPOINT}
      insecure: false
      region: ${REGION}
      keyFormat: "${PREFIX}/artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}"
      accessKeySecret:
        name: artifact-secret
        key: AWS_ACCESS_KEY_ID
      secretKeySecret:
        name: artifact-secret
        key: AWS_SECRET_ACCESS_KEY
---
# ConfigMap for setting up KFP artifacts
apiVersion: v1
kind: ConfigMap
metadata:
  name: kfp-launcher # Do not change the name of this ConfigMap.
  namespace: ${NAMESPACE}
data:
  defaultPipelineRoot: "s3://${BUCKET}/${PREFIX}?region=${REGION}&endpoint=${STORAGE_ENDPOINT}"
---
apiVersion: v1
kind: Secret
metadata:
  name: artifact-secret
  namespace: ${NAMESPACE}
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
  AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
---
# PodDefault to inject NCloud Object Storage environment variables into KFP v2 pods
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: kfp-poddefault
  namespace: ${NAMESPACE}
spec:
  selector:
    matchLabels:
      pipelines.kubeflow.org/v2_component: "true"
  env:
    - name: AWS_ACCESS_KEY_ID
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_ACCESS_KEY_ID
    - name: AWS_SECRET_ACCESS_KEY
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_SECRET_ACCESS_KEY
    - name: AWS_DEFAULT_REGION
      value: ${REGION}
    - name: AWS_REGION
      value: ${REGION}
    - name: AWS_S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_USE_HTTPS
      value: "true"
    - name: S3_VERIFY_SSL
      value: "true"
```
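The ${} placeholders in the template use shell-style substitution, so Python's standard-library string.Template can fill them in before applying the manifest. The sketch below is illustrative only, with made-up example values; it demonstrates the substitution on the defaultPipelineRoot entry:

```python
from string import Template

# Made-up example values for the placeholder fields described in the table above.
values = {
    "BUCKET": "my-bucket",
    "PREFIX": "kfp",
    "REGION": "kr-standard",
    "STORAGE_ENDPOINT": "kr.object.ncloudstorage.com",
}

# The defaultPipelineRoot entry from the kfp-launcher ConfigMap.
root_template = Template("s3://${BUCKET}/${PREFIX}?region=${REGION}&endpoint=${STORAGE_ENDPOINT}")
print(root_template.substitute(values))
# s3://my-bucket/kfp?region=kr-standard&endpoint=kr.object.ncloudstorage.com
```

The same approach works for the full YAML template: read the file, substitute the values, and pipe the result to kubectl apply.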
Installing the Pipeline SDK
Pipelines provides a Python SDK. With the SDK you can create components, configure and build pipelines, and also execute pipelines and manage experiments.
This document explains how to create and run a pipeline through a simple example. For detailed usage of the SDK, refer to the Kubeflow Pipelines SDK API.
The features for executing pipelines and managing experiments through the SDK only work in the Jupyter Notebook environment running on MLXP.
MLXP-compatible versions of kfp (Kubeflow Pipelines SDK) can be found in the reference table below.
| Service | Version | Official Document |
|---|---|---|
| Pipeline Backend | 2.2.0 | |
| Pipeline SDK (kfp) | 2.7.0 | https://kubeflow-pipelines.readthedocs.io/en/sdk-2.7.0/ |
| kfp-kubernetes | 1.2.0 | https://kfp-kubernetes.readthedocs.io/en/kfp-kubernetes-1.2.0/ |
Since the Jupyter Notebook environment provided by MLXP already includes kfp, you can use the SDK without installing it separately; for this reason, using MLXP's Jupyter Notebook environment is recommended. If you do need to install the SDK yourself, use the following command:

```bash
pip install kfp
```
Checking the installed kfp version in Jupyter Notebook
To check the kfp version installed in your Jupyter Notebook environment:
- Access Jupyter Notebook and, on the Launcher page, select Other > Terminal to open a terminal.
- In the terminal, run the following command to check the installed kfp version.
```bash
(base) irteam@test-pipeline-1-0:~$ pip show kfp
Name: kfp
Version: 2.7.0
Summary: Kubeflow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors
Author-email: None
License: UNKNOWN
Location: /opt/conda/lib/python3.8/site-packages
Requires: tabulate, protobuf, google-api-core, kfp-pipeline-spec, kfp-server-api, urllib3, kubernetes, google-cloud-storage, PyYAML, requests-toolbelt, docstring-parser, click, google-auth, typing-extensions
Required-by:
```
- If the output shows a version in the 1.x.x format, upgrade the package or use the latest Jupyter Notebook image:

```bash
pip install --upgrade kfp=={MLXP-supported kfp version}
```
If an older version of the SDK (kfp) is installed in your Jupyter Notebook environment, you must upgrade the package. kfp 1.x versions are not supported in MLXP, so be sure to check the version before use.
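The 1.x check above can be expressed as a small helper. This is a sketch for illustration only; needs_upgrade is a hypothetical name, not part of the kfp SDK:

```python
def needs_upgrade(version: str) -> bool:
    """Return True if the given kfp version string is a 1.x release,
    which is not supported on MLXP."""
    major = int(version.split(".")[0])
    return major < 2

print(needs_upgrade("1.8.22"))  # True: 1.x releases are unsupported
print(needs_upgrade("2.7.0"))   # False: 2.x releases are supported
```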
Running a Pipeline Example
In a local development environment (not MLXP's Jupyter Notebook), you can only perform pipeline compilation. After compiling, upload the generated YAML file to the Pipeline interface in MLXP to create and run the pipeline.
The following example should be run in a Jupyter Notebook environment on MLXP.
Create a Python 3 (ipykernel) Notebook, enter the code below, and execute it.
```python
import kfp
from kfp import dsl
from kfp.dsl import component

@component
def add_op(a: float, b: float) -> float:
    print("a + b =", a + b)
    return a + b

@component
def log_op(msg: str):
    print(msg)

@dsl.pipeline(name='A + B', description="pipeline for input parameter")
def pipeline(a: float = 10.0, b: float = 20.0):
    add_task = add_op(a=a, b=b)
    log_task = log_op(msg=f"{add_task.output}")
```
We created two components: add_op, which takes two numbers and adds them, and log_op, which takes a single value as input and prints it to the log. Both are defined with the kfp.dsl.component decorator provided by the Python SDK.
In the Pipeline function, you can define the parameters that will be received as inputs when the pipeline is executed. The received parameters are passed as inputs to the add_op component, and the value returned by add_op is then used as the input to the log_op component. Since the log_op component depends on the result produced by add_op, the execution order of the components is determined automatically.
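Outside of kfp, the dataflow of this pipeline is just ordinary function composition. The following plain-Python sketch (no kfp involved, for illustration only) mirrors what the two components compute and why their order is fixed:

```python
# Plain-Python mirror of the pipeline's dataflow. In the real pipeline, each
# function runs as a container orchestrated by Kubeflow Pipelines.
def add(a: float, b: float) -> float:
    return a + b

def log(msg: str) -> str:
    print(msg)
    return msg

result = add(10.0, 20.0)  # add_op runs first
log(f"{result}")          # log_op consumes add_op's output, so it runs second
```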
To run the pipeline defined above, use the following code.
```python
if __name__ == "__main__":
    kfp.compiler.Compiler().compile(pipeline, 'test_pipeline.yaml')
    client = kfp.Client()
    my_experiment = client.create_experiment(name="Test Experiment")
    my_run = client.run_pipeline(
        experiment_id=my_experiment.experiment_id,
        job_name="test",
        pipeline_package_path="test_pipeline.yaml",
        params={'b': 30.0}
    )
```
Looking at how the code above works: the pipeline function is first compiled, generating a file named test_pipeline.yaml that contains the entire pipeline workflow in YAML format. An Experiment is then created, and the pipeline is executed.
When you run the code in Jupyter Notebook, a link appears that allows you to check the Experiment or Run in the MLXP interface. Click the Run details link to view the detailed Run page.
To execute a Pipeline from the Jupyter Notebook environment, you can also use a more simplified version of the code, as shown below.
```python
if __name__ == "__main__":
    client = kfp.Client()
    my_run = client.create_run_from_pipeline_func(
        pipeline, {'a': 30.0, 'b': 40.0},
        experiment_name="Test Experiment"
    )
```
Pipeline Cleanup
This section describes how to clean up Pipelines.
Cleaning up Pipeline Runs
If a Run is scheduled using Recurring Run, many Runs may accumulate in the Active state. Runs that are no longer needed or are less important can be moved to the Archived state to reduce the number of Active Runs. A Run can only be deleted when it is in the Archived state.
Deleting a Run removes it from the Run interface, and all workflow and Pod resources created for that Run are also deleted.
Cleaning up Pipeline Run Resources
Kubernetes workflow and Pod resources created by Pipelines are deleted according to the following policy:
- STATUS Succeeded / Completed: deleted 24 hours (1 day) after completion
- STATUS Error: deleted 72 hours (3 days) after the error occurs
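The retention policy above can be sketched as a small helper that computes when a workflow's resources become eligible for deletion. This is illustrative only; the function and table names are hypothetical, not part of any MLXP or Kubeflow API:

```python
from datetime import datetime, timedelta

# Retention periods (in hours) from the cleanup policy above.
RETENTION_HOURS = {
    "Succeeded": 24,
    "Completed": 24,
    "Error": 72,
}

def deletion_time(status: str, finished_at: datetime) -> datetime:
    """Return the time at which a workflow's resources become eligible
    for deletion under the policy above."""
    return finished_at + timedelta(hours=RETENTION_HOURS[status])

finished = datetime(2024, 5, 1, 12, 0)
print(deletion_time("Succeeded", finished))  # 2024-05-02 12:00:00
print(deletion_time("Error", finished))      # 2024-05-04 12:00:00
```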
When a Pipeline runs, a workflow resource is created, and multiple Pods may be created and executed. Even though resources are cleaned up according to the policy above, too many leftover resources may still cause issues. For example, a large number of Pods may make Pod queries inconvenient. If a Pod is bound to a PV (PersistentVolume), the PV may fail to delete properly.
If necessary, delete resources created by the Pipeline by following the instructions below.
Deleting workflow or Pod resources does not delete the Pipeline Run itself. The metadata and Artifacts remain available, so you can still view them in the Run detail interface. However, if the Pod has been deleted, logs will no longer be available in the Run detail interface.
Cleaning up Pipelines Using Kubectl
To view the list of currently running Pipelines, run the kubectl get workflow command:
```bash
$ kubectl get workflow -n <namespace>
NAME             STATUS      AGE
pipeline-d85m9   Succeeded   7m45s
```
You can check the detailed execution history of the Pipeline through kubectl describe workflow.
```bash
$ kubectl describe workflow pipeline-d85m9 -n <namespace>
...
Events:
  Type    Reason                 Age    From                 Message
  ----    ------                 ----   ----                 -------
  Normal  WorkflowRunning        9m18s  workflow-controller  Workflow Running
  Normal  WorkflowNodeSucceeded  9m8s   workflow-controller  Succeeded node pipeline-d85m9.add
  Normal  WorkflowNodeSucceeded  8m58s  workflow-controller  Succeeded node pipeline-d85m9.log
  Normal  WorkflowNodeSucceeded  8m58s  workflow-controller  Succeeded node pipeline-d85m9
  Normal  WorkflowSucceeded      8m58s  workflow-controller  Workflow completed
```
Pods created by the Pipeline remain in the Completed state:
```bash
$ kubectl get pods -n <namespace> | grep pipeline-d85m9
pipeline-d85m9-1301794499   0/2   Completed   0   11m
pipeline-d85m9-2790487078   0/2   Completed   0   11m
```
Use the kubectl delete workflow command to clean up the Pipeline. When you delete the Workflow resource, the Pods that were created and executed for the Pipeline are also deleted.
```bash
$ kubectl delete workflow pipeline-d85m9 -n <namespace>
workflow.argoproj.io "pipeline-d85m9" deleted

$ kubectl get pods -n <namespace> | grep pipeline-d85m9
# Empty response
```