ML expert Platform のシナリオ

Prev Next

VPC環境で利用できます。

ML expert Platformは、AI/MLサービスのための全プロセスであるデータ管理/処理から大規模分散学習、そして小型モデルから超巨大 AIモデルまでを網羅するサービングリリースまで、効率的で安定したサービスを提供します。ここでは、ML expert Platformを活用して FashionMNISTデータセットを学習する方法をステップごとに説明します。当該ユースケースでは分散ノードによる学習を行い、Persistent Volume Claim(PVC)を使用してデータを保存し、PytorchJobを用いて学習を実行します。

1. Workspace作成と Project作成

2. 学習準備

FashionMNISTデータセット準備

本データセットは、Huggingfaceで提供する FashionMNISTデータセットに基づいて作成されました。

データセットは、以下のように事前に準備されていることを前提としています。

  • Data Manager
  • Object Storage, Ncloud Storage
データ管理場所 推奨事項 備考
Data Manager
  • データセットを論理的な単位でバージョニングする必要がある場合
  • Huggingface Interfaceで作成した学習コードを使用する場合
Object Storage, Ncloud Storage
  • データセット管理が不要な場合
  • Huggingface Interfaceが不要な場合

Data Managerで管理したい場合、データセットアップロードをご参照ください。

学習データの使用

学習データを使用する方法は、次の通りです。

  • Huggingface DataLoaderによるリモート読み取り方式
  • 選択したストレージ内のデータセットをコピーした後、ストレージを読み取る方式

Huggingface DataLoaderに基づくリモート読み取り方式は、データセット読み取りをご参照ください。以下では、ボリュームデータ方式についてのみ説明します。

PVC作成

選択したストレージを ML expert Platform内のタスク環境で使用したい場合、Persistent Volume Claim(PVC)を作成してマウントする必要があり、詳細は Volumesをご参照ください。以下は、ML expert Platformでサポートしているストレージ種類ごとの PVC作成の例です。

高性能ストレージ(DDN)は Read Write Many(RWM)をサポートするため、以下のように構成できます。

注意
  • 高性能ストレージをマウントして使用する場合、学習イメージ内の UIDおよび GIDは500に設定する必要があります。
  • Podの securityContextfsGroupを設定しないでください。fsGroupが設定されると、Kubernetesがマウントされたボリューム内のすべてのファイルの所有権を再帰的に変更します。そのため、大容量のデータが保存された高性能ストレージの場合は、Podの初期化時間が大幅に長くなる可能性があります。
#exa-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
    name: exa-pvc
    namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
spec:
    storageClassName: { 高性能ストレージの storageClassName } # 高性能ストレージの StorageClass名
    accessModes:
        - ReadWriteMany
    resources:
        requests:
            storage: 10Gi # 作成する高性能ストレージの容量
kubectl -n {namespace} apply -f exa-pvc.yaml

ローカルストレージ(NVMe)は、GPUサーバに Boundされているため、ノード数分の PVCを作成する必要があります。ローカルストレージ(NVMe)を使用する場合は、EmptyDirinitContainerを利用した方式や EmptyDirおよび Data Manager DataLoaderを利用した方式をお勧めします。

#local-path-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
    name: local-path-pvc
    namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
spec:
    storageClassName: { ローカルストレージの storageClassName } # ローカルストレージの StorageClass名
    accessModes:
        - ReadWriteOnce
    resources:
        requests:
            storage: 10Gi # (デフォルト値) ローカルストレージの場合、動作しない
kubectl -n {namespace} apply -f local-path-pvc.yaml

データダウンロード

ML expert Platformでは、データセットをダウンロードするために storage-initializerのイメージを提供します。Kubernetes Jobを使用して作成した PVCをマウントし、提供されたイメージを通じてデータセットをダウンロードします。

Jobを利用したダウンロードの例は、次の通りです。

Data Managerの使用例

# download-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: download-job
  namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
  annotations:
    sidecar.istio.io/inject: "false"
spec:
  template:
    metadata:
      annotations:
        sidecar.istio.io/inject: "false"
    spec:
      restartPolicy: Never
      nodeSelector:
        mlx.navercorp.com/zone: { 提供された GPU Zone名 } # GPU Resourcesで確認できる Zone名
      containers:
        - name: storage-initializer
          image: mlx-public.kr.ncr.ntruss.com/mlx/mdm-storage-initializer:0.0.5
          env:
          - name: MLX_APIKEY # (3)!
            value: '{ API Key }' # MLXP API Key
          args:
          - "mlx+data-manager://{ MLX endpoint url }/{workspace}/{dataset}"
          - "/data/dataset" # spec.volueMountsの mountPath内にデータセットを保存するためのパス
          volumeMounts:
            - mountPath: "/data"
              name: storage-volume
      volumes:
        - name: storage-volume
          persistentVolumeClaim:
            claimName: { マウントする PVC名 } # 作成した PVC名を入力(e.g. exa-pvc, local-path-pvc)
kubectl -n {namespace} apply -f download-job.yaml

Object Storage / Ncloud Storage使用の例

# download-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: download-job
  namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
  annotations:
    sidecar.istio.io/inject: "false"
spec:
  template:
    metadata:
      annotations:
        sidecar.istio.io/inject: "false"
    spec:
      restartPolicy: Never
      nodeSelector:
        mlx.navercorp.com/zone: { 提供された GPU Zone名 } # GPU Resourcesで確認できる Zone名
      containers:
        - name: storage-initializer
          image: mlx-public.kr.ncr.ntruss.com/mlx/kserve/storage-initializer:v0.13.0
          env:
          - name: AWS_ENDPOINT_URL
            value: { S3 Endpoint } # S3 Endpoint
          - name: AWS_ACCESS_KEY_ID
            value: { S3 Access Key }  # S3 Access Key
          - name: AWS_SECRET_ACCESS_KEY
            value: { S3 Secret Key } # S3 Secret Key
          - name: AWS_DEFAULT_REGION
            value: { S3 Region } # S3 Region
          args:
          - "s3://{ Object Storageデータセットの URL }"
          - "/data/dataset" # spec.volueMountsの mountPath内にデータセットを保存するためのパス
          volumeMounts:
            - mountPath: "/data"
              name: storage-volume
      volumes:
        - name: storage-volume
          persistentVolumeClaim:
            claimName: { マウントする PVC名 } # 作成した PVC名を入力(e.g. exa-pvc, local-path-pvc)
kubectl -n {namespace} apply -f download-job.yaml

学習コードの準備

ベースイメージを使用

InfiniBandを使用するためには PyTorchJobで使用するイメージに libibverbs.soがインストールされている必要があります。必要なライブラリがすべてインストールされているため、ML expert Platformで共有しているベースイメージを基に必要なイメージを作成することをお勧めします。

ML expert Platformで提供する学習を使用するには、NVIDIA公式の Pytorchベースイメージを基にコードを作成する必要があります。
以下のサンプルコードは、高性能ストレージ(DDN)使用を基に作成されました。

サンプルコード

サンプルコードは、次の通りです。

# mnist_distributed.py
from __future__ import print_function

import argparse
import os
import time

from torch.utils.tensorboard import SummaryWriter
from torchvision.transforms import transforms
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

from mlx.sdk.data import login, load_dataset

WORLD_SIZE = int(os.environ.get("WORLD_SIZE"))
LOCAL_RANK = int(os.environ.get("LOCAL_RANK", 0))
GLOBAL_RANK = int(os.environ.get("RANK"))


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4 * 4 * 50, 500)
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4 * 4 * 50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)


def train(args, model, device, train_loader, optimizer, epoch, writer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if GLOBAL_RANK == 0 and batch_idx % args.log_interval == 0:
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
                    epoch,
                    batch_idx * len(data),
                    len(train_loader.dataset) // WORLD_SIZE,
                    100.0 * batch_idx / len(train_loader),
                    loss.item(),
                )
            )
            niter = epoch * len(train_loader) + batch_idx
            writer.add_scalar("loss", loss.item(), niter)


def test(args, model, device, test_loader, writer, epoch):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(
                output, target, reduction="sum"
            ).item()  # sum up batch loss
            pred = output.max(1, keepdim=True)[
                1
            ]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    accuracy = float(correct) / (len(test_loader.dataset) / WORLD_SIZE)
    if GLOBAL_RANK == 0:
        print("\naccuracy={:.4f}\n".format(accuracy))
        writer.add_scalar("accuracy", accuracy, epoch)

def main():
    parser = argparse.ArgumentParser(description="PyTorch Distributed MNIST Example")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        metavar="N",
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--test-batch-size",
        type=int,
        default=1000,
        metavar="N",
        help="input batch size for testing (default: 1000)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=5,
        metavar="N",
        help="number of epochs to train (default: 10)",
    )
    parser.add_argument(
        "--lr",
        type=float,
        default=0.01,
        metavar="LR",
        help="learning rate (default: 0.01)",
    )
    parser.add_argument(
        "--momentum",
        type=float,
        default=0.5,
        metavar="M",
        help="SGD momentum (default: 0.5)",
    )
    parser.add_argument(
        "--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=10,
        metavar="N",
        help="how many batches to wait before logging training status",
    )
    parser.add_argument(
        "--checkpoint_path",
        default=f"/data/result/mnist_distributed_{int(time.time())}.pt",
        help="Path to save checkpoint",
    )
    parser.add_argument(
        "--data_path", default="/data/mnist/data", help="Path for training/test data"
    )
    parser.add_argument(
        "--log_path",
        default="/data/log",
        metavar="L",
        help="Directory pathwhere summary logs are stored",
    )
    parser.add_argument(
        "--backend",
        type=str,
        help="Distributed backend",
        choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
        default=dist.Backend.NCCL,
    )

    args = parser.parse_args()
    login("{ ML expert Platform API Key }")
    writer = SummaryWriter(args.log_path)
    torch.manual_seed(args.seed)

    print("Using distributed PyTorch with {} backend".format(args.backend))
    dist.init_process_group(backend=args.backend)

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    def preprocess(examples):
        """HuggingFace datasets向けの正しい transform"""
        if isinstance(examples['image'], list):
            # 配置処理
            examples['image'] = [transform(img) for img in examples['image']]
            examples['label'] = torch.tensor(examples['label'])
        else:
            # 単一アイテム
            examples['image'] = transform(examples['image'])
            examples['label'] = torch.tensor(examples['label'])
        return examples['image'], examples['label']

    train_dataset = load_dataset(args.data_path, split="train")
    train_dataset.set_transform(preprocess)
    test_dataset = load_dataset(args.data_path, split="test")
    test_dataset.set_transform(preprocess)

    train_loader = DataLoader(
        train_dataset,
        batch_size=args.batch_size,
        shuffle=False,
        num_workers=1,
        pin_memory=True,
        sampler=DistributedSampler(train_dataset),
    )
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=args.test_batch_size,
        shuffle=False,
        num_workers=1,
        pin_memory=True,
        sampler=DistributedSampler(test_dataset),
    )
    model = Net().to(LOCAL_RANK)

    # Wrap the model with DistributedDataParallel if needed.
    model = DDP(model, device_ids=[LOCAL_RANK])

    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    for epoch in range(1, args.epochs + 1):
        train(args, model, LOCAL_RANK, train_loader, optimizer, epoch, writer)
        test(args, model, LOCAL_RANK, test_loader, writer, epoch)

    if GLOBAL_RANK == 0:
        dir_name = os.path.dirname(args.checkpoint_path)
        if dir_name:
            os.makedirs(dir_name, exist_ok=True)
        torch.save(model.state_dict(), args.checkpoint_path)
        print(f"Checkpoint saved at {args.checkpoint_path}")

    dist.destroy_process_group()


if __name__ == "__main__":
    main()

Dockerfileのユースケース

FROM nvcr.io/nvidia/pytorch:23.03-py3

# tensorboardXのインストール
USER root
RUN pip install --no-cache-dir tensorboardX==2.6.2
RUN mkdir -p /opt/mnist/src

WORKDIR /opt/mnist/src

USER 500:500 # 高性能ストレージを使用するには、UID 500、GID 500の権限が必要
COPY mnist_distributed.py /opt/mnist/src/mnist_distributed.py

コンテナレジストリ接続情報の設定

コンテナレジストリから ML expert Platformでイメージを Pullするには、接続情報の設定が必要です。

3. 学習開始

Pytorchで公式に提供する Elastic Launchの使用を推奨しており、以下は torchrunを基に高性能ストレージ(DDN)を利用したユースケースです。

PytorchJob作成

分散学習のために、以下の手順で進める必要があります。

# pytorchjob.yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
    name: pytorch-mnist-dist-nccl
    namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
spec:
    elasticPolicy:
        rdzvId: mnist
        rdzvBackend: c10d
        minReplicas: 2
        maxReplicas: 2
        nProcPerNode: 8
    runPolicy:
        cleanPodPolicy: None
    pytorchReplicaSpecs:
        Worker:
            replicas: 2
            restartPolicy: OnFailure
            template:
                metadata:
                    annotations:
                        sidecar.istio.io/inject: "false"  # Istio sidecar injectionの無効化は必須
                spec:
                    nodeSelector:
                        mlx.navercorp.com/zone: { 提供された GPU Zone名 } # GPU Resourcesで確認できる Zone名 
                containers:
                - name: pytorch  # PyTorchJobの container名は必ず pytorchに設定
                   image: examples.com/pytorch-mnist-dist:23.03-py3 # サンプルコードのコンテナイメージ
                   imagePullPolicy: Always
                   securityContext:  # Infinibandを使用するための securityContextが必要です。
                        capabilities:
                            add: ["IPC_LOCK"]
                   command: ["bash", "-c"]
                   args:
                   - >
                     torchrun --nnodes ${PET_NNODES} --nproc_per_node ${PET_NPROC_PER_NODE} --rdzv_id ${PET_RDZV_ID} --rdzv_backend ${PET_RDZV_BACKEND} --rdzv_endpoint ${PET_RDZV_ENDPOINT}
                     /opt/mnist/src/mnist.py --checkpoint_path /data/checkpoints/mnist.pt --log_path /data/logs --data_path /data/dataset
                    env:
                    - name: NCCL_DEBUG
                       value: INFO
                    resources:
                        limits:
                            memory: "1Ti"
                            cpu: 120
                            nvidia.com/gpu: 8
                        requests:
                            memory: "8Gi"
                            cpu: 120
                            nvidia.com/gpu: 8
                    # shared memory
                    volumeMounts:
                    - mountPath: /dev/shm
                       name: shared-memory
                    - mountPath: "/data"
                       name: storage-volume
                volumes:
                - emptyDir:
                   medium: Memory
                   name: shared-memory
                - name: storage-volume
                   persistentVolumeClaim:
                        claimName: exa-pvc # 以前作成した高性能ストレージの PVC名

PytorchJobを実行

kubectl -n { namespace } apply -f pytorchjob.yaml

4. 状況と結果の確認

学習状況と結果を確認する方法は、次の通りです。

Pod Logに基づく確認

学習中の Podは、kubectl logsコマンドを使用してログを確認できます。

kubectl -n { namespace } logs pytorch-elastic-mnist-nccl-worker-0 pytorch

Tensorboardに基づく確認

学習コードで Tensorboard用のログを出力している場合、ML expert Platformで提供される Tensorboardを通じて情報を確認できます。
サンプルコードの場合、Tensorboardログを /data/logsに保存しています。

学習結果の保存

学習パラメータを保存・管理するために Model Registryを使用できます。
学習パラメータを保存するために、Model Registry SDKを使用して自動的にアップロードするか、Notebookを通じて必要な学習パラメータのみをアップロードして管理できます。