Pipelines 사전 준비

Prev Next

VPC 환경에서 이용 가능합니다.

Kubeflow Pipeline은 Kubeflow의 핵심 구성 요소로서, 컨테이너 기반으로 workflow를 관리하는 기능을 제공합니다. 그리고 workflow를 생성/배포/실행/관리하기 위해서 Python SDK와 Web UI를 제공합니다. Web UI는 MLXP 화면에 있는 Pipelines의 메뉴들을 통해서 접근할 수 있으며, Python SDK는 pip로 설치할 수 있습니다. Kubeflow Pipeline에 대해 더 자세한 내용은 공식 문서를 참조해 주십시오.

개발 환경 구성 및 SDK 설치

개발 환경을 구성하는 방법과 Python SDK를 설치하는 방법을 설명합니다.

Pipeline root 설정

Pipeline root는, Pipeline을 실행할 때 발생하는 input/output 정보, log, dataset 등의 정보를 저장하는 Artifact 저장소를 의미합니다. Pipeline을 실행하기 전에 namespace에 몇 가지 설정을 추가하여, namespace 단위로 격리된 Artifact 저장소를 사용해야 합니다.

여러 namespace를 사용하는 MLXP Member들 사이에서 Artifact 저장소의 내용이 공유되어도 문제가 없다면, 하나의 Artifact 저장소를 여러 namespace에 걸쳐서 사용할 수 있습니다. 하지만 동일한 Artifact 저장소일지라도, namespace마다 동일한 Pipeline root 설정을 해야만 정상적으로 Pipeline을 실행할 수 있습니다.

이 Artifact 저장소를 위해서, AWS S3와 호환이 되는 object storage를 준비해 주시기 바랍니다.

주의

MLXP에서는 Kubeflow Pipeline에서 기본으로 지원하는 minio Artifact 저장소를 사용할 수 없습니다. 따라서 namespace에 Pipeline root 설정을 하지 않으면, Pipeline을 정상적으로 실행할 수 없습니다.

Object storage를 사용할 준비가 완료되었다면, 아래의 yaml template에서 ${} 에 해당하는 영역을 원하는 값으로 채워서 yaml 파일을 생성한 후에, kubectl apply -f {yaml 파일명}을 실행하여 설정을 완료합니다.

항목 설명
${NAMESPACE} pipeline을 실행할 namespace 이름
${BUCKET} Object storage의 bucket 이름
${PREFIX} bucket에 저장될 경로 이름
${STORAGE_ENDPOINT} Object storage의 endpoint (ex. kr.object.ncloudstorage.com)
${ACCESS_KEY} Object storage 인증을 위해서 발급받은 access key
${SECRET_ACCESS_KEY} Object storage 인증을 위해서 발급받은 secret key
${REGION} Object storage의 region (ex. kr-standard)
# argo workflow log 저장을 위한 configmap
apiVersion: v1
kind: ConfigMap
metadata:
  name: artifact-repositories # 이 configmap의 name은 변경하지 않아야 합니다.
  namespace: ${NAMESPACE}
  annotations:
    workflows.argoproj.io/default-artifact-repository: default-v1
data:
  default-v1: |
    archiveLogs: true
    s3:
      bucket: ${BUCKET}
      endpoint: ${STORAGE_ENDPOINT}
      insecure: false
      region: kr-standard
      keyFormat: "${PREFIX}/artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}"
      accessKeySecret:
        name: artifact-secret
        key: AWS_ACCESS_KEY_ID
      secretKeySecret:
        name: artifact-secret
        key: AWS_SECRET_ACCESS_KEY
---
# kfp artifact 설정을 위한 configmap
apiVersion: v1
kind: ConfigMap
metadata:
  name: kfp-launcher  # 이 configmap의 name은 변경하지 않아야 합니다.
  namespace: ${NAMESPACE}
data:
  defaultPipelineRoot: "s3://${BUCKET}/${PREFIX}?region=${REGION}&endpoint=${STORAGE_ENDPOINT}"
---
apiVersion: v1
kind: Secret
metadata:
  name: artifact-secret
  namespace: ${NAMESPACE}
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: ${ACCESS_KEY}
  AWS_SECRET_ACCESS_KEY: ${SECRET_ACCESS_KEY}
---
# PodDefault to inject NCloud Object Storage environment variables into KFP v2 pods
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: kfp-poddefault
  namespace: ${NAMESPACE}
spec:
  selector:
    matchLabels:
      pipelines.kubeflow.org/v2_component: "true"
  env:
    - name: AWS_ACCESS_KEY_ID
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_ACCESS_KEY_ID
    - name: AWS_SECRET_ACCESS_KEY
      valueFrom:
        secretKeyRef:
          name: artifact-secret
          key: AWS_SECRET_ACCESS_KEY
    - name: AWS_DEFAULT_REGION
      value: ${REGION}
    - name: AWS_REGION
      value: ${REGION}
    - name: AWS_S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_ENDPOINT
      value: ${STORAGE_ENDPOINT}
    - name: S3_USE_HTTPS
      value: "true"
    - name: S3_VERIFY_SSL
      value: "true"

Pipeline SDK 설치

Pipeline은 Python SDK를 제공합니다. Python SDK는 Component를 만들고 Pipeline을 구성, 빌드하는 기능을 제공합니다. 그리고 Pipeline 실행 및 Experiment 관리 기능도 제공하고 있습니다.

이 문서에서는 간단한 예제를 통해 Pipeline을 만들고 실행하는 방법을 설명합니다. 자세한 SDK 의 사용법은 Kubeflow Pipelines SDK API 를 참조해 주십시오.

주의

현재 SDK를 통해서 Pipeline을 실행하고 Experiment를 관리하는 기능은 MLXP에서 실행한 Jupyter Notebook 환경에서만 동작합니다.

MLXP 클러스터와 호환되는 kfp 최신 버전은 아래의 항목들을 참고해 주십시오.

서비스 버전 공식 문서
Pipeline Backend 2.2.0
Pipeline SDK(kfp) 2.7.0 https://kubeflow-pipelines.readthedocs.io/en/sdk-2.7.0/
kfp-kubernetes 1.2.0 https://kfp-kubernetes.readthedocs.io/en/kfp-kubernetes-1.2.0/

Pipeline SDK는 아래 명령으로 설치가 가능합니다. MLXP에서 실행하는 Jupyter Notebook 환경에는 이미 kfp가 설치되어 있어서, 별도의 설치 없이 SDK를 사용할 수 있으므로, MLXP의 Jupyter Notebook 환경 사용을 권고합니다.

pip install kfp

Jupyter Notebook 에 설치된 kfp 버전 확인

Jupyter Notebook 에 설치된 kfp 버전을 확인하는 방법은 다음과 같습니다.

  1. Jupyter Notebook에 접속 후, Launcher 페이지에서 Other > Terminal을 선택한 후 터미널을 구동해 주십시오.
  2. Terminal에 다음과 같이 명령어를 입력하여 설치된 kfp 버전을 확인해 주십시오.
    (base) irteam@test-pipeline-1-0:~$ pip show kfp
    Name: kfp
    Version: 2.7.0
    Summary: Kubeflow Pipelines SDK
    Home-page: https://github.com/kubeflow/pipelines
    Author: The Kubeflow Authors
    Author-email: None
    License: UNKNOWN
    Location: /opt/conda/lib/python3.8/site-packages
    Requires: tabulate, protobuf, google-api-core, kfp-pipeline-spec, kfp-server-api, urllib3, kubernetes, google-cloud-storage, PyYAML, requests-toolbelt, docstring-parser, click, google-auth, typing-extensions
    Required-by: 
    
  3. Version 출력 결과가 1.x.x 형식으로 표시된다면, 다음과 같이 버전 업그레이드를 실행하거나, 최신 Jupyter Notebook 이미지를 사용해 주십시오.
    pip install --upgrade kfp=={MLX 지원 kfp 버전}
    
주의

Jupyter Notebook 환경에 낮은 버전의 SDK(kfp)가 설치되어 있다면 패키지 버전 업그레이드가 반드시 필요합니다. SDK(kfp) 1.x 버전은 MLXP에서 지원하지 않기 때문에, 사용 전에 반드시 패키지 버전을 확인하시기 바랍니다.

Pipeline 예제 실행

참고

MLXP의 Jupyter Notebook이 아닌 로컬 개발 환경에서는 컴파일까지만 진행할 수 있습니다. 컴파일하여 생성된 yaml파일을 MLXP의 Pipeline 화면에서 upload하여 Pipeline을 생성하고 실행할 수 있습니다.

다음 예제 코드는 MLXP에서 실행한 Jupyter Notebook 환경에서 진행합니다.

Python 3(ipykernel) Notebook을 생성하여 다음 코드를 입력한 후 실행해 주십시오.

import kfp
from kfp import dsl
from kfp.dsl import component

@component
def add_op(a:float, b:float) -> float:
    print("a + b =", a + b)
    return a + b

@component
def log_op(msg:str):
    print(msg)


@dsl.pipeline(name='A + B', description="pipeline for input parameter")
def pipeline(a: float = 10.0, b: float = 20.0):
    add_task = add_op(a=a, b=b)
    log_task = log_op(msg=f"{add_task.output}")

두 개의 숫자를 받아서 더하는 add_op 컴포넌트와, 하나의 값을 입력으로 받아서 화면에 출력하는 log_op 컴포넌트를 만들었습니다. 이 과정에서 Python SDK에서 제공하는 kfp.dsl.component 를 활용합니다.

Pipeline 함수에서는 파이프라인 실행시 입력 받을 수 있는 인자를 정의할 수 있습니다. 전달받은 인자를 add_op 컴포넌트의 입력으로 사용하고, add_op 컴포넌트가 반환한 값을 log_op 컴포넌트의 입력으로 사용합니다. log_op 컴포넌트가 add_op 컴포넌트의 결과를 사용하기 때문에 컴포넌트의 실행순서는 자동으로 결정됩니다.

위에서 정의한 파이프라인을 실행하기 위해 다음과 같은 코드를 사용합니다.

if __name__ == "__main__":
    kfp.compiler.Compiler().compile(pipeline, 'test_pipeline.yaml')
    client = kfp.Client()
    my_experiment = client.create_experiment(name="Test Experiment")
    my_run = client.run_pipeline(
        experiment_id=my_experiment.experiment_id,
        job_name="test",
        pipeline_package_path="test_pipeline.yaml",
        params={'b': 30.0}
    )

위 코드의 동작을 살펴보면, 먼저 pipeline 함수를 컴파일해서 test_pipeline.yaml 이라는 파일을 생성합니다. 생성된 파일에는 Pipeline의 전체 Workflow를 YAML 형식으로 정의한 파일이 들어있습니다. 이어서 Experiment를 만들고, Pipeline을 실행합니다.

Jupyter Notebook에서 위의 코드를 실행하면 MLXP 화면에서 Experiment나 Run을 확인할 수 있는 링크가 만들어집니다. Run details 링크를 클릭하면 Run에 대한 상세 화면을 확인할 수 있습니다.

Jupyter Notebook 환경에서 Pipeline을 실행하기 위해서, 다음과 같이 좀 더 간략화된 코드를 사용할 수도 있습니다.

if __name__ == "__main__":
    client = kfp.Client()
    my_run = client.create_run_from_pipeline_func(
        pipeline, {'a': 30.0, 'b': 40.0},
        experiment_name="Test Experiment"
    )

Pipeline 정리

Pipeline을 정리하는 방법을 설명합니다.

Pipeline Run 정리

Recurring Run 기능으로 Run이 주기적으로 실행되는 경우에 많은 Run이 Active 상태로 쌓일 수 있습니다. 불필요해졌거나 중요하지 않은 Run은 Archived 상태로 변경하여 Active Run의 개수를 줄일 수 있습니다. Run의 삭제는 Archived 상태인 경우에만 가능합니다.

주의

Run을 삭제하면 더 이상 Run 화면에서 볼 수 없고, Run을 위해서 생성되었던 namespace의 workflow와 Pod 리소스가 함께 삭제됩니다.

Pipeline Run 리소스 정리

Pipeline에 의해 생성된 Kubernetes workflow와 Pod 리소스는 아래의 정책으로 삭제됩니다.

  • 성공적으로 실행된 Succeeded/Completed STATUS: 실행 완료 후 24시간(1일) 후에 삭제
  • 실행 중 에러가 발생한 STATUS: 에러 발생 후 72시간(3일) 후에 삭제

하나의 Pipeline을 실행하면 workflow 리소스가 생성되고, 여러개의 Pod이 생성되어 실행될 수 있습니다. 따라서 위의 정책대로 리소스가 삭제되더라도, 너무 많은 리소스가 쌓여 있다면 여러 문제가 발생할 수 있습니다. 예를 들면, 삭제되지 않은 Pod들로 인해 Pod 조회가 불편할 수도 있고, 해당 Pod이 PersistentVolume (PV)에 바인딩되어 있는 경우 PV가 제대로 삭제되지 않는 등의 문제가 발생할 수 있습니다.

필요한 경우에는 아래 내용을 참고하여 Pipeline으로 생성된 리소스를 삭제해 주시기 바랍니다.

주의

Workflow 리소스와 Pod이 삭제되더라도 Pipeline Run이 같이 삭제되지는 않습니다. 따라서 실행 기록 등의 metadata와 Artifact는 남아 있어서, Run 상세화면에서 해당 내용들을 확인할 수 있습니다. 하지만 Pod을 삭제한 경우에는 Run의 상세화면에서 log를 열람할 수 없습니다.

Kubectl 명령을 사용하여 Pipeline 정리

현재 실행 중인 Pipeline 목록을 보려면 kubectl get workflow 명령을 실행합니다.

$ kubectl get workflow -n <namespace>
NAME             STATUS      AGE
pipeline-d85m9   Succeeded   7m45s

해당 Pipeline의 자세한 실행 내역은 kubectl describe workflow 으로 확인할 수 있습니다.

$ kubectl describe workflow pipeline-d85m9 -n <namespace>
...
Events:
Type    Reason                 Age    From                 Message
----    ------                 ----   ----                 -------
Normal  WorkflowRunning        9m18s  workflow-controller  Workflow Running
Normal  WorkflowNodeSucceeded  9m8s   workflow-controller  Succeeded node pipeline-d85m9.add
Normal  WorkflowNodeSucceeded  8m58s  workflow-controller  Succeeded node pipeline-d85m9.log
Normal  WorkflowNodeSucceeded  8m58s  workflow-controller  Succeeded node pipeline-d85m9
Normal  WorkflowSucceeded      8m58s  workflow-controller  Workflow completed

현재 Pipeline에 의해 생성된 Pod들이 Completed 상태로 남아 있습니다.

$ kubectl get pods -n <namespace> | grep pipeline-d85m9
pipeline-d85m9-1301794499                          0/2     Completed   0          11m
pipeline-d85m9-2790487078                          0/2     Completed   0          11m

kubectl delete workflow 명령을 사용하여 해당 Pipeline을 정리합니다. Workflow 리소스를 삭제하면 Pipeline을 위해서 실행되었던 Pod들도 함께 삭제됩니다.

$ kubectl delete workflow pipeline-d85m9 -n <namespace>
workflow.argoproj.io "pipeline-d85m9" deleted

$ kubectl get pods -n <namespace> | grep pipeline-d85m9
# Empty response