Ncloud TensorFlow Cluster MNIST 예제
- 인쇄
- PDF
Ncloud TensorFlow Cluster MNIST 예제
- 인쇄
- PDF
기사 요약
이 요약이 도움이 되었나요?
의견을 보내 주셔서 감사합니다.
Classic 환경에서 이용 가능합니다.
Ncloud TensorFlow Cluster MNIST 예제
분산병렬 코드 수정하기
NCP Ncloud TensorFlow Cluster에서 Job Submit(제출)을 하기 위한 코드 수정 방법을 설명합니다.
다음 예제는 /home/ncp/workspace/DistributedTensorFlow.py로, TensorFlow master 노드에 기본 제공됩니다.
수정하거나 추가해야 하는 코드 블록은 아래 두 가지입니다.
- flag에 저장된 Cluster 정보를 파싱하여 ClusterSpec에 패싱하는 블록
- main() 함수를 호출할 때 argparse로 Cluster에서 넘겨주는 인자를 받는 블록
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import sys
import tempfile
import time
import argparse
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
flags = tf.app.flags
flags.DEFINE_string("data_dir", "/home/ncp/mnist-data",
"Directory for storing mnist data")
flags.DEFINE_boolean("download_only", False,
"Only perform downloading of data; Do not proceed to "
"session preparation, model definition or training")
flags.DEFINE_integer("num_gpus", 0, "Total number of gpus for each machine."
"If you don't use GPU, please set it to '0'")
flags.DEFINE_integer("replicas_to_aggregate", None,
"Number of replicas to aggregate before parameter update"
"is applied (For sync_replicas mode only; default: "
"num_workers)")
flags.DEFINE_integer("hidden_units", 1000,
"Number of units in the hidden layer of the NN")
flags.DEFINE_integer("train_steps", 3000,
"Number of (global) training steps to perform")
flags.DEFINE_integer("batch_size", 100, "Training batch size")
flags.DEFINE_float("learning_rate", 0.01, "Learning rate")
flags.DEFINE_boolean(
"sync_replicas", True,
"Use the sync_replicas (synchronized replicas) mode, "
"wherein the parameter updates from workers are aggregated "
"before applied to avoid stale gradients")
flags.DEFINE_boolean(
"existing_servers", False, "Whether servers already exists. If True, "
"will use the worker hosts via their GRPC URLs (one client process "
"per worker host). Otherwise, will create an in-process TensorFlow "
"server.")
FLAGS = flags.FLAGS
NCP_FLAGS = None
IMAGE_PIXELS = 28
def main(unused_argv):
if NCP_FLAGS.job_name == "worker":
mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
if FLAGS.download_only:
sys.exit(0)
다음은 사용자 코드에 별도의 추가가 필요한 코드 블록입니다. Cluster에서 사용자 코드에 넘겨주는 Cluster 스펙 인자 값을 파싱해서 ClusterSpec에 넘겨줍니다. TF_CONFIG 시스템 변수 활용법은 아래에서 별도로 설명합니다.
if NCP_FLAGS.job_name is None or NCP_FLAGS.job_name == "":
raise ValueError("Must specify an explicit `job_name`")
if NCP_FLAGS.task_index is None or NCP_FLAGS.task_index == "":
raise ValueError("Must specify an explicit `task_index`")
# Construct the cluster and start the server
ps_spec = NCP_FLAGS.ps_hosts.split(",")
worker_spec = NCP_FLAGS.worker_hosts.split(",")
# Get the number of workers.
num_workers = len(worker_spec)
cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec})
아래 코드 블록은 별도의 수정이 필요 없습니다.
if not FLAGS.existing_servers:
# Not using existing servers. Create an in-process server.
server = tf.train.Server(cluster, job_name=NCP_FLAGS.job_name, task_index=NCP_FLAGS.task_index)
if NCP_FLAGS.job_name == "ps":
server.join()
is_chief = (NCP_FLAGS.task_index == 0)
if FLAGS.num_gpus > 0:
# Avoid gpu allocation conflict: now allocate task_num -> #gpu
# for each worker in the corresponding machine
gpu = (NCP_FLAGS.task_index % FLAGS.num_gpus)
worker_device = "/job:worker/task:%d/gpu:%d" % (NCP_FLAGS.task_index, gpu)
elif FLAGS.num_gpus == 0:
# Just allocate the CPU to worker server
cpu = 0
worker_device = "/job:worker/task:%d/cpu:%d" % (NCP_FLAGS.task_index, cpu)
# The device setter will automatically place Variables ops on separate
# parameter servers (ps). The non-Variable ops will be placed on the workers.
# The ps use CPU and workers use corresponding GPU
with tf.device(
tf.train.replica_device_setter(
worker_device=worker_device,
ps_device="/job:ps/cpu:0",
cluster=cluster)):
global_step = tf.Variable(0, name="global_step", trainable=False)
# Variables of the hidden layer
hid_w = tf.Variable(
tf.truncated_normal(
[IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
stddev=1.0 / IMAGE_PIXELS),
name="hid_w")
hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
# Variables of the softmax layer
sm_w = tf.Variable(
tf.truncated_normal(
[FLAGS.hidden_units, 10],
stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
name="sm_w")
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
# Ops: located on the worker specified with NCP_FLAGS.task_index
x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
y_ = tf.placeholder(tf.float32, [None, 10])
hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
hid = tf.nn.relu(hid_lin)
y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
opt = tf.train.AdamOptimizer(FLAGS.learning_rate)
if FLAGS.sync_replicas:
if FLAGS.replicas_to_aggregate is None:
replicas_to_aggregate = num_workers
else:
replicas_to_aggregate = FLAGS.replicas_to_aggregate
opt = tf.train.SyncReplicasOptimizer(
opt,
replicas_to_aggregate=replicas_to_aggregate,
total_num_replicas=num_workers,
name="mnist_sync_replicas")
train_step = opt.minimize(cross_entropy, global_step=global_step)
if FLAGS.sync_replicas:
local_init_op = opt.local_step_init_op
if is_chief:
local_init_op = opt.chief_init_op
ready_for_local_init_op = opt.ready_for_local_init_op
# Initial token and chief queue runners required by the sync_replicas mode
chief_queue_runner = opt.get_chief_queue_runner()
sync_init_op = opt.get_init_tokens_op()
init_op = tf.global_variables_initializer()
train_dir = tempfile.mkdtemp()
if FLAGS.sync_replicas:
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=train_dir,
init_op=init_op,
local_init_op=local_init_op,
ready_for_local_init_op=ready_for_local_init_op,
recovery_wait_secs=1,
global_step=global_step)
else:
sv = tf.train.Supervisor(
is_chief=is_chief,
logdir=train_dir,
init_op=init_op,
recovery_wait_secs=1,
global_step=global_step)
sess_config = tf.ConfigProto(
allow_soft_placement=True,
log_device_placement=False,
device_filters=["/job:ps",
"/job:worker/task:%d" % NCP_FLAGS.task_index])
# The chief worker (task_index==0) session will prepare the session,
# while the remaining workers will wait for the preparation to complete.
if is_chief:
print("Worker %d: Initializing session..." % NCP_FLAGS.task_index)
else:
print("Worker %d: Waiting for session to be initialized..." %
NCP_FLAGS.task_index)
if FLAGS.existing_servers:
server_grpc_url = "grpc://" + worker_spec[NCP_FLAGS.task_index]
print("Using existing server at: %s" % server_grpc_url)
sess = sv.prepare_or_wait_for_session(server_grpc_url, config=sess_config)
else:
sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
print("Worker %d: Session initialization complete." % NCP_FLAGS.task_index)
if FLAGS.sync_replicas and is_chief:
# Chief worker will start the chief queue runner and call the init op.
sess.run(sync_init_op)
sv.start_queue_runners(sess, [chief_queue_runner])
# Perform training
time_begin = time.time()
print("Training begins @ %f" % time_begin)
local_step = 0
while True:
# Training feed
batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
train_feed = {x: batch_xs, y_: batch_ys}
_, step = sess.run([train_step, global_step], feed_dict=train_feed)
local_step += 1
now = time.time()
print("%f: Worker %d: training step %d done (global step: %d)" %
(now, NCP_FLAGS.task_index, local_step, step))
if step >= FLAGS.train_steps:
break
time_end = time.time()
print("Training ends @ %f" % time_end)
training_time = time_end - time_begin
print("Training elapsed time: %f s" % training_time)
# Validation feed
val_feed = {x: mnist.validation.images, y_: mnist.validation.labels}
val_xent = sess.run(cross_entropy, feed_dict=val_feed)
print("After %d training step(s), validation cross entropy = %g" %
(FLAGS.train_steps, val_xent))
if __name__ == "__main__":
parser = argparse.ArgumentParser()
다음은 사용자 코드에 별도의 추가가 필요한 코드 블록입니다. Cluster에서 넘겨주는 Cluster의 구성 정보를 argparse로 자동으로 받습니다.
- --worker_hosts와 --ps_hosts는 모든 노드 서버에 동일한 값이 전달됩니다.
- --job_name과 --task_index는 시스템에서 자동으로 해당 노드에 맞는 정보를 제공합니다. (단, 'tcm submit' 실행 시 ps_num 값을 지정하여 파라미터 서버의 수를 증가시킬 수 있으며, 이 경우 적용된 파라미터가 전달됩니다.)
- --worker_hosts, --ps_hosts, --job_name, --task_index는 미리 정의된 예약 인자 값이므로 그대로 사용해야 합니다.
- unparsed라는 인자 값을 통해서 사용자 정의 인자 값을 받을 수 있습니다.
## get cluster info by NCP
parser.add_argument("--worker_hosts", type=str)
parser.add_argument("--ps_hosts", type=str)
parser.add_argument("--job_name", type=str)
parser.add_argument("--task_index", type=int)
- unparsed 인자 값을 사용하려면 parser.parse_args() 대신에 parser.parse_knwon_args()를 사용합니다.
NCP_FLAGS, unparsed = parser.parse_known_args()
tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
TF_CONFIG 시스템 변수의 활용
Cluster Spec 정보를 RunConfig로 파싱하는 경우에는 split(",")를 이용하여 파싱하는 대신 아래의 코드를 활용해 주십시오.
tf_config_dic = {"cluster": {"ps": ps_str.split(","),
"worker": worker_str.split(","),
# "master": master_str.split(",")
},
"task": {"type": task_type,
"index": task_index}}
os.environ["TF_CONFIG"] = json.dumps(tf_config_dic)
- 마스터 노드는 Special Work Node로서 보통 모델의 Checksum과 로그를 관리하는 용도로 사용됩니다.
- 필요한 경우 워커 서버리스트 중 한대를 지정하여 포트 번호를 다른 번호로 변경 후 사용해 주십시오. (이 경우 마스터 노드의 task_index는 통상 0입니다.)
- 수동으로 TF_CONFIG를 읽는 경우 os.environ.get()을 사용해 주십시오.
외부 모듈 임포트 방법
tcm submit
은 단일 학습 프로그램만 각 노드에 배포하도록 구성되어 있습니다. 외부 모듈을 임포트해야 하는 경우 공용 NAS 스토리지 /mnt/nas에 코드를 복사한 뒤 아래와 같이 사용해 주십시오.
import sys
sys.path.insert(0, [모듈의 PATH])
import [모듈명]
tcm submit을 사용하지 않고 수동으로 실행하는 방법
사용자 패키지의 구조상 tcm submit
명령어를 사용하기 어려운 경우 Cluster master 콘솔에서 다음과 같이 명령을 실행할 수 있습니다.
- 서버 노드 접근 포트는 기본적으로 22번입니다.
program_path
는 사용자 코드의 경로입니다. 모든 서버 노드에서 접근이 가능해야 하기 때문에 /mnt/nas 하위에 위치시켜 주십시오.arg_string
은 사용자 프로그램에 넘겨야 할 커스텀 인자 값이 있는 경우의 예입니다./home/ncp/ncp.log
는 로그 경로의 예입니다. /mnt/nas 하위에 log 폴더를 만들어서 사용해 주십시오.
_cmd = "ssh -p " + port + " root@" + target_ip + \
" " + "/home/ncp/anaconda2/envs/ncp/bin/python " \
+ program_path + " " + arg_string + \
" ' >> /mnt/nas/log/ncp.log 2>&1 &"
os.system(_cmd)
이 문서가 도움이 되었습니까?