Ncloud TensorFlow Cluster MNISTの例題
    • PDF

    Ncloud TensorFlow Cluster MNISTの例題

    • PDF

    Article Summary

    Classic環境で利用できます。

    Ncloud TensorFlow Cluster MNIST の例題

    分散並列コードを修正する

    NCP Ncloud TensorFlow Clusterで Job Submit(提出)をするためのコード修正方法をご説明します。

    次の例題は /home/ncp/workspace/DistributedTensorFlow.pyで、TensorFlow master ノードに基本提供されます。

    修正したり追加する必要のあるコードブロックは以下の二つです。

    • flagに保存されたCluster情報をパースしてClusterSpecにパッシングするブロック
    • main() 関数を呼び出す際 argparseでClusterに渡す因子を受け取るブロック
    from __future__ import absolute_import
    from __future__ import division
    from __future__ import print_function
    
    import math
    import sys
    import tempfile
    import time
    import argparse
    
    import tensorflow as tf
    from tensorflow.examples.tutorials.mnist import input_data
    
    flags = tf.app.flags
    flags.DEFINE_string("data_dir", "/home/ncp/mnist-data",
                        "Directory for storing mnist data")
    flags.DEFINE_boolean("download_only", False,
                         "Only perform downloading of data; Do not proceed to "
                         "session preparation, model definition or training")
    flags.DEFINE_integer("num_gpus", 0, "Total number of gpus for each machine."
                                        "If you don't use GPU, please set it to '0'")
    flags.DEFINE_integer("replicas_to_aggregate", None,
                         "Number of replicas to aggregate before parameter update"
                         "is applied (For sync_replicas mode only; default: "
                         "num_workers)")
    flags.DEFINE_integer("hidden_units", 1000,
                         "Number of units in the hidden layer of the NN")
    flags.DEFINE_integer("train_steps", 3000,
                         "Number of (global) training steps to perform")
    flags.DEFINE_integer("batch_size", 100, "Training batch size")
    flags.DEFINE_float("learning_rate", 0.01, "Learning rate")
    flags.DEFINE_boolean(
        "sync_replicas", True,
        "Use the sync_replicas (synchronized replicas) mode, "
        "wherein the parameter updates from workers are aggregated "
        "before applied to avoid stale gradients")
    flags.DEFINE_boolean(
        "existing_servers", False, "Whether servers already exists. If True, "
                                   "will use the worker hosts via their GRPC URLs (one client process "
                                   "per worker host). Otherwise, will create an in-process TensorFlow "
                                   "server.")
    
    FLAGS = flags.FLAGS
    NCP_FLAGS = None
    
    IMAGE_PIXELS = 28
    
    
    def main(unused_argv):
        if NCP_FLAGS.job_name == "worker":
            mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)
    
        if FLAGS.download_only:
            sys.exit(0)
    

    次はユーザーコードに別途追加が必要なコードブロックです。 Clusterからユーザーコードを引き渡す Clusterスペック因子値をパッシングしてClusterSpecに引き渡します。TF_CONFIG システム変数の活用法は下で別にご説明します。

        if NCP_FLAGS.job_name is None or NCP_FLAGS.job_name == "":
            raise ValueError("Must specify an explicit `job_name`")
        if NCP_FLAGS.task_index is None or NCP_FLAGS.task_index == "":
            raise ValueError("Must specify an explicit `task_index`")
    
        # Construct the cluster and start the server
        ps_spec = NCP_FLAGS.ps_hosts.split(",")
        worker_spec = NCP_FLAGS.worker_hosts.split(",")
    
        # Get the number of workers.
        num_workers = len(worker_spec)
    
        cluster = tf.train.ClusterSpec({"ps": ps_spec, "worker": worker_spec})
    

    以下のコードブロックは別途修正が必要ありません。

        if not FLAGS.existing_servers:
            # Not using existing servers. Create an in-process server.
    
            server = tf.train.Server(cluster, job_name=NCP_FLAGS.job_name, task_index=NCP_FLAGS.task_index)
            if NCP_FLAGS.job_name == "ps":
                server.join()
    
        is_chief = (NCP_FLAGS.task_index == 0)
        if FLAGS.num_gpus > 0:
            # Avoid gpu allocation conflict: now allocate task_num -> #gpu
            # for each worker in the corresponding machine
            gpu = (NCP_FLAGS.task_index % FLAGS.num_gpus)
            worker_device = "/job:worker/task:%d/gpu:%d" % (NCP_FLAGS.task_index, gpu)
        elif FLAGS.num_gpus == 0:
            # Just allocate the CPU to worker server
            cpu = 0
            worker_device = "/job:worker/task:%d/cpu:%d" % (NCP_FLAGS.task_index, cpu)
        # The device setter will automatically place Variables ops on separate
        # parameter servers (ps). The non-Variable ops will be placed on the workers.
        # The ps use CPU and workers use corresponding GPU
        with tf.device(
                tf.train.replica_device_setter(
                    worker_device=worker_device,
                    ps_device="/job:ps/cpu:0",
                    cluster=cluster)):
            global_step = tf.Variable(0, name="global_step", trainable=False)
    
            # Variables of the hidden layer
            hid_w = tf.Variable(
                tf.truncated_normal(
                    [IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                    stddev=1.0 / IMAGE_PIXELS),
                name="hid_w")
            hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")
    
            # Variables of the softmax layer
            sm_w = tf.Variable(
                tf.truncated_normal(
                    [FLAGS.hidden_units, 10],
                    stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
                name="sm_w")
            sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
    
            # Ops: located on the worker specified with NCP_FLAGS.task_index
            x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
            y_ = tf.placeholder(tf.float32, [None, 10])
    
            hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
            hid = tf.nn.relu(hid_lin)
    
            y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
            cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
    
            opt = tf.train.AdamOptimizer(FLAGS.learning_rate)
    
            if FLAGS.sync_replicas:
                if FLAGS.replicas_to_aggregate is None:
                    replicas_to_aggregate = num_workers
                else:
                    replicas_to_aggregate = FLAGS.replicas_to_aggregate
    
                opt = tf.train.SyncReplicasOptimizer(
                    opt,
                    replicas_to_aggregate=replicas_to_aggregate,
                    total_num_replicas=num_workers,
                    name="mnist_sync_replicas")
    
            train_step = opt.minimize(cross_entropy, global_step=global_step)
    
            if FLAGS.sync_replicas:
                local_init_op = opt.local_step_init_op
                if is_chief:
                    local_init_op = opt.chief_init_op
    
                ready_for_local_init_op = opt.ready_for_local_init_op
    
                # Initial token and chief queue runners required by the sync_replicas mode
                chief_queue_runner = opt.get_chief_queue_runner()
                sync_init_op = opt.get_init_tokens_op()
    
            init_op = tf.global_variables_initializer()
            train_dir = tempfile.mkdtemp()
    
            if FLAGS.sync_replicas:
                sv = tf.train.Supervisor(
                    is_chief=is_chief,
                    logdir=train_dir,
                    init_op=init_op,
                    local_init_op=local_init_op,
                    ready_for_local_init_op=ready_for_local_init_op,
                    recovery_wait_secs=1,
                    global_step=global_step)
            else:
                sv = tf.train.Supervisor(
                    is_chief=is_chief,
                    logdir=train_dir,
                    init_op=init_op,
                    recovery_wait_secs=1,
                    global_step=global_step)
    
            sess_config = tf.ConfigProto(
                allow_soft_placement=True,
                log_device_placement=False,
                device_filters=["/job:ps",
                                "/job:worker/task:%d" % NCP_FLAGS.task_index])
    
            # The chief worker (task_index==0) session will prepare the session,
            # while the remaining workers will wait for the preparation to complete.
            if is_chief:
                print("Worker %d: Initializing session..." % NCP_FLAGS.task_index)
            else:
                print("Worker %d: Waiting for session to be initialized..." %
                      NCP_FLAGS.task_index)
    
            if FLAGS.existing_servers:
                server_grpc_url = "grpc://" + worker_spec[NCP_FLAGS.task_index]
                print("Using existing server at: %s" % server_grpc_url)
    
                sess = sv.prepare_or_wait_for_session(server_grpc_url, config=sess_config)
            else:
                sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)
    
            print("Worker %d: Session initialization complete." % NCP_FLAGS.task_index)
    
            if FLAGS.sync_replicas and is_chief:
                # Chief worker will start the chief queue runner and call the init op.
                sess.run(sync_init_op)
                sv.start_queue_runners(sess, [chief_queue_runner])
    
            # Perform training
            time_begin = time.time()
            print("Training begins @ %f" % time_begin)
    
            local_step = 0
            while True:
                # Training feed
                batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
                train_feed = {x: batch_xs, y_: batch_ys}
    
                _, step = sess.run([train_step, global_step], feed_dict=train_feed)
                local_step += 1
    
                now = time.time()
                print("%f: Worker %d: training step %d done (global step: %d)" %
                      (now, NCP_FLAGS.task_index, local_step, step))
    
                if step >= FLAGS.train_steps:
                    break
    
            time_end = time.time()
            print("Training ends @ %f" % time_end)
            training_time = time_end - time_begin
            print("Training elapsed time: %f s" % training_time)
    
            # Validation feed
            val_feed = {x: mnist.validation.images, y_: mnist.validation.labels}
            val_xent = sess.run(cross_entropy, feed_dict=val_feed)
            print("After %d training step(s), validation cross entropy = %g" %
                  (FLAGS.train_steps, val_xent))
    
    
    if __name__ == "__main__":
        parser = argparse.ArgumentParser()
    

    次はユーザーコードに別途追加が必要なコードブロックです。Clusterから引き渡すClusterの構成情報をargparseで自動で受け取ります。

    • --worker_hostsと--ps_hostsは全てのノードサーバに同じ値が転送されます。
    • --job_nameと --task_indexはシステムから自動で当該ノードに適した情報を提供します(但し、'tcm submit' 実行の際はps_num 値を指定してパラメータサーバ数を増加させることができ、この場合は適用されたパラメータが転送されます)。
    • --worker_hosts, --ps_hosts, --job_name, --task_indexは予め定義された予約因子値であるため、そのまま使用しなければなりません。
    • unparsedという因子値を通じてユーザー定義因子値を受け取れます。
      ## get cluster info by NCP
      parser.add_argument("--worker_hosts", type=str)
      parser.add_argument("--ps_hosts", type=str)
      parser.add_argument("--job_name", type=str)
      parser.add_argument("--task_index", type=int)
    
    • unparsed 因子値を使用するためには parser.parse_args() の代わりにparser.parse_knwon_args()を使用します。
      NCP_FLAGS, unparsed = parser.parse_known_args()
    
      tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
    

    TF_CONFIG システム変数の活用

    Cluster Spec情報を RunConfigにパースする場合は split(",")を用いてパースする代わりに下のコードを活用してください。

      tf_config_dic = {"cluster": {"ps": ps_str.split(","),
                                   "worker": worker_str.split(","),
                                 # "master": master_str.split(",")
                                   },
                          "task": {"type": task_type,
                                   "index": task_index}}
    
      os.environ["TF_CONFIG"] = json.dumps(tf_config_dic)
    
    • マスターノードはSpecial Work Nodeで普通モデルのChecksumとログを管理する用途で使われます。
    • 必要な場合、ワーカーサーバリストの中から一台を指定してポート番号を他の番号に変更して使用してください(この場合マスターノードのask_indexは通常 0です)。
    • 受動で TF_CONFIGを読み取る場合は os.environ.get()を使用してください。

    外部モジュールをインポートする方法

    tcm submit'は単一学習プログラムのみ各ノードに配布するように構成されています。外部モジュールをインポートする必要のある場合は共用NASストレージ /mnt/nasにコードをコピーしてから以下のように使用してください。

        import sys
    
        sys.path.insert(0, [모듈의 PATH])
        import [모듈명]
    

    tcm submitを使用せずに受動で実行する方法

    ユーザーパッケージの構造上 tcm submit 命令語の使用が難しい場合 Cluster master コンソールから次のように命令を実行できます。

    • サーバノードのアクセスポートは基本的に22番です。
    • `program_pathはユーザーコードのルートです。全てのサーバノードにアクセスできなければならないため/mnt/nas 下位に位置するようにしてください。
    • `arg_stringはユーザープログラムに引き渡さなければならないカスタム因子値がある場合の例です。
    • /home/ncp/ncp.logはログルートの例です。/mnt/nas下位にlog フォルダを作成して使ってください。
            _cmd = "ssh -p " + port + " root@" + target_ip + \
                   " " + "/home/ncp/anaconda2/envs/ncp/bin/python " \
                   + program_path + " " + arg_string + \
                   " ' >> /mnt/nas/log/ncp.log 2>&1 &"
            
            os.system(_cmd)      
    

    この記事は役に立ちましたか?

    Changing your password will log you out immediately. Use the new password to log back in.
    First name must have atleast 2 characters. Numbers and special characters are not allowed.
    Last name must have atleast 1 characters. Numbers and special characters are not allowed.
    Enter a valid email
    Enter a valid password
    Your profile has been successfully updated.