Ncloud TensorFlow cluster MNIST example

    Available in Classic

    Edit distributed parallel processing code

    This section describes how to edit the example code to submit jobs in the NAVER Cloud Platform Ncloud TensorFlow Cluster.

    The example code shown below is /home/ncp/workspace/DistributedTensorFlow.py, which is provided by default on the TensorFlow master node.

    You should edit or add the following two code blocks:

    • The block that parses the cluster information stored in NCP_FLAGS and passes it to tf.train.ClusterSpec.
    • The block that uses argparse to receive the arguments the cluster passes when main() is called.
    from __future__ import absolute_import
    from __future__ import division
    from __future__ import print_function
    
    import math
    import sys
    import tempfile
    import time
    import argparse
    
    import tensorflow as tf
    from tensorflow.examples.tutorials.mnist import input_data
    
    flags = tf.app.flags
    flags.DEFINE_string("data_dir", "/home/ncp/mnist-data",
                        "Directory for storing mnist data")
    flags.DEFINE_boolean("download_only", False,
                         "Only perform downloading of data; Do not proceed to "
                         "session preparation, model definition or training")
    flags.DEFINE_integer("num_gpus", 0, "Total number of gpus for each machine."
                                        "If you don't use GPU, please set it to '0'")
    flags.DEFINE_integer("replicas_to_aggregate", None,
                         "Number of replicas to aggregate before parameter update"
                         "is applied (For sync_replicas mode only; default: "
                         "num_workers)")
    flags.DEFINE_integer("hidden_units", 1000,
                         "Number of units in the hidden layer of the NN")
    flags.DEFINE_integer("train_steps", 3000,
                         "Number of (global) training steps to perform")
    flags.DEFINE_integer("batch_size", 100, "Training batch size")
    flags.DEFINE_float("learning_rate", 0.01, "Learning rate")
    flags.DEFINE_boolean(
        "sync_replicas", True,
        "Use the sync_replicas (synchronized replicas) mode, "
        "wherein the parameter updates from workers are aggregated "
        "before applied to avoid stale gradients")
    flags.DEFINE_boolean(
        "existing_servers", False, "Whether servers already exist. If True, "
                                   "will use the worker hosts via their GRPC URLs (one client process "
                                   "per worker host). Otherwise, will create an in-process TensorFlow "
                                   "server.")
    
    FLAGS = flags.FLAGS
    NCP_FLAGS = None
    
    IMAGE_PIXELS = 28
    
    
    def main(unused_argv):
        if NCP_FLAGS.job_name == "worker":
            mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
    
        if FLAGS.download_only:
            sys.exit(0)
    

    The following code block should be added to your code. It parses the cluster spec arguments passed from the cluster and hands them to tf.train.ClusterSpec. How to use the TF_CONFIG environment variable is described below.

        if NCP_FLAGS.job_name is None or NCP_FLAGS.job_name == "":
            raise ValueError("Must specify an explicit `job_name`")
        if NCP_FLAGS.task_index is None or NCP_FLAGS.task_index == "":
            raise ValueError("Must specify an explicit `task_index`")
    
        # Construct the cluster and start the server
        ps_spec = NCP_FLAGS.ps_hosts.split(",")
        worker_spec = NCP_FLAGS.worker_hosts.split(",")
    
        # Get the number of workers.
        num_workers = len(worker_spec)
    
        cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec})
    

    The following code block does not need to be edited.

        if not FLAGS.existing_servers:
            # Not using existing servers. Create an in-process server.
    
            server = tf.train.Server(cluster, job_name=NCP_FLAGS.job_name, task_index=NCP_FLAGS.task_index)
            if NCP_FLAGS.job_name == "ps":
                server.join()
    
        is_chief = (NCP_FLAGS.task_index == 0)
        if FLAGS.num_gpus > 0:
            # Avoid gpu allocation conflict: now allocate task_num -> #gpu
            # for each worker in the corresponding machine
            gpu = (NCP_FLAGS.task_index % FLAGS.num_gpus)
            worker_device = "/job:worker/task:%d/gpu:%d" % (NCP_FLAGS.task_index, gpu)
        elif FLAGS.num_gpus == 0:
            # Just allocate the CPU to worker server
            cpu = 0
            worker_device = "/job:worker/task:%d/cpu:%d" % (NCP_FLAGS.task_index, cpu)
        # The device setter will automatically place Variables ops on separate
        # parameter servers (ps). The non-Variable ops will be placed on the workers.
        # The ps use CPU and workers use corresponding GPU
        with tf.device(
                tf.train.replica_device_setter(
                    worker_device=worker_device,
                    ps_device="/job:ps/cpu:0",
                    cluster=cluster)):
            global_step = tf.Variable(0, name="global_step", trainable=False)
    
            # Variables of the hidden layer
            hid_w = tf.Variable(
                tf.truncated_normal(
                    [IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                    stddev=1.0 / IMAGE_PIXELS),
                name="hid_w")
            hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
    
            # Variables of the softmax layer
            sm_w = tf.Variable(
                tf.truncated_normal(
                    [FLAGS.hidden_units, 10],
                    stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
                name="sm_w")
            sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
    
            # Ops: located on the worker specified with NCP_FLAGS.task_index
            x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
            y_ = tf.placeholder(tf.float32, [None, 10])
    
            hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
            hid = tf.nn.relu(hid_lin)
    
            y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
            cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
    
            opt = tf.train.AdamOptimizer(FLAGS.learning_rate)
    
            if FLAGS.sync_replicas:
                if FLAGS.replicas_to_aggregate is None:
                    replicas_to_aggregate = num_workers
                else:
                    replicas_to_aggregate = FLAGS.replicas_to_aggregate
    
                opt = tf.train.SyncReplicasOptimizer(
                    opt,
                    replicas_to_aggregate=replicas_to_aggregate,
                    total_num_replicas=num_workers,
                    name="mnist_sync_replicas")
    
            train_step = opt.minimize(cross_entropy, global_step=global_step)
    
            if FLAGS.sync_replicas:
                local_init_op = opt.local_step_init_op
                if is_chief:
                    local_init_op = opt.chief_init_op
    
                ready_for_local_init_op = opt.ready_for_local_init_op
    
                # Initial token and chief queue runners required by the sync_replicas mode
                chief_queue_runner = opt.get_chief_queue_runner()
                sync_init_op = opt.get_init_tokens_op()
    
            init_op = tf.global_variables_initializer()
            train_dir = tempfile.mkdtemp()
    
            if FLAGS.sync_replicas:
                sv = tf.train.Supervisor(
                    is_chief=is_chief,
                    logdir=train_dir,
                    init_op=init_op,
                    local_init_op=local_init_op,
                    ready_for_local_init_op=ready_for_local_init_op,
                    recovery_wait_secs=1,
                    global_step=global_step)
            else:
                sv = tf.train.Supervisor(
                    is_chief=is_chief,
                    logdir=train_dir,
                    init_op=init_op,
                    recovery_wait_secs=1,
                    global_step=global_step)
    
            sess_config = tf.ConfigProto(
                allow_soft_placement=True,
                log_device_placement=False,
                device_filters=["/job:ps",
                                "/job:worker/task:%d" % NCP_FLAGS.task_index])
    
            # The chief worker (task_index==0) session will prepare the session,
            # while the remaining workers will wait for the preparation to complete.
            if is_chief:
                print("Worker %d: Initializing session..." % NCP_FLAGS.task_index)
            else:
                print("Worker %d: Waiting for session to be initialized..." %
                      NCP_FLAGS.task_index)
    
            if FLAGS.existing_servers:
                server_grpc_url = "grpc://" + worker_spec[NCP_FLAGS.task_index]
                print("Using existing server at: %s" % server_grpc_url)
    
                sess = sv.prepare_or_wait_for_session(server_grpc_url, config=sess_config)
            else:
                sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
    
            print("Worker %d: Initializing session..." % NCP_FLAGS.task_index)
    
            if FLAGS.sync_replicas and is_chief:
                # Chief worker will start the chief queue runner and call the init op.
                sess.run(sync_init_op)
                sv.start_queue_runners(sess, [chief_queue_runner])
    
            # Perform training
            time_begin = time.time()
            print("Training begins @ %f" % time_begin)
    
            local_step = 0
            while True:
                # Training feed
                batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
                train_feed = {x: batch_xs, y_: batch_ys}
    
                _, step = sess.run([train_step, global_step], feed_dict=train_feed)
                local_step += 1
    
                now = time.time()
                print("%f: Worker %d: training step %d done (global step: %d)" %
                      (now, NCP_FLAGS.task_index, local_step, step))
    
                if step >= FLAGS.train_steps:
                    break
    
            time_end = time.time()
            print("Training ends @ %f" % time_end)
            training_time = time_end - time_begin
            print("Training elapsed time: %f s" % training_time)
    
            # Validation feed
            val_feed = {x: mnist.validation.images, y_: mnist.validation.labels}
            val_xent = sess.run(cross_entropy, feed_dict=val_feed)
            print("After %d training step(s), validation cross entropy = %g" %
                  (FLAGS.train_steps, val_xent))
    
    
    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
    

    The following code block should be added to your code. Use argparse to receive the cluster configuration information that is passed automatically.

    • For --worker_hosts and --ps_hosts, the same value is passed to all node servers.
    • For --job_name and --task_index, the system automatically provides the value appropriate for each node. (Note that if you execute “tcm submit,” you can specify ps_num to increase the number of parameter servers; in that case, the specified parameter is passed.)
    • --worker_hosts, --ps_hosts, --job_name, and --task_index are pre-defined arguments and should be used as they are.
    • You can use unparsed to receive user-defined arguments (see the sketch after this block).
      ## get cluster info by NCP
      parser.add_argument("--worker_hosts", type=str)
      parser.add_argument("--ps_hosts", type=str)
      parser.add_argument("--job_name", type=str)
      parser.add_argument("--task_index", type=int)
    
    • Use parser.parse_known_args(), instead of parser.parse_args(), to receive the unparsed arguments.
      NCP_FLAGS, unparsed = parser.parse_known_args()
    
      tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
    
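    For reference, here is a minimal, runnable sketch of how the pre-defined arguments arrive on a node; the host addresses and the custom flag are hypothetical values for illustration.

      import argparse

      parser = argparse.ArgumentParser()
      parser.add_argument("--worker_hosts", type=str)
      parser.add_argument("--ps_hosts", type=str)
      parser.add_argument("--job_name", type=str)
      parser.add_argument("--task_index", type=int)

      # Arguments as the cluster would pass them to worker 0 (hypothetical hosts).
      demo_argv = ["--ps_hosts=10.0.0.1:2222",
                   "--worker_hosts=10.0.0.2:2222,10.0.0.3:2222",
                   "--job_name=worker", "--task_index=0",
                   "--my_custom_flag=1"]  # user-defined argument
      NCP_FLAGS, unparsed = parser.parse_known_args(demo_argv)

      print(NCP_FLAGS.job_name, NCP_FLAGS.task_index)  # worker 0
      print(unparsed)                                  # ['--my_custom_flag=1']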

    How to use TF_CONFIG

    When you use RunConfig to consume the cluster information, build the TF_CONFIG environment variable as shown below rather than splitting the host strings yourself with split(",").

      import json
      import os

      tf_config_dic = {"cluster": {"ps": ps_str.split(","),
                                   "worker": worker_str.split(","),
                                 # "master": master_str.split(",")
                                   },
                       "task": {"type": task_type,
                                "index": task_index}}

      os.environ["TF_CONFIG"] = json.dumps(tf_config_dic)
    
    • The master node is a special worker node that is usually used to manage checkpoints and logs of the model.
    • If you need one, select a server from the worker host list and reuse it with a different port number (in this case, the master node’s task_index is usually 0).
    • Use os.environ.get() to read TF_CONFIG manually, as in the sketch below.
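
    For reference, a minimal sketch of reading the variable back manually, assuming TF_CONFIG was set as shown above:

      import json
      import os

      import tensorflow as tf

      # Fall back to an empty dict when TF_CONFIG has not been set.
      tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))

      cluster_spec = tf.train.ClusterSpec(tf_config.get("cluster", {}))
      task_type = tf_config.get("task", {}).get("type")
      task_index = tf_config.get("task", {}).get("index")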

    How to import external modules

    “tcm submit” is designed to distribute only a single training program to each node. If you want to import external modules, copy the module code into your public NAS storage, /mnt/nas, and use it as follows.

        import sys
    
        sys.path.insert(0, "[module path]")
        import [module name]
    
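    For example, a hypothetical sketch where a helper module has been copied to the shared NAS (the directory and module name are assumptions for illustration):

        import sys

        # Assumes /mnt/nas/modules/my_helper.py exists on the shared NAS.
        sys.path.insert(0, "/mnt/nas/modules")
        import my_helper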

    How to manually submit jobs without using “tcm submit”

    When the structure of your package makes it hard to use the tcm submit command, you can execute a command like the following from the cluster master console.

    • The access port for server nodes is 22 by default.
    • program_path is the path of your code. It should be located under /mnt/nas, which all server nodes can access.
    • arg_string is an example of the argument string that should be passed to your program.
    • /mnt/nas/log/ncp.log is an example of the log path. Create a folder for logs under /mnt/nas.
            _cmd = "ssh -p " + port + " root@" + target_ip + \
                   " " + "/home/ncp/anaconda2/envs/ncp/bin/python " \
                   + program_path + " " + arg_string + \
                   " ' >> /mnt/nas/log/ncp.log 2>&1 &"
            
            os.system(_cmd)      
    
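    Putting it together, a minimal sketch that launches the program on every node from the master console; the host list, port, and program path below are assumptions for illustration.

            import os

            # Hypothetical cluster layout and code location on the shared NAS.
            port = "22"
            ps_hosts = ["10.0.0.1:2222"]
            worker_hosts = ["10.0.0.2:2222", "10.0.0.3:2222"]
            program_path = "/mnt/nas/DistributedTensorFlow.py"

            def launch(job_name, hosts):
                for task_index, host in enumerate(hosts):
                    target_ip = host.split(":")[0]
                    arg_string = ("--ps_hosts=" + ",".join(ps_hosts) +
                                  " --worker_hosts=" + ",".join(worker_hosts) +
                                  " --job_name=" + job_name +
                                  " --task_index=" + str(task_index))
                    # Quote the remote command so the redirection and '&' run
                    # on the target node.
                    _cmd = ("ssh -p " + port + " root@" + target_ip +
                            " '/home/ncp/anaconda2/envs/ncp/bin/python " +
                            program_path + " " + arg_string +
                            " >> /mnt/nas/log/ncp.log 2>&1 &'")
                    os.system(_cmd)

            launch("ps", ps_hosts)
            launch("worker", worker_hosts)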
