ML expert Platform のシナリオ

Prev Next

VPC環境で利用できます。

ML expert Platformは、AI/MLサービスのための全プロセスであるデータ管理/処理から大規模分散学習、そして小型モデルから超巨大 AIモデルまでを網羅するサービングリリースまで、効率的で安定したサービスを提供します。ここでは ML expert Platformを活用して FashionMNISTデータセットを学習する方法をステップごとに説明します。当該ユースケースは分散ノード学習を進め、PVC(Persistent Volume Claim)を使用してデータを保存し、PytorchJobを活用して学習を行います。

1. Workspaceの作成と Projectの作成

2. 学習準備

FashionMNISTデータセットの準備

本データセットは Huggingfaceで提供する FashionMNISTデータセットを基準に作成されました。

データセットは以下のように事前に準備されているものと仮定しています。

  • Data Manager
  • Object Storage, Ncloud Storage
データ管理位置 推奨事項 備考
Data Manager
  • データセットを論理的な単位でバージョニングする必要がある場合
  • Huggingface Interfaceで作成された学習コードを使用する場合
Object Storage, Ncloud Storage
  • データセットの管理が不要な場合
  • Huggingface Interfaceが不要な場合

Data Managerで管理したい場合、 データセットアップロードをご参照ください。

学習データを使用する

学習データを使用する方法は、以下の通りです。

  • Huggingface DataLoaderを利用したリモート読み取り方法
  • 選択したストレージ内のデータセットをコピー後、ストレージ読み取り方法

Huggingface DataLoader基盤のリモート読み取り方法はデータセット読み取りを参照し、以下ではボリュームデータ方法についてのみ説明します。

PVC作成

選択したストレージを ML expert Platform内タスク環境で使用する場合、PersistentVolumeClaim(PVC)を作成してマウントする必要があり、詳細は Volumesをご参照ください。次は ML expert Platformでサポートするストレージ種類別 PVCの作成例です。

高性能ストレージ(DDN)は Read Write Many(RWM)をサポートするため、以下のように構成できます。

#exa-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
    name: exa-pvc
    namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
spec:
    storageClassName: { 高性能ストレージ storageClassName } # 高性能ストレージ StorageClass名
    accessModes:
        - ReadWriteMany
    resources:
        requests:
            storage: 10Gi # 作成する高性能ストレージ容量
kubectl -n {namespace} apply -f exa-pvc.yaml

ローカルストレージ(NVMe)は GPUサーバに Boundされているため、ノード数だけ PVC作成が必要です。ローカルストレージ(NVMe)を使用する場合はEmptyDirinitContainerを利用した方法またはEmptyDirと Data Manager DataLoaderを利用した方法をお勧めします。

#local-path-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
    name: local-path-pvc
    namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
spec:
    storageClassName: { ローカルストレージ storageClassName } # ローカルストレージ StorageClass名
    accessModes:
        - ReadWriteOnce
    resources:
        requests:
            storage: 10Gi # (デフォルト値) ローカルストレージの場合、動作しない
kubectl -n {namespace} apply -f local-path-pvc.yaml

データダウンロード

ML expert Platformではデータセットのダウンロードのために storage-initializer 画像を提供します。Kubernetes Jobを利用して作成された PVCをマウントして提供された画像を通じてデータセットをダウンロードします。

Jobを利用したダウンロード例は、次の通りです。

Data Managerの使用例

# download-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: download-job
  namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
  annotations:
    sidecar.istio.io/inject: "false"
spec:
  template:
    metadata:
      annotations:
        sidecar.istio.io/inject: "false"
    spec:
      restartPolicy: Never
      nodeSelector:
        mlx.navercorp.com/zone: { 提供された GPU Zone名 } # GPU Resourcesで確認できる Zone名
      containers:
        - name: storage-initializer
          image: mlx-public.kr.ncr.ntruss.com/mlx/mdm-storage-initializer:v0.0.1
          env:
          - name: MLX_APIKEY # (3)!
            value: '{ API Key }' # MLXP API Key
          args:
          - "mlx+data-manager://{ MLX endpoint url }/{workspace}/{dataset}"
          - "/data/dataset" # spec.volueMountsの mountPath内データセット保存のためのパス
          volumeMounts:
            - mountPath: "/data"
              name: storage-volume
      volumes:
        - name: storage-volume
          persistentVolumeClaim:
            claimName: { マウントする PVC名 } # 作成した PVC名を入力 (e.g.exa-pvc, local-path-pvc)
kubectl -n {namespace} apply -f download-job.yaml

Object Storage / Ncloud Storageの使用例

# download-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: download-job
  namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
  annotations:
    sidecar.istio.io/inject: "false"
spec:
  template:
    metadata:
      annotations:
        sidecar.istio.io/inject: "false"
    spec:
      restartPolicy: Never
      nodeSelector:
        mlx.navercorp.com/zone: { 提供された GPU Zone名 } # GPU Resourcesで確認できる Zone名
      containers:
        - name: storage-initializer
          image: mlx-public.kr.ncr.ntruss.com/mlx/kserve/storage-initializer:v0.13.0
          env:
          - name: AWS_ENDPOINT_URL
            value: { S3 Endpoint } # S3 Endpoint
          - name: AWS_ACCESS_KEY_ID
            value: { S3 Access Key }  # S3 Access Key
          - name: AWS_SECRET_ACCESS_KEY
            value: { S3 Secret Key } # S3 Secret Key
          - name: AWS_DEFAULT_REGION
            value: { S3 Region } # S3 Region
          args:
          - "s3://{ Object Storageデータセット URL }"
          - "/data/dataset" # spec.volueMountsの mountPath内データセット保存のためのパス
          volumeMounts:
            - mountPath: "/data"
              name: storage-volume
      volumes:
        - name: storage-volume
          persistentVolumeClaim:
            claimName: { マウントする PVC名 } # 作成した PVC名を入力 (e.g.exa-pvc, local-path-pvc)
kubectl -n {namespace} apply -f download-job.yaml

学習コードの準備

ベースイメージ使用

InfiniBandを使用するには PyTorchJobで使用する画像に libibverbs.so がインストールされている必要があります。必要なライブラリがすべてインストールされているため、ML expert Platformで共有するベースイメージをベースに必要な画像を作成することを勧めします。

ML expert Platformで提供する学習を使用するには、NVIDIA公式 Pytorchベースイメージをベースにしたコード作成が必要です。
以下のサンプルコードは、高性能ストレージ(DDN)の使用に基づいて作成されました。

サンプル学習コード

サンプル学習コードは次の通りです。

# mnist_distributed.py
from __future__ import print_function

import argparse
import os
import time

from torch.utils.tensorboard import SummaryWriter
from torchvision.transforms import transforms
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

from mlx.sdk.data import login, load_dataset

WORLD_SIZE = int(os.environ.get("WORLD_SIZE"))
LOCAL_RANK = int(os.environ.get("LOCAL_RANK", 0))
GLOBAL_RANK = int(os.environ.get("RANK"))


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4 * 4 * 50, 500)
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4 * 4 * 50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)


def train(args, model, device, train_loader, optimizer, epoch, writer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if GLOBAL_RANK == 0 and batch_idx % args.log_interval == 0:
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_loader.dataset) // WORLD_SIZE,
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )
            niter = epoch * len(train_loader) + batch_idx
            writer.add_scalar("loss", loss.item(), niter)


def test(args, model, device, test_loader, writer, epoch):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(
                output, target, reduction="sum"
            ).item()  # sum up batch loss
            pred = output.max(1, keepdim=True)[
                1
            ]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    accuracy = float(correct) / (len(test_loader.dataset) / WORLD_SIZE)
    if GLOBAL_RANK == 0:
        print("\naccuracy={:.4f}\n".format(accuracy))
        writer.add_scalar("accuracy", accuracy, epoch)

def main():
    parser = argparse.ArgumentParser(description="PyTorch Distributed MNIST Example")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        metavar="N",
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--test-batch-size",
        type=int,
        default=1000,
        metavar="N",
        help="input batch size for testing (default: 1000)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=5,
        metavar="N",
        help="number of epochs to train (default: 10)",
    )
    parser.add_argument(
        "--lr",
        type=float,
        default=0.01,
        metavar="LR",
        help="learning rate (default: 0.01)",
    )
    parser.add_argument(
        "--momentum",
        type=float,
        default=0.5,
        metavar="M",
        help="SGD momentum (default: 0.5)",
    )
    parser.add_argument(
        "--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=10,
        metavar="N",
        help="how many batches to wait before logging training status",
    )
    parser.add_argument(
        "--checkpoint_path",
        default=f"/data/result/mnist_distributed_{int(time.time())}.pt",
        help="Path to save checkpoint",
    )
    parser.add_argument(
        "--data_path", default="/data/mnist/data", help="Path for training/test data"
    )
    parser.add_argument(
        "--log_path",
        default="/data/log",
        metavar="L",
        help="Directory pathwhere summary logs are stored",
    )
    parser.add_argument(
        "--backend",
        type=str,
        help="Distributed backend",
        choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
        default=dist.Backend.NCCL,
    )

    args = parser.parse_args()
    login("{ ML expert Platform API Key }")
    writer = SummaryWriter(args.log_path)
    torch.manual_seed(args.seed)

    print("Using distributed PyTorch with {} backend".format(args.backend))
    dist.init_process_group(backend=args.backend)

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    def preprocess(examples):
        """HuggingFace datasets用の正しい transform"""
        if isinstance(examples['image'], list):
            # バッチ処理
            examples['image'] = [transform(img) for img in examples['image']]
            examples['label'] = torch.tensor(examples['label'])
        else:
            # 単一アイテム
            examples['image'] = transform(examples['image'])
            examples['label'] = torch.tensor(examples['label'])
        return examples['image'], examples['label']

    train_dataset = load_dataset(args.data_path, split="train")
    train_dataset.set_transform(preprocess)
    test_dataset = load_dataset(args.data_path, split="test")
    test_dataset.set_transform(preprocess)

    train_loader = DataLoader(
        train_dataset,
        batch_size=args.batch_size,
        shuffle=False,
        num_workers=1,
        pin_memory=True,
        sampler=DistributedSampler(train_dataset),
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=args.test_batch_size,
        shuffle=False,
        num_workers=1,
        pin_memory=True,
        sampler=DistributedSampler(test_dataset),
    )
    model = Net().to(LOCAL_RANK)

    # Wrap the model with DistributedDataParallel if needed.
    model = DDP(model, device_ids=[LOCAL_RANK])

    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    for epoch in range(1, args.epochs + 1):
        train(args, model, LOCAL_RANK, train_loader, optimizer, epoch, writer)
        test(args, model, LOCAL_RANK, test_loader, writer, epoch)

    if GLOBAL_RANK == 0:
        dir_name = os.path.dirname(args.checkpoint_path)
        if dir_name:
            os.makedirs(dir_name, exist_ok=True)
        torch.save(model.state_dict(), args.checkpoint_path)
        print(f"Checkpoint saved at {args.checkpoint_path}")

    dist.destroy_process_group()


if __name__ == "__main__":
    main()

サンプル Dockerfile

FROM nvcr.io/nvidia/pytorch:23.03-py3

# tensorboardXのインストール
USER root
RUN pip install --no-cache-dir tensorboardX==2.6.2
RUN mkdir -p /opt/mnist/src

WORKDIR /opt/mnist/src

USER 500:500 # 高性能ストレージを使用するには UID 500、GID 500権限が必要
COPY mnist_distributed.py /opt/mnist/src/mnist_distributed.py

コンテナレジストリへのアクセス情報の設定

コンテナレジストリで ML expert Platformでイメージ Pullをするには、アクセス情報の設定が必要です。

3. 学習開始

Pytorchで公式的に提供する Elastic Launchの使用を推奨し、以下のユースケースは torchrunをベースに高性能ストレージ(DDN)を利用したユースケースです。

PytorchJob作成

分散学習のために、以下のように行います。

# pytorchjob.yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
    name: pytorch-mnist-dist-nccl
    namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
spec:
    elasticPolicy:
        rdzvId: mnist
        rdzvBackend: c10d
        minReplicas: 2
        maxReplicas: 2
        nProcPerNode: 8
    runPolicy:
        cleanPodPolicy: None
    pytorchReplicaSpecs:
        Worker:
            replicas: 2
            restartPolicy: OnFailure
            template:
                metadata:
                    annotations:
                        sidecar.istio.io/inject: "false"  # Istio sidecar injectionの無効化は必須
                spec:
                    nodeSelector:
                        mlx.navercorp.com/zone: { 提供された GPU Zone名 } # GPU Resourcesで確認できる Zone名 
                containers:
                - name: pytorch  # PyTorchJobの container名を必ず pytorchに設定
                   image: examples.com/pytorch-mnist-dist:23.03-py3 # サンプルコードのコンテナイメージ
                   imagePullPolicy: Always
                   securityContext:  # Infinibandの使用のために securityContextが必要です。
                        capabilities:
                            add: ["IPC_LOCK"]
                   command: ["bash", "-c"]
                   args:
                   - >
                     torchrun --nnodes ${PET_NNODES} --nproc_per_node ${PET_NPROC_PER_NODE} --rdzv_id ${PET_RDZV_ID} --rdzv_backend ${PET_RDZV_BACKEND} --rdzv_endpoint ${PET_RDZV_ENDPOINT}
                     /opt/mnist/src/mnist.py --checkpoint_path /data/checkpoints/mnist.pt --log_path /data/logs --data_path /data/dataset
                    env:
                    - name: NCCL_DEBUG
                       value: INFO
                    resources:
                        limits:
                            memory: "1Ti"
                            cpu: 120
                            nvidia.com/gpu: 8
                            rdma/hca_shared_devices_a: 1
                        requests:
                            memory: "8Gi"
                            cpu: 120
                            nvidia.com/gpu: 8
                            rdma/hca_shared_devices_a: 1
                    # shared memory
                    volumeMounts:
                    - mountPath: /dev/shm
                       name: shared-memory
                    - mountPath: "/data"
                       name: storage-volume
                volumes:
                - emptyDir:
                   medium: Memory
                   name: shared-memory
                - name: storage-volume
                   persistentVolumeClaim:
                        claimName: exa-pvc # 以前に作成した高性能ストレージ PVC名

PytorchJob実行

kubectl -n { namespace } apply -f pytorchjob.yaml

4. 状況と結果確認

学習状況と結果を確認する方法は、次の通りです。

Pod Logベースの確認

学習中の Podは kubectl logs コマンドを通じてログを確認できます。

kubectl -n { namespace } logs pytorch-elastic-mnist-nccl-worker-0 pytorch

Tensorboardベースの確認

学習コードで Tensorboardのためのロギングを残していたら ML expert Platformで提供する Tensorboardを通じて情報を確認できます。
サンプルコードの場合、Tensorboardログを/data/logsに保存しています。

学習結果の保存

学習パラメータを保管管理するには、Model Registryを使用できます。
学習パラメータを保存するには、Model Registry SDKを通じて自動的にアップロードするか、Notebookを通じて必要な学習パラメータのみをアップロードして管理できます。