VPC環境で利用できます。
ML expert Platformは、AI/MLサービスのための全プロセスであるデータ管理/処理から大規模分散学習、そして小型モデルから超巨大 AIモデルまでを網羅するサービングリリースまで、効率的で安定したサービスを提供します。ここでは ML expert Platformを活用して FashionMNISTデータセットを学習する方法をステップごとに説明します。当該ユースケースは分散ノード学習を進め、PVC(Persistent Volume Claim)を使用してデータを保存し、PytorchJobを活用して学習を行います。
1. Workspaceの作成と Projectの作成
2. 学習準備
FashionMNISTデータセットの準備
本データセットは Huggingfaceで提供する FashionMNISTデータセットを基準に作成されました。
データセットは以下のように事前に準備されているものと仮定しています。
- Data Manager
- Object Storage, Ncloud Storage
| データ管理位置 | 推奨事項 | 備考 |
|---|---|---|
| Data Manager |
|
|
| Object Storage, Ncloud Storage |
|
Data Managerで管理したい場合、 データセットアップロードをご参照ください。
学習データを使用する
学習データを使用する方法は、以下の通りです。
- Huggingface DataLoaderを利用したリモート読み取り方法
- 選択したストレージ内のデータセットをコピー後、ストレージ読み取り方法
Huggingface DataLoader基盤のリモート読み取り方法はデータセット読み取りを参照し、以下ではボリュームデータ方法についてのみ説明します。
PVC作成
選択したストレージを ML expert Platform内タスク環境で使用する場合、PersistentVolumeClaim(PVC)を作成してマウントする必要があり、詳細は Volumesをご参照ください。次は ML expert Platformでサポートするストレージ種類別 PVCの作成例です。
高性能ストレージ(DDN)は Read Write Many(RWM)をサポートするため、以下のように構成できます。
#exa-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: exa-pvc
namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
spec:
storageClassName: { 高性能ストレージ storageClassName } # 高性能ストレージ StorageClass名
accessModes:
- ReadWriteMany
resources:
requests:
storage: 10Gi # 作成する高性能ストレージ容量
kubectl -n {namespace} apply -f exa-pvc.yaml
ローカルストレージ(NVMe)は GPUサーバに Boundされているため、ノード数だけ PVC作成が必要です。ローカルストレージ(NVMe)を使用する場合はEmptyDirとinitContainerを利用した方法またはEmptyDirと Data Manager DataLoaderを利用した方法をお勧めします。
#local-path-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: local-path-pvc
namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
spec:
storageClassName: { ローカルストレージ storageClassName } # ローカルストレージ StorageClass名
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi # (デフォルト値) ローカルストレージの場合、動作しない
kubectl -n {namespace} apply -f local-path-pvc.yaml
データダウンロード
ML expert Platformではデータセットのダウンロードのために storage-initializer 画像を提供します。Kubernetes Jobを利用して作成された PVCをマウントして提供された画像を通じてデータセットをダウンロードします。
Jobを利用したダウンロード例は、次の通りです。
Data Managerの使用例
# download-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: download-job
namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
annotations:
sidecar.istio.io/inject: "false"
spec:
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
restartPolicy: Never
nodeSelector:
mlx.navercorp.com/zone: { 提供された GPU Zone名 } # GPU Resourcesで確認できる Zone名
containers:
- name: storage-initializer
image: mlx-public.kr.ncr.ntruss.com/mlx/mdm-storage-initializer:v0.0.1
env:
- name: MLX_APIKEY # (3)!
value: '{ API Key }' # MLXP API Key
args:
- "mlx+data-manager://{ MLX endpoint url }/{workspace}/{dataset}"
- "/data/dataset" # spec.volueMountsの mountPath内データセット保存のためのパス
volumeMounts:
- mountPath: "/data"
name: storage-volume
volumes:
- name: storage-volume
persistentVolumeClaim:
claimName: { マウントする PVC名 } # 作成した PVC名を入力 (e.g.exa-pvc, local-path-pvc)
kubectl -n {namespace} apply -f download-job.yaml
Object Storage / Ncloud Storageの使用例
# download-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: download-job
namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
annotations:
sidecar.istio.io/inject: "false"
spec:
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
restartPolicy: Never
nodeSelector:
mlx.navercorp.com/zone: { 提供された GPU Zone名 } # GPU Resourcesで確認できる Zone名
containers:
- name: storage-initializer
image: mlx-public.kr.ncr.ntruss.com/mlx/kserve/storage-initializer:v0.13.0
env:
- name: AWS_ENDPOINT_URL
value: { S3 Endpoint } # S3 Endpoint
- name: AWS_ACCESS_KEY_ID
value: { S3 Access Key } # S3 Access Key
- name: AWS_SECRET_ACCESS_KEY
value: { S3 Secret Key } # S3 Secret Key
- name: AWS_DEFAULT_REGION
value: { S3 Region } # S3 Region
args:
- "s3://{ Object Storageデータセット URL }"
- "/data/dataset" # spec.volueMountsの mountPath内データセット保存のためのパス
volumeMounts:
- mountPath: "/data"
name: storage-volume
volumes:
- name: storage-volume
persistentVolumeClaim:
claimName: { マウントする PVC名 } # 作成した PVC名を入力 (e.g.exa-pvc, local-path-pvc)
kubectl -n {namespace} apply -f download-job.yaml
学習コードの準備
InfiniBandを使用するには PyTorchJobで使用する画像に libibverbs.so がインストールされている必要があります。必要なライブラリがすべてインストールされているため、ML expert Platformで共有するベースイメージをベースに必要な画像を作成することを勧めします。
ML expert Platformで提供する学習を使用するには、NVIDIA公式 Pytorchベースイメージをベースにしたコード作成が必要です。
以下のサンプルコードは、高性能ストレージ(DDN)の使用に基づいて作成されました。
サンプル学習コード
サンプル学習コードは次の通りです。
# mnist_distributed.py
from __future__ import print_function
import argparse
import os
import time
from torch.utils.tensorboard import SummaryWriter
from torchvision.transforms import transforms
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from mlx.sdk.data import login, load_dataset
WORLD_SIZE = int(os.environ.get("WORLD_SIZE"))
LOCAL_RANK = int(os.environ.get("LOCAL_RANK", 0))
GLOBAL_RANK = int(os.environ.get("RANK"))
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 20, 5, 1)
self.conv2 = nn.Conv2d(20, 50, 5, 1)
self.fc1 = nn.Linear(4 * 4 * 50, 500)
self.fc2 = nn.Linear(500, 10)
def forward(self, x):
x = F.relu(self.conv1(x))
x = F.max_pool2d(x, 2, 2)
x = F.relu(self.conv2(x))
x = F.max_pool2d(x, 2, 2)
x = x.view(-1, 4 * 4 * 50)
x = F.relu(self.fc1(x))
x = self.fc2(x)
return F.log_softmax(x, dim=1)
def train(args, model, device, train_loader, optimizer, epoch, writer):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if GLOBAL_RANK == 0 and batch_idx % args.log_interval == 0:
print(
"Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
epoch,
batch_idx * len(data),
len(train_loader.dataset) // WORLD_SIZE,
100.0 * batch_idx / len(train_loader),
loss.item(),
)
)
niter = epoch * len(train_loader) + batch_idx
writer.add_scalar("loss", loss.item(), niter)
def test(args, model, device, test_loader, writer, epoch):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(
output, target, reduction="sum"
).item() # sum up batch loss
pred = output.max(1, keepdim=True)[
1
] # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
accuracy = float(correct) / (len(test_loader.dataset) / WORLD_SIZE)
if GLOBAL_RANK == 0:
print("\naccuracy={:.4f}\n".format(accuracy))
writer.add_scalar("accuracy", accuracy, epoch)
def main():
parser = argparse.ArgumentParser(description="PyTorch Distributed MNIST Example")
parser.add_argument(
"--batch-size",
type=int,
default=64,
metavar="N",
help="input batch size for training (default: 64)",
)
parser.add_argument(
"--test-batch-size",
type=int,
default=1000,
metavar="N",
help="input batch size for testing (default: 1000)",
)
parser.add_argument(
"--epochs",
type=int,
default=5,
metavar="N",
help="number of epochs to train (default: 10)",
)
parser.add_argument(
"--lr",
type=float,
default=0.01,
metavar="LR",
help="learning rate (default: 0.01)",
)
parser.add_argument(
"--momentum",
type=float,
default=0.5,
metavar="M",
help="SGD momentum (default: 0.5)",
)
parser.add_argument(
"--seed", type=int, default=1, metavar="S", help="random seed (default: 1)"
)
parser.add_argument(
"--log-interval",
type=int,
default=10,
metavar="N",
help="how many batches to wait before logging training status",
)
parser.add_argument(
"--checkpoint_path",
default=f"/data/result/mnist_distributed_{int(time.time())}.pt",
help="Path to save checkpoint",
)
parser.add_argument(
"--data_path", default="/data/mnist/data", help="Path for training/test data"
)
parser.add_argument(
"--log_path",
default="/data/log",
metavar="L",
help="Directory pathwhere summary logs are stored",
)
parser.add_argument(
"--backend",
type=str,
help="Distributed backend",
choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
default=dist.Backend.NCCL,
)
args = parser.parse_args()
login("{ ML expert Platform API Key }")
writer = SummaryWriter(args.log_path)
torch.manual_seed(args.seed)
print("Using distributed PyTorch with {} backend".format(args.backend))
dist.init_process_group(backend=args.backend)
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
def preprocess(examples):
"""HuggingFace datasets用の正しい transform"""
if isinstance(examples['image'], list):
# バッチ処理
examples['image'] = [transform(img) for img in examples['image']]
examples['label'] = torch.tensor(examples['label'])
else:
# 単一アイテム
examples['image'] = transform(examples['image'])
examples['label'] = torch.tensor(examples['label'])
return examples['image'], examples['label']
train_dataset = load_dataset(args.data_path, split="train")
train_dataset.set_transform(preprocess)
test_dataset = load_dataset(args.data_path, split="test")
test_dataset.set_transform(preprocess)
train_loader = DataLoader(
train_dataset,
batch_size=args.batch_size,
shuffle=False,
num_workers=1,
pin_memory=True,
sampler=DistributedSampler(train_dataset),
)
test_loader = torch.utils.data.DataLoader(
test_dataset,
batch_size=args.test_batch_size,
shuffle=False,
num_workers=1,
pin_memory=True,
sampler=DistributedSampler(test_dataset),
)
model = Net().to(LOCAL_RANK)
# Wrap the model with DistributedDataParallel if needed.
model = DDP(model, device_ids=[LOCAL_RANK])
optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
for epoch in range(1, args.epochs + 1):
train(args, model, LOCAL_RANK, train_loader, optimizer, epoch, writer)
test(args, model, LOCAL_RANK, test_loader, writer, epoch)
if GLOBAL_RANK == 0:
dir_name = os.path.dirname(args.checkpoint_path)
if dir_name:
os.makedirs(dir_name, exist_ok=True)
torch.save(model.state_dict(), args.checkpoint_path)
print(f"Checkpoint saved at {args.checkpoint_path}")
dist.destroy_process_group()
if __name__ == "__main__":
main()
サンプル Dockerfile
FROM nvcr.io/nvidia/pytorch:23.03-py3
# tensorboardXのインストール
USER root
RUN pip install --no-cache-dir tensorboardX==2.6.2
RUN mkdir -p /opt/mnist/src
WORKDIR /opt/mnist/src
USER 500:500 # 高性能ストレージを使用するには UID 500、GID 500権限が必要
COPY mnist_distributed.py /opt/mnist/src/mnist_distributed.py
コンテナレジストリへのアクセス情報の設定
コンテナレジストリで ML expert Platformでイメージ Pullをするには、アクセス情報の設定が必要です。
3. 学習開始
Pytorchで公式的に提供する Elastic Launchの使用を推奨し、以下のユースケースは torchrunをベースに高性能ストレージ(DDN)を利用したユースケースです。
PytorchJob作成
分散学習のために、以下のように行います。
# pytorchjob.yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: pytorch-mnist-dist-nccl
namespace: p-{ projectName } # プロジェクトに該当する Kubernetes Namespace名
spec:
elasticPolicy:
rdzvId: mnist
rdzvBackend: c10d
minReplicas: 2
maxReplicas: 2
nProcPerNode: 8
runPolicy:
cleanPodPolicy: None
pytorchReplicaSpecs:
Worker:
replicas: 2
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false" # Istio sidecar injectionの無効化は必須
spec:
nodeSelector:
mlx.navercorp.com/zone: { 提供された GPU Zone名 } # GPU Resourcesで確認できる Zone名
containers:
- name: pytorch # PyTorchJobの container名を必ず pytorchに設定
image: examples.com/pytorch-mnist-dist:23.03-py3 # サンプルコードのコンテナイメージ
imagePullPolicy: Always
securityContext: # Infinibandの使用のために securityContextが必要です。
capabilities:
add: ["IPC_LOCK"]
command: ["bash", "-c"]
args:
- >
torchrun --nnodes ${PET_NNODES} --nproc_per_node ${PET_NPROC_PER_NODE} --rdzv_id ${PET_RDZV_ID} --rdzv_backend ${PET_RDZV_BACKEND} --rdzv_endpoint ${PET_RDZV_ENDPOINT}
/opt/mnist/src/mnist.py --checkpoint_path /data/checkpoints/mnist.pt --log_path /data/logs --data_path /data/dataset
env:
- name: NCCL_DEBUG
value: INFO
resources:
limits:
memory: "1Ti"
cpu: 120
nvidia.com/gpu: 8
rdma/hca_shared_devices_a: 1
requests:
memory: "8Gi"
cpu: 120
nvidia.com/gpu: 8
rdma/hca_shared_devices_a: 1
# shared memory
volumeMounts:
- mountPath: /dev/shm
name: shared-memory
- mountPath: "/data"
name: storage-volume
volumes:
- emptyDir:
medium: Memory
name: shared-memory
- name: storage-volume
persistentVolumeClaim:
claimName: exa-pvc # 以前に作成した高性能ストレージ PVC名
PytorchJob実行
kubectl -n { namespace } apply -f pytorchjob.yaml
4. 状況と結果確認
学習状況と結果を確認する方法は、次の通りです。
Pod Logベースの確認
学習中の Podは kubectl logs コマンドを通じてログを確認できます。
kubectl -n { namespace } logs pytorch-elastic-mnist-nccl-worker-0 pytorch
Tensorboardベースの確認
学習コードで Tensorboardのためのロギングを残していたら ML expert Platformで提供する Tensorboardを通じて情報を確認できます。
サンプルコードの場合、Tensorboardログを/data/logsに保存しています。
学習結果の保存
学習パラメータを保管管理するには、Model Registryを使用できます。
学習パラメータを保存するには、Model Registry SDKを通じて自動的にアップロードするか、Notebookを通じて必要な学習パラメータのみをアップロードして管理できます。