- Print
- PDF
Ncloud TensorFlow cluster MNIST example
- Print
- PDF
Available in Classic
Ncloud TensorFlow Cluster MNIST example
Edit distributed parallel processing code
This section describes how to edit the example code to submit jobs to the NAVER Cloud Platform Ncloud TensorFlow Cluster.
The example code shown below is /home/ncp/workspace/DistributedTensorFlow.py, which is provided by default on the TensorFlow master node.
You should edit or add the following two code blocks:
- The block in main() that parses the cluster information received from the command line (NCP_FLAGS) and passes it to ClusterSpec.
- The block under `if __name__ == "__main__":` that uses argparse to receive the cluster arguments passed by the cluster before main() is called.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import sys
import tempfile
import time
import argparse
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
# Command-line options for training. They are parsed by tf.app.run() from the
# arguments left over after the cluster arguments are consumed by argparse.
flags = tf.app.flags
flags.DEFINE_string("data_dir", "/home/ncp/mnist-data",
                    "Directory for storing mnist data")
flags.DEFINE_boolean("download_only", False,
                     "Only perform downloading of data; Do not proceed to "
                     "session preparation, model definition or training")
# Adjacent string literals are concatenated, so each fragment except the last
# must end with a space.
flags.DEFINE_integer("num_gpus", 0,
                     "Total number of gpus for each machine. "
                     "If you don't use GPU, please set it to '0'")
flags.DEFINE_integer("replicas_to_aggregate", None,
                     "Number of replicas to aggregate before parameter update "
                     "is applied (For sync_replicas mode only; default: "
                     "num_workers)")
flags.DEFINE_integer("hidden_units", 1000,
                     "Number of units in the hidden layer of the NN")
flags.DEFINE_integer("train_steps", 3000,
                     "Number of (global) training steps to perform")
flags.DEFINE_integer("batch_size", 100, "Training batch size")
flags.DEFINE_float("learning_rate", 0.01, "Learning rate")
flags.DEFINE_boolean(
    "sync_replicas", True,
    "Use the sync_replicas (synchronized replicas) mode, "
    "wherein the parameter updates from workers are aggregated "
    "before applied to avoid stale gradients")
flags.DEFINE_boolean(
    "existing_servers", False,
    "Whether servers already exist. If True, "
    "will use the worker hosts via their GRPC URLs (one client process "
    "per worker host). Otherwise, will create an in-process TensorFlow "
    "server.")
FLAGS = flags.FLAGS

# Cluster arguments (--worker_hosts, --ps_hosts, --job_name, --task_index);
# assigned from argparse.parse_known_args() in the __main__ block.
NCP_FLAGS = None

# MNIST images are IMAGE_PIXELS x IMAGE_PIXELS pixels.
IMAGE_PIXELS = 28
def main(unused_argv):
    """Run one node (parameter server or worker) of the distributed MNIST job.

    Cluster information is read from the module-level ``NCP_FLAGS``
    (populated by argparse in the ``__main__`` block) and training options
    from the TensorFlow ``FLAGS``. Unless ``FLAGS.existing_servers`` is set,
    an in-process ``tf.train.Server`` is started and a ``ps`` task blocks in
    ``server.join()``. A ``worker`` task trains a one-hidden-layer network on
    MNIST until the global step reaches ``FLAGS.train_steps`` and then prints
    the validation cross entropy.

    Args:
        unused_argv: Remaining command-line arguments passed by
            ``tf.app.run``; ignored.

    Raises:
        ValueError: If ``--job_name``, ``--task_index``, ``--ps_hosts`` or
            ``--worker_hosts`` is missing, or ``--num_gpus`` is negative.
    """
    # Only workers train, so only workers load the MNIST data.
    if NCP_FLAGS.job_name == "worker":
        mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
    if FLAGS.download_only:
        sys.exit(0)

    # ------------------------------------------------------------------
    # The following code block should be added to your code: it parses the
    # cluster spec arguments passed from the cluster and passes them to
    # ClusterSpec. (Using the TF_CONFIG environment variable instead is
    # described under "How to use TF_CONFIG".)
    # ------------------------------------------------------------------
    if not NCP_FLAGS.job_name:
        raise ValueError("Must specify an explicit `job_name`")
    # --task_index is parsed with type=int, so a missing value is None.
    if NCP_FLAGS.task_index is None:
        raise ValueError("Must specify an explicit `task_index`")
    if not NCP_FLAGS.ps_hosts or not NCP_FLAGS.worker_hosts:
        raise ValueError("Must specify both `ps_hosts` and `worker_hosts`")

    # Construct the cluster and start the server
    ps_spec = NCP_FLAGS.ps_hosts.split(",")
    worker_spec = NCP_FLAGS.worker_hosts.split(",")
    # Get the number of workers.
    num_workers = len(worker_spec)
    cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec})

    # ------------------------------------------------------------------
    # The following code block does not need to be edited.
    # ------------------------------------------------------------------
    if not FLAGS.existing_servers:
        # Not using existing servers. Create an in-process server.
        server = tf.train.Server(cluster,
                                 job_name=NCP_FLAGS.job_name,
                                 task_index=NCP_FLAGS.task_index)
        if NCP_FLAGS.job_name == "ps":
            # Parameter servers only host variables; join() blocks until the
            # server shuts down.
            server.join()

    is_chief = (NCP_FLAGS.task_index == 0)

    # A negative value would otherwise leave worker_device undefined.
    if FLAGS.num_gpus < 0:
        raise ValueError("`num_gpus` must be >= 0, got %d" % FLAGS.num_gpus)
    if FLAGS.num_gpus > 0:
        # Avoid gpu allocation conflict: map task_index -> gpu id for the
        # workers that share the same machine.
        gpu = NCP_FLAGS.task_index % FLAGS.num_gpus
        worker_device = "/job:worker/task:%d/gpu:%d" % (
            NCP_FLAGS.task_index, gpu)
    else:
        # Just allocate the CPU to the worker server
        cpu = 0
        worker_device = "/job:worker/task:%d/cpu:%d" % (
            NCP_FLAGS.task_index, cpu)

    # The device setter will automatically place Variables ops on separate
    # parameter servers (ps). The non-Variable ops will be placed on the workers.
    # The ps use CPU and workers use corresponding GPU
    with tf.device(
            tf.train.replica_device_setter(
                worker_device=worker_device,
                ps_device="/job:ps/cpu:0",
                cluster=cluster)):
        global_step = tf.Variable(0, name="global_step", trainable=False)

        # Variables of the hidden layer
        hid_w = tf.Variable(
            tf.truncated_normal(
                [IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                stddev=1.0 / IMAGE_PIXELS),
            name="hid_w")
        hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

        # Variables of the softmax layer
        sm_w = tf.Variable(
            tf.truncated_normal(
                [FLAGS.hidden_units, 10],
                stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
            name="sm_w")
        sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

        # Ops: located on the worker specified with NCP_FLAGS.task_index
        x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
        y_ = tf.placeholder(tf.float32, [None, 10])

        hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
        hid = tf.nn.relu(hid_lin)

        y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
        # Clip the probabilities before taking the log: a softmax output that
        # underflows to 0 would otherwise give log(0) = -inf and a NaN loss.
        cross_entropy = tf.reduce_mean(
            -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)),
                           reduction_indices=[1]))

        opt = tf.train.AdamOptimizer(FLAGS.learning_rate)

        if FLAGS.sync_replicas:
            if FLAGS.replicas_to_aggregate is None:
                replicas_to_aggregate = num_workers
            else:
                replicas_to_aggregate = FLAGS.replicas_to_aggregate

            opt = tf.train.SyncReplicasOptimizer(
                opt,
                replicas_to_aggregate=replicas_to_aggregate,
                total_num_replicas=num_workers,
                name="mnist_sync_replicas")

        train_step = opt.minimize(cross_entropy, global_step=global_step)

        if FLAGS.sync_replicas:
            local_init_op = opt.local_step_init_op
            if is_chief:
                local_init_op = opt.chief_init_op

            ready_for_local_init_op = opt.ready_for_local_init_op

            # Initial token and chief queue runners required by the
            # sync_replicas mode
            chief_queue_runner = opt.get_chief_queue_runner()
            sync_init_op = opt.get_init_tokens_op()

        init_op = tf.global_variables_initializer()
        # NOTE(review): this temporary directory is never removed.
        train_dir = tempfile.mkdtemp()

        if FLAGS.sync_replicas:
            sv = tf.train.Supervisor(
                is_chief=is_chief,
                logdir=train_dir,
                init_op=init_op,
                local_init_op=local_init_op,
                ready_for_local_init_op=ready_for_local_init_op,
                recovery_wait_secs=1,
                global_step=global_step)
        else:
            sv = tf.train.Supervisor(
                is_chief=is_chief,
                logdir=train_dir,
                init_op=init_op,
                recovery_wait_secs=1,
                global_step=global_step)

        sess_config = tf.ConfigProto(
            allow_soft_placement=True,
            log_device_placement=False,
            device_filters=["/job:ps",
                            "/job:worker/task:%d" % NCP_FLAGS.task_index])

        # The chief worker (task_index==0) session will prepare the session,
        # while the remaining workers will wait for the preparation to complete.
        if is_chief:
            print("Worker %d: Initializing session..." % NCP_FLAGS.task_index)
        else:
            print("Worker %d: Waiting for session to be initialized..." %
                  NCP_FLAGS.task_index)

        if FLAGS.existing_servers:
            server_grpc_url = "grpc://" + worker_spec[NCP_FLAGS.task_index]
            print("Using existing server at: %s" % server_grpc_url)
            sess = sv.prepare_or_wait_for_session(server_grpc_url,
                                                  config=sess_config)
        else:
            sess = sv.prepare_or_wait_for_session(server.target,
                                                  config=sess_config)

        # prepare_or_wait_for_session() has returned, so the session is ready.
        print("Worker %d: Session initialization complete." %
              NCP_FLAGS.task_index)

        if FLAGS.sync_replicas and is_chief:
            # Chief worker will start the chief queue runner and call the
            # init op.
            sess.run(sync_init_op)
            sv.start_queue_runners(sess, [chief_queue_runner])

        # Perform training
        time_begin = time.time()
        print("Training begins @ %f" % time_begin)

        local_step = 0
        while True:
            # Training feed
            batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
            train_feed = {x: batch_xs, y_: batch_ys}

            _, step = sess.run([train_step, global_step], feed_dict=train_feed)
            local_step += 1

            now = time.time()
            print("%f: Worker %d: training step %d done (global step: %d)" %
                  (now, NCP_FLAGS.task_index, local_step, step))

            if step >= FLAGS.train_steps:
                break

        time_end = time.time()
        print("Training ends @ %f" % time_end)
        training_time = time_end - time_begin
        print("Training elapsed time: %f s" % training_time)

        # Validation feed
        val_feed = {x: mnist.validation.images, y_: mnist.validation.labels}
        val_xent = sess.run(cross_entropy, feed_dict=val_feed)
        print("After %d training step(s), validation cross entropy = %g" %
              (FLAGS.train_steps, val_xent))
if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # The following code block should be added to your code. Use argparse to
    # automatically receive the cluster's configuration information passed
    # from the cluster.
    # - For --worker_hosts and --ps_hosts, the same value is passed to all
    #   node servers.
    # - For --job_name and --task_index, the system automatically provides
    #   the values appropriate for each node. (If you specify ps_num when
    #   running "tcm submit" to increase the number of parameter servers,
    #   the corresponding values are passed.)
    # - --worker_hosts, --ps_hosts, --job_name, and --task_index are
    #   predefined arguments, so use them exactly as named.
    # - User-defined arguments are returned in `unparsed`.

    # Get cluster info provided by NCP
    parser.add_argument(
        "--worker_hosts", type=str,
        help="Comma-separated worker addresses (same value on every node)")
    parser.add_argument(
        "--ps_hosts", type=str,
        help="Comma-separated parameter server addresses "
             "(same value on every node)")
    parser.add_argument(
        "--job_name", type=str,
        help="Role of this node: 'ps' or 'worker'")
    parser.add_argument(
        "--task_index", type=int,
        help="Index of this node within its job")

    # Use parser.parse_known_args(), instead of parser.parse_args(), so that
    # arguments argparse does not know about are returned in `unparsed`
    # instead of raising an error; they are forwarded to tf.app.run(), which
    # parses them into FLAGS.
    NCP_FLAGS, unparsed = parser.parse_known_args()
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
How to use TF_CONFIG
If you use RunConfig (for example, with tf.estimator) to read the cluster spec, set the TF_CONFIG environment variable with the following code instead of building a ClusterSpec yourself from the split(",") host lists.
import json
import os

# ps_str and worker_str are comma-separated host lists (for example, the
# --ps_hosts and --worker_hosts values); task_type ("ps"/"worker") and
# task_index identify this node. Replace them with your own variables.
tf_config_dic = {
    "cluster": {
        "ps": ps_str.split(","),
        "worker": worker_str.split(","),
        # "master": master_str.split(","),
    },
    "task": {
        "type": task_type,
        "index": task_index,
    },
}
# Set TF_CONFIG before constructing RunConfig so that it picks up this
# cluster spec.
os.environ["TF_CONFIG"] = json.dumps(tf_config_dic)
- The master node is a special worker node that is usually used to manage the model's checkpoints and logs.
- If you need a master node, pick one of the servers in the worker list and use it with a different port number (in this case, the master node's task_index is usually 0).
- To read TF_CONFIG manually, use os.environ.get("TF_CONFIG") and parse the result with json.loads().
How to import external modules
“tcm submit” is designed to distribute only a single training program to each node. If you want to import external modules, copy their code to the shared NAS storage, /mnt/nas, and import them as follows.
import sys

# sys.path entries must be strings, so pass the directory itself (not a list).
# Replace the path with the directory under /mnt/nas that contains your module.
sys.path.insert(0, "/mnt/nas/your_module_dir")
import your_module  # replace with your module's name
How to manually submit jobs without using “tcm submit”
If the structure of your package makes it hard to use the tcm submit command, you can run the following command from the cluster master console.
- The access port for server nodes is 22 by default.
- program_path is the path of your code. It must be located under /mnt/nas, which all server nodes can access.
- arg_string is an example of the custom arguments passed to your program.
- /mnt/nas/log/ncp.log is an example log path. Create a folder for logs under /mnt/nas (for example, /mnt/nas/log) before running the command.
import os

# port: SSH port of the target server node (22 by default), str or int.
# target_ip: IP address of the target server node.
# program_path: path of your program under /mnt/nas.
# arg_string: custom arguments for your program.
# The remote command is enclosed in single quotes so ssh receives it as one
# command. ssh's output (the program's stdout/stderr) is appended to the log
# on /mnt/nas, and the trailing "&" runs ssh in the background on this console.
# NOTE: arg_string must not contain single quotes, or the quoting breaks.
_cmd = ("ssh -p " + str(port) + " root@" + target_ip +
        " '/home/ncp/anaconda2/envs/ncp/bin/python " +
        program_path + " " + arg_string + "'" +
        " >> /mnt/nas/log/ncp.log 2>&1 &")
os.system(_cmd)