ML expert Platform 시나리오

Prev Next

VPC 환경에서 이용 가능합니다.

ML expert Platform은 AI/ML 서비스를 위한 전 과정인 데이터 관리/처리부터 대규모 분산 학습, 그리고 작은 모델부터 초거대 AI 모델을 아우르는 서빙 배포까지 효율적이고 안정적인 서비스를 제공합니다. 여기서는 ML expert Platform을 활용하여 FashionMNIST 데이터셋을 학습하는 방법을 단계별로 설명합니다. 해당 예제는 분산 노드 학습을 진행하며, PVC(Persistent Volume Claim) 을 사용하여 데이터를 저장하고, PytorchJob을 활용하여 학습을 수행합니다.

1. Workspace 생성 및 Project 생성

2. 학습 준비

FashionMNIST 데이터셋 준비

본 데이터셋은 Huggingface 에서 제공하는 FashionMNIST 데이터셋을 기준으로 작성되었습니다.

데이터셋은 아래와 같이 사전에 준비가 되어있는 것을 가정하고 있습니다.

  • Data Manager
  • Object Storage, Ncloud Storage
데이터 관리 위치 권장사항 비고
Data Manager
  • 데이터셋을 논리적인 단위로 버저닝 필요한 경우
  • Huggingface Interface 로 작성된 학습 코드를 사용하는 경우
Object Storage, Ncloud Storage
  • 데이터셋 관리가 필요하지 않은 경우
  • Huggingface Interface가 필요하지 않은 경우

Data Manager 로 관리를 하고 싶은 경우, 데이터셋 업로드를 참고하시기 바랍니다.

학습 데이터 사용

학습 데이터를 사용하는 방법은 아래와 같습니다.

  • Huggingface DataLoader을 이용한 원격 읽기 방식
  • 선택한 스토리지 내 데이터셋을 복사 후 스토리지 읽기 방식

Huggingface DataLoader 기반의 리모트 읽기 방법은 데이터셋 읽기를 참고바라며, 아래에서는 볼륨 데이터 방식에 대해서만 설명합니다.

PVC 생성

선택한 스토리지를 ML expert Platform 내 작업환경에서 사용하려는 경우, PVC(PersistentVolumeClaim)를 생성해서 마운트해야 하며, 자세한 내용은 Volumes를 참고하시기 바랍니다. 다음은 ML expert Platform에서 지원하고 있는 스토리지 종류별 PVC 생성 예시입니다.

고성능 스토리지(DDN)는 Read Write Many (RWM)을 지원하기 때문에 아래와 같이 구성을 할 수 있습니다.

#exa-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
    name: exa-pvc
    namespace: p-{ projectName } # 프로젝트에 해당되는 Kubernetes Namespace 명
spec:
    storageClassName: { 고성능스토리지 storageClassName } # 고성능스토리지 StorageClass 명
    accessModes:
        - ReadWriteMany
    resources:
        requests:
            storage: 10Gi # 생성할 고성능 스토리지 용량
kubectl -n {namespace} apply -f exa-pvc.yaml

로컬 스토리지(NVMe)는 GPU 서버에 Bound 되어 있기 때문에 노드 수 만큼 PVC 생성이 필요합니다. 로컬 스토리지(NVMe)를 사용할 경우에는 EmptyDirinitContainer를 이용한 방식이나 EmptyDir 및 Data Manager DataLoader를 이용한 방식을 권장합니다.

#local-path-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
    name: local-path-pvc
    namespace: p-{ projectName } # 프로젝트에 해당되는 Kubernetes Namespace 명
spec:
    storageClassName: { 로컬스토리지 storageClassName } # 로컬스토리지 StorageClass 명
    accessModes:
        - ReadWriteOnce
    resources:
        requests:
            storage: 10Gi # (기본값) 로컬스토리지에 경우, 작동하지 않음
kubectl -n {namespace} apply -f local-path-pvc.yaml

데이터 다운로드

ML expert Platform 에서는 데이터셋 다운로드를 위해 storage-initializer 이미지를 제공하고 있습니다. Kubernetes Job을 이용해 생성된 PVC를 마운트하고 제공된 이미지를 통해 데이터셋을 다운로드합니다.

Job을 이용한 다운로드 예시는 다음과 같습니다.

Data Manager 사용 예시

# download-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: download-job
  namespace: p-{ projectName } # 프로젝트에 해당되는 Kubernetes Namespace 명
  annotations:
    sidecar.istio.io/inject: "false"
spec:
  template:
    metadata:
      annotations:
        sidecar.istio.io/inject: "false"
    spec:
      restartPolicy: Never
      nodeSelector:
        mlx.navercorp.com/zone: { 제공된 GPU Zone 이름 } # GPU Resources 에서 확인 가능한 Zone 명
      containers:
        - name: storage-initializer
          image: mlx-public.kr.ncr.ntruss.com/mlx/mdm-storage-initializer:v0.0.1
          env:
          - name: MLX_APIKEY # (3)!
            value: '{ API Key }' # MLXP API Key
          args:
          - "mlx+data-manager://{ MLX endpoint url }/{workspace}/{dataset}"
          - "/data/dataset" # spec.volueMounts의 mountPath 내 데이터셋 저장을 위한 경로
          volumeMounts:
            - mountPath: "/data"
              name: storage-volume
      volumes:
        - name: storage-volume
          persistentVolumeClaim:
            claimName: { 마운트할 PVC 명 } # 생성한 PVC 명 입력 (e.g. exa-pvc, local-path-pvc)
kubectl -n {namespace} apply -f download-job.yaml

Object Storage / Ncloud Storage 사용 예시

# download-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: download-job
  namespace: p-{ projectName } # 프로젝트에 해당되는 Kubernetes Namespace 명
  annotations:
    sidecar.istio.io/inject: "false"
spec:
  template:
    metadata:
      annotations:
        sidecar.istio.io/inject: "false"
    spec:
      restartPolicy: Never
      nodeSelector:
        mlx.navercorp.com/zone: { 제공된 GPU Zone 이름 } # GPU Resources 에서 확인 가능한 Zone 명
      containers:
        - name: storage-initializer
          image: mlx-public.kr.ncr.ntruss.com/mlx/kserve/storage-initializer:v0.13.0
          env:
          - name: AWS_ENDPOINT_URL
            value: { S3 Endpoint } # S3 Endpoint
          - name: AWS_ACCESS_KEY_ID
            value: { S3 Access Key }  # S3 Access Key
          - name: AWS_SECRET_ACCESS_KEY
            value: { S3 Secret Key } # S3 Secret Key
          - name: AWS_DEFAULT_REGION
            value: { S3 Region } # S3 Region
          args:
          - "s3://{ Object Storage 데이터셋 URL }"
          - "/data/dataset" # spec.volueMounts의 mountPath 내 데이터셋 저장을 위한 경로
          volumeMounts:
            - mountPath: "/data"
              name: storage-volume
      volumes:
        - name: storage-volume
          persistentVolumeClaim:
            claimName: { 마운트할 PVC 명 } # 생성한 PVC 명 입력 (e.g. exa-pvc, local-path-pvc)
kubectl -n {namespace} apply -f download-job.yaml

학습 코드 준비

베이스이미지 사용

InfiniBand를 사용하기 위해선 PyTorchJob 에서 사용하는 이미지에 libibverbs.so 가 설치되어있어야 합니다. 필요한 라이브러리가 모두 설치되어 있기 때문에, ML expert Platform에서 공유하고 있는 베이스 이미지를 기반으로 필요한 이미지를 만드는 것을 권장합니다.

ML expert Platform 에서 제공하는 학습을 사용하기 위해서는 NVIDIA 공식 Pytorch 베이스 이미지를 기반으로 코드 작성이 필요합니다.
아래 예제 코드는 고성능 스토리지(DDN) 사용 기반으로 작성되었습니다.

예제 학습 코드

예제 학습 코드는 다음과 같습니다.

# mnist_distributed.py
from __future__ import print_function

import argparse
import os
import time

from torch.utils.tensorboard import SummaryWriter
from torchvision.transforms import transforms
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

from mlx.sdk.data import login, load_dataset

WORLD_SIZE = int(os.environ.get("WORLD_SIZE"))
LOCAL_RANK = int(os.environ.get("LOCAL_RANK", 0))
GLOBAL_RANK = int(os.environ.get("RANK"))


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4 * 4 * 50, 500)
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4 * 4 * 50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)


def train(args, model, device, train_loader, optimizer, epoch, writer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if GLOBAL_RANK == 0 and batch_idx % args.log_interval == 0:
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_loader.dataset) // WORLD_SIZE,
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )
            niter = epoch * len(train_loader) + batch_idx
            writer.add_scalar("loss", loss.item(), niter)


def test(args, model, device, test_loader, writer, epoch):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(
                output, target, reduction="sum"
            ).item()  # sum up batch loss
            pred = output.max(1, keepdim=True)[
                1
            ]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    accuracy = float(correct) / (len(test_loader.dataset) / WORLD_SIZE)
    if GLOBAL_RANK == 0:
        print("\naccuracy={:.4f}\n".format(accuracy))
        writer.add_scalar("accuracy", accuracy, epoch)

def main():
    parser = argparse.ArgumentParser(description="PyTorch Distributed MNIST Example")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        metavar="N",
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--test-batch-size",
        type=int,
        default=1000,
        metavar="N",
        help="input batch size for testing (default: 1000)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=5,
        metavar="N",
        help="number of epochs to train (default: 10)",
    )
    parser.add_argument(
        "--lr",
        type=float,
        default=0.01,
        metavar="LR",
        help="learning rate (default: 0.01)",
    )
    parser.add_argument(
        "--momentum",
        type=float,
        default=0.5,
        metavar="M",
        help="SGD momentum (default: 0.5)",
    )
    parser.add_argument(
        "--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=10,
        metavar="N",
        help="how many batches to wait before logging training status",
    )
    parser.add_argument(
        "--checkpoint_path",
        default=f"/data/result/mnist_distributed_{int(time.time())}.pt",
        help="Path to save checkpoint",
    )
    parser.add_argument(
        "--data_path", default="/data/mnist/data", help="Path for training/test data"
    )
    parser.add_argument(
        "--log_path",
        default="/data/log",
        metavar="L",
        help="Directory pathwhere summary logs are stored",
    )
    parser.add_argument(
        "--backend",
        type=str,
        help="Distributed backend",
        choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
        default=dist.Backend.NCCL,
    )

    args = parser.parse_args()
    login("{ ML expert Platform API Key }")
    writer = SummaryWriter(args.log_path)
    torch.manual_seed(args.seed)

    print("Using distributed PyTorch with {} backend".format(args.backend))
    dist.init_process_group(backend=args.backend)

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    def preprocess(examples):
        """HuggingFace datasets용 올바른 transform"""
        if isinstance(examples['image'], list):
            # 배치 처리
            examples['image'] = [transform(img) for img in examples['image']]
            examples['label'] = torch.tensor(examples['label'])
        else:
            # 단일 아이템
            examples['image'] = transform(examples['image'])
            examples['label'] = torch.tensor(examples['label'])
        return examples['image'], examples['label']

    train_dataset = load_dataset(args.data_path, split="train")
    train_dataset.set_transform(preprocess)
    test_dataset = load_dataset(args.data_path, split="test")
    test_dataset.set_transform(preprocess)

    train_loader = DataLoader(
        train_dataset,
        batch_size=args.batch_size,
        shuffle=False,
        num_workers=1,
        pin_memory=True,
        sampler=DistributedSampler(train_dataset),
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=args.test_batch_size,
        shuffle=False,
        num_workers=1,
        pin_memory=True,
        sampler=DistributedSampler(test_dataset),
    )
    model = Net().to(LOCAL_RANK)

    # Wrap the model with DistributedDataParallel if needed.
    model = DDP(model, device_ids=[LOCAL_RANK])

    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    for epoch in range(1, args.epochs + 1):
        train(args, model, LOCAL_RANK, train_loader, optimizer, epoch, writer)
        test(args, model, LOCAL_RANK, test_loader, writer, epoch)

    if GLOBAL_RANK == 0:
        dir_name = os.path.dirname(args.checkpoint_path)
        if dir_name:
            os.makedirs(dir_name, exist_ok=True)
        torch.save(model.state_dict(), args.checkpoint_path)
        print(f"Checkpoint saved at {args.checkpoint_path}")

    dist.destroy_process_group()


if __name__ == "__main__":
    main()

예제 Dockerfile

FROM nvcr.io/nvidia/pytorch:23.03-py3

# tensorboardX 설치
USER root
RUN pip install --no-cache-dir tensorboardX==2.6.2
RUN mkdir -p /opt/mnist/src

WORKDIR /opt/mnist/src

USER 500:500 # 고성능 스토리지 사용을 위해서는 UID 500, GID 500 권한 필요
COPY mnist_distributed.py /opt/mnist/src/mnist_distributed.py

컨테이너 레지스트리 접속 정보 설정

컨테이너 레지스트리에서 ML expert Platform 에서 이미지 Pull을 하기 위해서는 접속 정보 설정이 필요합니다.

3. 학습 시작

Pytorch 에서 공식적으로 제공하는 Elastic Launch 사용을 권장하고 있으며, 아래의 예제는 torchrun을 기반으로 고성능 스토리지(DDN) 스토리지를 이용한 예제입니다.

PytorchJob 생성

분산학습을 위해 아래와 같이 진행이 필요합니다.

# pytorchjob.yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
    name: pytorch-mnist-dist-nccl
    namespace: p-{ projectName } # 프로젝트에 해당되는 Kubernetes Namespace 명
spec:
    elasticPolicy:
        rdzvId: mnist
        rdzvBackend: c10d
        minReplicas: 2
        maxReplicas: 2
        nProcPerNode: 8
    runPolicy:
        cleanPodPolicy: None
    pytorchReplicaSpecs:
        Worker:
            replicas: 2
            restartPolicy: OnFailure
            template:
                metadata:
                    annotations:
                        sidecar.istio.io/inject: "false"  # Istio sidecar injection 비활성화는 필수
                spec:
                    nodeSelector:
                        mlx.navercorp.com/zone: { 제공된 GPU Zone 이름 } # GPU Resources 에서 확인 가능한 Zone 명 
                containers:
                - name: pytorch  # PyTorchJob의 container 이름은 반드시 pytorch로 설정
                   image: examples.com/pytorch-mnist-dist:23.03-py3 # 예제 코드의 컨테이너 이미지
                   imagePullPolicy: Always
                   securityContext:  # Infiniband 를 사용하기 위한 securityContext가 필요합니다.
                        capabilities:
                            add: ["IPC_LOCK"]
                   command: ["bash", "-c"]
                   args:
                   - >
                     torchrun --nnodes ${PET_NNODES} --nproc_per_node ${PET_NPROC_PER_NODE} --rdzv_id ${PET_RDZV_ID} --rdzv_backend ${PET_RDZV_BACKEND} --rdzv_endpoint ${PET_RDZV_ENDPOINT}
                     /opt/mnist/src/mnist.py --checkpoint_path /data/checkpoints/mnist.pt --log_path /data/logs --data_path /data/dataset
                    env:
                    - name: NCCL_DEBUG
                       value: INFO
                    resources:
                        limits:
                            memory: "1Ti"
                            cpu: 120
                            nvidia.com/gpu: 8
                            rdma/hca_shared_devices_a: 1
                        requests:
                            memory: "8Gi"
                            cpu: 120
                            nvidia.com/gpu: 8
                            rdma/hca_shared_devices_a: 1
                    # shared memory
                    volumeMounts:
                    - mountPath: /dev/shm
                       name: shared-memory
                    - mountPath: "/data"
                       name: storage-volume
                volumes:
                - emptyDir:
                   medium: Memory
                   name: shared-memory
                - name: storage-volume
                   persistentVolumeClaim:
                        claimName: exa-pvc # 이전에 생성한 고성능스토리지 PVC 명

PytorchJob 실행

kubectl -n { namespace } apply -f pytorchjob.yaml

4. 현황 및 결과 확인

학습 현황 및 결과를 확인하는 방법은 다음과 같습니다.

Pod Log 기반 확인

학습 중인 Pod은 kubectl logs 명령어를 통해 로그를 확인할 수 있습니다.

kubectl -n { namespace } logs pytorch-elastic-mnist-nccl-worker-0 pytorch

Tensorboard 기반 확인

학습 코드에서 Tensorboard를 위한 로깅을 남기고 있다면 ML expert Platform 에서 제공되는 Tensorboard를 통해 정보를 확인할 수 있습니다.
예제 코드에 경우, Tensorboard 로그를 /data/logs에 저장되고 있습니다.

학습 결과 저장

학습 파라미터를 보관 관리하기 위해서는 Model Registry를 사용할 수 있습니다.
학습 파라미터를 저장하기 위하여 Model Registry SDK를 통해 자동으로 업로드하거나, Notebook을 통해 필요한 학습 파라미터만 업로드하여 관리할 수 있습니다.