Pipelines preparation

Available in VPC.

Kubeflow Pipelines is a core component of Kubeflow that manages workflows using a container-based architecture. It provides a Python SDK and a web UI for creating, deploying, running, and managing workflows. You can access the web UI through the Pipelines menu in the MLXP interface, and you can install the Python SDK using pip. For more information on Kubeflow Pipelines, refer to the official documentation.

Configuring the development environment and installing the SDK

This section describes how to configure the development environment and how to install the Python SDK.

Setting the pipeline root

The pipeline root is the artifact storage where input/output data, logs, datasets, and other information generated when running a pipeline are stored. Before running a pipeline, you must add several settings to the namespace to ensure that each namespace uses its own isolated artifact storage.

If MLXP members use multiple namespaces and it is acceptable for artifact storage contents to be shared between them, a single artifact storage bucket can be used across namespaces. However, even when using the same artifact storage, each namespace must have identical pipeline root settings for pipelines to run correctly.

For the artifact storage, prepare an object storage service that is compatible with AWS S3.

Caution

MLXP does not support the default MinIO artifact storage provided by Kubeflow Pipelines. Pipelines cannot run correctly unless the pipeline root is configured for the namespace.

Once the object storage is ready, fill in the ${} fields in the YAML template below, save it as a YAML file, and run kubectl apply -f {YAML filename} to complete the configuration.

  • ${NAMESPACE}: name of the namespace where the pipeline will run
  • ${BUCKET}: name of the object storage bucket
  • ${PREFIX}: path prefix where artifacts will be stored
  • ${STORAGE_ENDPOINT}: endpoint of the object storage service (e.g., kr.object.ncloudstorage.com)
  • ${ACCESS_KEY}: access key issued for object storage authentication
  • ${SECRET_ACCESS_KEY}: secret key issued for object storage authentication
  • ${REGION}: region of the object storage service (e.g., kr-standard)

# ConfigMap for storing Argo workflow logs
apiVersion: v1
kind: ConfigMap
metadata:
  name: artifact-repositories # Do not change the name of this ConfigMap.
  namespace: ${NAMESPACE}
  annotations:
    workflows.argoproj.io/default-artifact-repository: default-v1
data:
  default-v1: |
    archiveLogs: true
    s3:
      bucket: ${BUCKET}
      endpoint: ${STORAGE_ENDPOINT}
      insecure: false
      region: ${REGION}
      keyFormat: "${PREFIX}/artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}"
      accessKeySecret:
        name: artifact-secret
        key: AWS_ACCESS_KEY_ID
      secretKeySecret:
        name: artifact-secret
        key: AWS_SECRET_ACCESS_KEY
---
# ConfigMap for setting up KFP artifacts
apiVersion: v1
kind: ConfigMap
metadata:
  name: kfp-launcher  # Do not change the name of this ConfigMap.
  namespace: ${NAMESPACE}
data:
  defaultPipelineRoot: "s3://${BUCKET}/${PREFIX}?region=${REGION}&endpoint=${STORAGE_ENDPOINT}"
---
apiVersion: v1
kind: Secret
metadata:
  name: artifact-secret
  namespace: ${NAMESPACE}
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
  AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
---
# PodDefault to inject NCloud Object Storage environment variables into KFP v2 pods
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: kfp-poddefault
  namespace: ${NAMESPACE}
spec:
  selector:
    matchLabels:
      pipelines.kubeflow.org/v2_component: "true"
  env:
    - name: AWS_ACCESS_KEY_ID
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_ACCESS_KEY_ID
    - name: AWS_SECRET_ACCESS_KEY
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_SECRET_ACCESS_KEY
    - name: AWS_DEFAULT_REGION
      value: ${REGION}
    - name: AWS_REGION
      value: ${REGION}
    - name: AWS_S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_USE_HTTPS
      value: "true"
    - name: S3_VERIFY_SSL
      value: "true"
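Because every configurable field in the manifest uses the ${NAME} placeholder form, Python's built-in string.Template can fill in the values before you apply the file with kubectl. The sketch below uses hypothetical example values (not real credentials) and fills in just the defaultPipelineRoot string; the same substitution works on the full manifest text read from a file.

```python
from string import Template

# Hypothetical example values; replace with your own namespace and storage details.
values = {
    "NAMESPACE": "my-namespace",
    "BUCKET": "my-bucket",
    "PREFIX": "kfp-artifacts",
    "STORAGE_ENDPOINT": "kr.object.ncloudstorage.com",
    "ACCESS_KEY": "EXAMPLE_ACCESS_KEY",
    "SECRET_ACCESS_KEY": "EXAMPLE_SECRET_KEY",
    "REGION": "kr-standard",
}

# Fill in the pipeline root value used by the kfp-launcher ConfigMap.
root_template = Template("s3://${BUCKET}/${PREFIX}?region=${REGION}&endpoint=${STORAGE_ENDPOINT}")
pipeline_root = root_template.substitute(values)
print(pipeline_root)
# s3://my-bucket/kfp-artifacts?region=kr-standard&endpoint=kr.object.ncloudstorage.com
```

After substituting the full template the same way, save the result to a file and apply it with kubectl apply -f. Note that Template leaves the {{workflow.name}}-style Argo placeholders in keyFormat untouched, since they do not use the ${} syntax.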

Installing the Pipeline SDK

Pipelines provide a Python SDK. The Python SDK allows you to create components, configure and build pipelines, and also provides features for executing pipelines and managing experiments.

This document explains how to create and run a pipeline through a simple example. For detailed usage of the SDK, refer to the Kubeflow Pipelines SDK API.

Caution

The features for executing pipelines and managing experiments through the SDK only work in the Jupyter Notebook environment running on MLXP.

MLXP-compatible versions of kfp (Kubeflow Pipelines SDK) can be found in the reference table below.

  • Pipeline Backend: 2.2.0
  • Pipeline SDK (kfp): 2.7.0 (https://kubeflow-pipelines.readthedocs.io/en/sdk-2.7.0/)
  • kfp-kubernetes: 1.2.0 (https://kfp-kubernetes.readthedocs.io/en/kfp-kubernetes-1.2.0/)

Since the Jupyter Notebook environment provided by MLXP already includes kfp, you can use the SDK without installing it separately; for this reason, using MLXP's Jupyter Notebook environment is recommended. If you do need to install the SDK yourself, use the following command:

pip install kfp

Checking the installed kfp version in Jupyter Notebook

To check the kfp version installed in your Jupyter Notebook environment:

  1. Access Jupyter Notebook and, on the Launcher page, select Other > Terminal to open a terminal.
  2. In the terminal, run the following command to check the installed kfp version.
    (base) irteam@test-pipeline-1-0:~$ pip show kfp
    Name: kfp
    Version: 2.7.0
    Summary: Kubeflow Pipelines SDK
    Home-page: https://github.com/kubeflow/pipelines
    Author: The Kubeflow Authors
    Author-email: None
    License: UNKNOWN
    Location: /opt/conda/lib/python3.8/site-packages
    Requires: tabulate, protobuf, google-api-core, kfp-pipeline-spec, kfp-server-api, urllib3, kubernetes, google-cloud-storage, PyYAML, requests-toolbelt, docstring-parser, click, google-auth, typing-extensions
    Required-by: 
    
  3. If the output shows a version in the 1.x.x format, you must upgrade the package, or use the latest Jupyter Notebook image:
    pip install --upgrade kfp=={MLXP-supported kfp version}
    
Caution

If an older version of the SDK (kfp) is installed in your Jupyter Notebook environment, you must upgrade the package. kfp 1.x versions are not supported in MLXP, so be sure to check the version before use.
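If you script your environment setup, the 1.x check above can be automated with a small helper. This is a minimal sketch; the function name is our own, not part of the SDK. In a notebook you would pass kfp.__version__ instead of literal strings.

```python
def is_supported_kfp(version: str) -> bool:
    """Return True for kfp 2.x or later; 1.x releases are not supported on MLXP."""
    major = int(version.split(".")[0])
    return major >= 2

print(is_supported_kfp("2.7.0"))   # True: a supported 2.x release
print(is_supported_kfp("1.8.22"))  # False: a 1.x release, upgrade required
```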

Running a Pipeline Example

Note

In a local development environment (not MLXP's Jupyter Notebook), you can only perform pipeline compilation. After compiling, upload the generated YAML file to the Pipeline interface in MLXP to create and run the pipeline.

The following example should be run in a Jupyter Notebook environment on MLXP.

Create a Python 3 (ipykernel) Notebook, enter the code below, and execute it.

import kfp
from kfp import dsl
from kfp.dsl import component

@component
def add_op(a: float, b: float) -> float:
    print("a + b =", a + b)
    return a + b

@component
def log_op(msg: str):
    print(msg)


@dsl.pipeline(name='A + B', description="pipeline for input parameter")
def pipeline(a: float = 10.0, b: float = 20.0):
    add_task = add_op(a=a, b=b)
    log_task = log_op(msg=f"{add_task.output}")

We created two components: add_op, which takes two numbers and returns their sum, and log_op, which takes a single value as input and prints it. Both are defined with the kfp.dsl.component decorator provided by the Python SDK.

In the pipeline function, you define the parameters that the pipeline receives when it is executed. These parameters are passed as inputs to the add_op component, and the value returned by add_op is used as the input to the log_op component. Because log_op depends on the output of add_op, the execution order of the components is determined automatically.

To run the pipeline defined above, use the following code.

if __name__ == "__main__":
    from kfp import compiler  # explicit import; kfp.compiler may not be exposed by `import kfp` alone
    compiler.Compiler().compile(pipeline, 'test_pipeline.yaml')
    client = kfp.Client()
    my_experiment = client.create_experiment(name="Test Experiment")
    my_run = client.run_pipeline(
        experiment_id=my_experiment.experiment_id,
        job_name="test",
        pipeline_package_path="test_pipeline.yaml",
        params={'b': 30.0}
    )

The code above first compiles the pipeline function, generating a file named test_pipeline.yaml that contains the entire pipeline workflow in YAML format. It then creates an Experiment and runs the Pipeline.

When you run the code in Jupyter Notebook, a link appears that allows you to check the Experiment or Run in the MLXP interface. Click the Run details link to view the detailed Run page.

To run a Pipeline from the Jupyter Notebook environment, you can also use the simpler code shown below.

if __name__ == "__main__":
    client = kfp.Client()
    my_run = client.create_run_from_pipeline_func(
        pipeline, {'a': 30.0, 'b': 40.0},
        experiment_name="Test Experiment"
    )

Pipeline Cleanup

This section describes how to clean up Pipelines.

Cleaning up Pipeline Runs

If a Run is scheduled using Recurring Run, many Runs may accumulate in the Active state. Runs that are no longer needed or are less important can be moved to the Archived state to reduce the number of Active Runs. A Run can only be deleted when it is in the Archived state.

Caution

Deleting a Run removes it from the Run interface, and all workflow and Pod resources created for that Run are also deleted.

Cleaning up Pipeline Run Resources

Kubernetes workflow and Pod resources created by Pipelines are deleted according to the following policy:

  • STATUS: Succeeded / Completed: deleted 24 hours (1 day) after completion
  • STATUS: Error: deleted 72 hours (3 days) after the error occurs

When a Pipeline runs, a workflow resource is created, and multiple Pods may be created and executed. Even with the cleanup policy above, too many leftover resources can still cause issues. For example, a large number of Pods can make Pod queries inconvenient, and if a Pod is bound to a PersistentVolume (PV), the PV may not be deleted properly.

If necessary, delete resources created by the Pipeline by following the instructions below.

Caution

Deleting workflow or Pod resources does not delete the Pipeline Run itself. The metadata and Artifacts remain available, so you can still view them in the Run detail interface. However, if the Pod has been deleted, logs will no longer be available in the Run detail interface.

Cleaning up Pipelines Using Kubectl

To view the list of currently running Pipelines, run the kubectl get workflow command:

$ kubectl get workflow -n <namespace>
NAME             STATUS      AGE
pipeline-d85m9   Succeeded   7m45s

You can check the detailed execution history of the Pipeline through kubectl describe workflow.

$ kubectl describe workflow pipeline-d85m9 -n <namespace>
...
Events:
Type    Reason                 Age    From                 Message
----    ------                 ----   ----                 -------
Normal  WorkflowRunning        9m18s  workflow-controller  Workflow Running
Normal  WorkflowNodeSucceeded  9m8s   workflow-controller  Succeeded node pipeline-d85m9.add
Normal  WorkflowNodeSucceeded  8m58s  workflow-controller  Succeeded node pipeline-d85m9.log
Normal  WorkflowNodeSucceeded  8m58s  workflow-controller  Succeeded node pipeline-d85m9
Normal  WorkflowSucceeded      8m58s  workflow-controller  Workflow completed

Pods created by the Pipeline remain in the Completed state:

$ kubectl get pods -n <namespace> | grep pipeline-d85m9
pipeline-d85m9-1301794499                          0/2     Completed   0          11m
pipeline-d85m9-2790487078                          0/2     Completed   0          11m

Use the kubectl delete workflow command to clean up the Pipeline. When you delete the Workflow resource, the Pods that were created and executed for the Pipeline are also deleted.

$ kubectl delete workflow pipeline-d85m9 -n <namespace>
workflow.argoproj.io "pipeline-d85m9" deleted

$ kubectl get pods -n <namespace> | grep pipeline-d85m9
# Empty response