tensorflow.python.framework.errors_impl.UnknownError: Could not start gRPC server
1. Distributed TensorFlow
A computer, a server and a node all mean the same thing here: one machine, which may contain multiple GPUs. The goal of distributed TensorFlow is to let the GPUs on multiple machines work together.
Distributed work is divided between two roles: parameter server (ps) and worker. These roles are called jobs, and every server has to take one of the two. The ps essentially stores the parameters, while the workers compute the loss and the gradients. For the code, this means a ps node can run on the CPU, whereas the workers are the ones that need GPUs.
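Here is a toy, single-process sketch of this division of labour, written in plain Python with no TensorFlow. It is not how TensorFlow implements it. The model is assumed to be a 1-D linear fit y = w * x with squared-error loss. The "ps" only stores w and applies the gradients it receives. Each "worker" only computes the gradient on its own data shard.

```python
# Toy illustration of the ps/worker split (plain Python, no TensorFlow).
# Assumption: 1-D linear model y = w * x with mean squared-error loss.

class ParameterServer:
    """Holds the shared parameter and applies gradients pushed by workers."""

    def __init__(self, w=0.0, lr=0.1):
        self.w = w
        self.lr = lr

    def pull(self):
        # Workers fetch the current parameter value.
        return self.w

    def push(self, grad):
        # The ps only applies updates; it never computes the loss itself.
        self.w -= self.lr * grad


def worker_gradient(w, shard):
    """A worker computes d/dw of mean((w*x - y)^2) on its own data shard."""
    return sum(2 * x * (w * x - y) for x, y in shard) / len(shard)


def train(shards, steps=100):
    ps = ParameterServer()
    for _ in range(steps):
        # Each shard stands in for one worker; they take turns updating the ps.
        for shard in shards:
            ps.push(worker_gradient(ps.pull(), shard))
    return ps.w


# Two "workers", each holding part of the data generated by y = 3x.
shards = [[(1.0, 3.0), (2.0, 6.0)], [(3.0, 9.0), (0.5, 1.5)]]
print(round(train(shards), 3))  # prints 3.0
```

The workers here run one after another only to keep the example deterministic. In a real cluster they run in parallel on different machines.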
The overall structure is shown in the figure. There are four servers in total, and each is assumed to contain 4 GPUs, so the figure has 16 GPUs altogether. Two servers work as ps and two work as workers. The names shown in the figure are not configured anywhere in the code, so ignore them. Servers that do the same job still have to be told apart, so each one becomes a task with its own task id. The figure only illustrates how jobs (ps, worker) relate to servers (nodes).
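The job/task layout from the figure can be written as a plain Python dict. In TensorFlow 1.x, this is the kind of mapping passed to tf.train.ClusterSpec. Each machine then starts tf.train.Server(cluster, job_name=..., task_index=...), as the examples later in this post do. The host names node0 to node3 and port 2222 are hypothetical. The helper below is plain Python that only enumerates the tasks.

```python
# Hypothetical cluster matching the figure: 2 ps servers and 2 worker servers.
cluster = {
    "ps": ["node0:2222", "node1:2222"],
    "worker": ["node2:2222", "node3:2222"],
}


def list_tasks(cluster):
    """Yield (job, task_index, address) for every task in the cluster."""
    for job, addresses in cluster.items():
        # The task index is simply the position of the address within its job.
        for task_index, address in enumerate(addresses):
            yield job, task_index, address


for job, task_index, address in list_tasks(cluster):
    # TensorFlow refers to a task's devices as /job:<job>/task:<index>.
    print("/job:%s/task:%d -> %s" % (job, task_index, address))
```

Two servers share the job name "worker" but have different task indices, which is exactly what distinguishes them.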
2. Code
The code is explained step by step, focusing on how the nodes communicate with each other.
In principle, we need some nodes, and we have to assign work to each of them.
So let's give the program an entry point that accepts parameters: which nodes exist and which job each one does. I prefer passing these in as parameters rather than hard-coding them. Since flags are basic TensorFlow, I won't explain them here, to keep the post short.
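If you have not used tf.app.flags, here is a minimal sketch of the same kind of entry point built on Python's standard argparse module instead. Note that this swaps in a different library from the one the original code uses. The flag names ps_hosts, worker_hosts, job_name and task_index are the ones this post uses in its launch command. Passing the host lists as comma-separated host:port strings is an assumption.

```python
import argparse


def parse_args(argv=None):
    """Parse the cluster description and this process's role.

    argparse stand-in for tf.app.flags; the comma-separated host format
    is an assumption, not something the original post specifies.
    """
    parser = argparse.ArgumentParser(description="Distributed training entry point")
    parser.add_argument("--ps_hosts", default="", help="comma-separated host:port list of ps tasks")
    parser.add_argument("--worker_hosts", default="", help="comma-separated host:port list of worker tasks")
    parser.add_argument("--job_name", choices=["ps", "worker"], required=True, help="'ps' or 'worker'")
    parser.add_argument("--task_index", type=int, default=0, help="index of this task within its job")
    args = parser.parse_args(argv)

    # Build the {job: [addresses]} mapping, skipping empty entries.
    cluster = {
        "ps": [h for h in args.ps_hosts.split(",") if h],
        "worker": [h for h in args.worker_hosts.split(",") if h],
    }
    # Every task must point at an address that actually exists in its job.
    if not 0 <= args.task_index < len(cluster[args.job_name]):
        parser.error("task_index %d is out of range for job %r" % (args.task_index, args.job_name))
    return args, cluster


args, cluster = parse_args(["--ps_hosts=pc-01:2222",
                            "--worker_hosts=pc-02:2222,pc-03:2222",
                            "--job_name=worker", "--task_index=1"])
print(args.job_name, args.task_index, cluster)
```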
Each node has to be told its role separately and runs its own process. So if you have one ps and two workers (one ps is usually enough), you run this bash command on each of them:
python train.py --ps_hosts=<who are the ps> --worker_hosts=<who are the workers> --job_name=ps --task_index=0
python train.py --ps_hosts=<who are the ps> --worker_hosts=<who are the workers> --job_name=worker --task_index=0
python train.py --ps_hosts=<who are the ps> --worker_hosts=<who are the workers> --job_name=worker --task_index=1
Enter the command three times, once per node, so that all three processes run at the same time. The ps and the workers wait until every member of the cluster has been started, and only then work together. After all, each of them has to wait for its companions.
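Typing three nearly identical commands is error-prone, so here is a small sketch that generates one command line per task from the host lists. The script name train.py comes from this post. The hosts pc-01 to pc-03 are hypothetical placeholders. The helper only builds and prints the commands. You still run each one on the machine it belongs to.

```python
import shlex


def launch_commands(ps_hosts, worker_hosts, script="train.py"):
    """Build one command line (as an argument list) per task in the cluster."""
    # Every task receives the same description of the whole cluster...
    common = ["python", script,
              "--ps_hosts=" + ",".join(ps_hosts),
              "--worker_hosts=" + ",".join(worker_hosts)]
    commands = []
    # ...plus its own job name and index within that job.
    for job, hosts in (("ps", ps_hosts), ("worker", worker_hosts)):
        for task_index in range(len(hosts)):
            commands.append(common + ["--job_name=" + job,
                                      "--task_index=%d" % task_index])
    return commands


for cmd in launch_commands(["pc-01:2222"], ["pc-02:2222", "pc-03:2222"]):
    print(shlex.join(cmd))  # shlex.join needs Python 3.8+
```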
3. Example
https://gist.github.com/yaroslavvb/1124bb02a9fd4abce3d86caf2f950cb2
"""Benchmark tensorflow distributed by adding vector of ones on worker2
to variable on worker1 as fast as possible.

On 2014 macbook, TensorFlow 0.10 this shows
Local rate:       2175.28 MB per second
Distributed rate: 107.13 MB per second
"""

import subprocess
import tensorflow as tf
import time
import sys

flags = tf.flags
flags.DEFINE_integer("iters", 10, "Maximum number of additions")
flags.DEFINE_integer("data_mb", 100, "size of vector in MBs")
flags.DEFINE_string("port1", "12222", "port of worker1")
flags.DEFINE_string("port2", "12223", "port of worker2")
flags.DEFINE_string("task", "", "internal use")
FLAGS = flags.FLAGS

# setup local cluster from flags: two workers on one machine, different ports
host = "127.0.0.1:"
cluster = {"worker": [host + FLAGS.port1, host + FLAGS.port2]}
clusterspec = tf.train.ClusterSpec(cluster).as_cluster_def()


def default_config():
    optimizer_options = tf.OptimizerOptions(opt_level=tf.OptimizerOptions.L0)
    config = tf.ConfigProto(
        graph_options=tf.GraphOptions(optimizer_options=optimizer_options))
    config.log_device_placement = False
    config.allow_soft_placement = False
    return config


def create_graph(device1, device2):
    """Create graph that keeps variable on device1 and
    vector of ones/addition op on device2"""
    tf.reset_default_graph()
    dtype = tf.int32
    params_size = 250*1000*FLAGS.data_mb  # 1MB is 250k integers

    with tf.device(device1):
        params = tf.get_variable("params", [params_size], dtype,
                                 initializer=tf.zeros_initializer)
    with tf.device(device2):
        # constant node gets placed on device1 because of simple_placer
        # update = tf.constant(1, shape=[params_size], dtype=dtype)
        update = tf.get_variable("update", [params_size], dtype,
                                 initializer=tf.ones_initializer)
        add_op = params.assign_add(update)

    init_op = tf.initialize_all_variables()
    return init_op, add_op


def run_benchmark(sess, init_op, add_op):
    """Returns MB/s rate of addition."""
    sess.run(init_op)
    sess.run(add_op.op)  # warm-up
    start_time = time.time()
    for i in range(FLAGS.iters):
        # change to add_op.op to make faster (fetching add_op copies the result back)
        sess.run(add_op)
    elapsed_time = time.time() - start_time
    return float(FLAGS.iters)*FLAGS.data_mb/elapsed_time


def run_benchmark_local():
    ops = create_graph(None, None)
    sess = tf.Session(config=default_config())
    return run_benchmark(sess, *ops)


def run_benchmark_distributed():
    ops = create_graph("/job:worker/task:0", "/job:worker/task:1")

    # launch distributed service: re-run this script once per worker task
    def runcmd(cmd):
        subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT)

    runcmd("python %s --task=0" % (sys.argv[0]))
    runcmd("python %s --task=1" % (sys.argv[0]))
    time.sleep(1)  # give both servers a moment to start

    sess = tf.Session("grpc://" + host + FLAGS.port1, config=default_config())
    return run_benchmark(sess, *ops)


if __name__ == '__main__':
    if not FLAGS.task:
        rate1 = run_benchmark_local()
        rate2 = run_benchmark_distributed()
        print("Adding data in %d MB chunks" % (FLAGS.data_mb))
        print("Local rate: %.2f MB per second" % (rate1,))
        print("Distributed rate: %.2f MB per second" % (rate2,))
    else:
        # Launch TensorFlow server
        server = tf.train.Server(clusterspec, config=default_config(),
                                 job_name="worker",
                                 task_index=int(FLAGS.task))
        server.join()
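Here is a short sketch of the arithmetic behind the benchmark's numbers. It uses the example's own definitions: params_size = 250*1000*data_mb int32 elements at 4 bytes each, so "1 MB" here means 10**6 bytes. The rate is iters * data_mb / elapsed, measured after the warm-up run. The function names are hypothetical helpers, not part of the gist.

```python
def vector_size(data_mb, bytes_per_int=4):
    """Number of int32 elements in a data_mb vector (1 MB = 10**6 bytes here)."""
    return data_mb * 10**6 // bytes_per_int


def rate_mb_per_s(iters, data_mb, elapsed_s):
    """Throughput as computed by run_benchmark: MB added per second."""
    return iters * data_mb / elapsed_s


assert vector_size(1) == 250 * 1000  # matches the "1MB is 250k integers" comment
print(rate_mb_per_s(10, 100, 0.5))   # prints 2000.0
```

With the defaults (iters=10, data_mb=100), the docstring's distributed rate of 107.13 MB/s means the timed loop took about 9.3 seconds. At the local rate of 2175.28 MB/s, the same work took under half a second.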
4. Run 2 distributed worker processes on a single node
https://stackoverflow.com/questions/40877246/distributed-tensorflow-not-working-with-simple-example
https://github.com/ischlag/distributed-tensorflow-example/blob/master/example.py
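When several tasks share one machine, each needs its own port, as port1/port2 (12222 and 12223) do in the benchmark. One common cause of the "Could not start gRPC server" error in the title is a task trying to bind a host:port that is already in use. This can happen when two tasks are given the same port, or when a previous run is still holding it. Below is a hedged pre-flight sketch using only the standard socket module. The helper names are hypothetical. It reports whether a port can be bound right now, which does not guarantee it will still be free when TensorFlow starts.

```python
import socket


def port_is_free(host, port):
    """Return True if host:port can be bound right now (nothing else holds it)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            return False
        return True


def local_cluster(num_workers, base_port=12222, host="127.0.0.1"):
    """Give every worker on this node a distinct port, starting at base_port."""
    return {"worker": ["%s:%d" % (host, base_port + i) for i in range(num_workers)]}


cluster = local_cluster(2)
for address in cluster["worker"]:
    host, port = address.rsplit(":", 1)
    print(address, "free" if port_is_free(host, int(port)) else "IN USE")
```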
'''
Distributed Tensorflow 1.2.0 example of using data parallelism and share model parameters.
Trains a simple sigmoid neural network on mnist for 20 epochs on three machines using one parameter server.

Change the hardcoded host urls below with your own hosts.
Run like this:

pc-01$ python example.py --job_name="ps" --task_index=0
pc-02$ python example.py --job_name="worker" --task_index=0
pc-03$ python example.py --job_name="worker" --task_index=1
pc-04$ python example.py --job_name="worker" --task_index=2

More details here: ischlag.github.io
'''

from __future__ import print_function

import tensorflow as tf
import sys
import time

# cluster specification
parameter_servers = ["pc-01:2222"]
workers = ["pc-02:2222",
           "pc-03:2222",
           "pc-04:2222"]
cluster = tf.train.ClusterSpec({"ps": parameter_servers, "worker": workers})

# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

# start a server for a specific task
server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)

# config
batch_size = 100
learning_rate = 0.0005
training_epochs = 20
logs_path = "/tmp/mnist/1"

# load mnist data set
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":

    # Between-graph replication
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % FLAGS.task_index,
            cluster=cluster)):

        # count the number of updates
        global_step = tf.get_variable(
            'global_step',
            [],
            initializer=tf.constant_initializer(0),
            trainable=False)

        # input images
        with tf.name_scope('input'):
            # None -> batch size can be any size, 784 -> flattened mnist image
            x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
            # target 10 output classes
            y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")

        # model parameters will change during training so we use tf.Variable
        tf.set_random_seed(1)
        with tf.name_scope("weights"):
            W1 = tf.Variable(tf.random_normal([784, 100]))
            W2 = tf.Variable(tf.random_normal([100, 10]))

        # bias
        with tf.name_scope("biases"):
            b1 = tf.Variable(tf.zeros([100]))
            b2 = tf.Variable(tf.zeros([10]))

        # implement model
        with tf.name_scope("softmax"):
            # y is our prediction
            z2 = tf.add(tf.matmul(x, W1), b1)
            a2 = tf.nn.sigmoid(z2)
            z3 = tf.add(tf.matmul(a2, W2), b2)
            y = tf.nn.softmax(z3)

        # specify cost function
        with tf.name_scope('cross_entropy'):
            # this is our cost
            cross_entropy = tf.reduce_mean(
                -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))

        # specify optimizer
        with tf.name_scope('train'):
            # optimizer is an "operation" which we can execute in a session
            grad_op = tf.train.GradientDescentOptimizer(learning_rate)
            # Synchronous alternative (disabled in the original):
            # rep_op = tf.train.SyncReplicasOptimizer(
            #     grad_op,
            #     replicas_to_aggregate=len(workers),
            #     replica_id=FLAGS.task_index,
            #     total_num_replicas=len(workers),
            #     use_locking=True)
            # train_op = rep_op.minimize(cross_entropy, global_step=global_step)
            train_op = grad_op.minimize(cross_entropy, global_step=global_step)

        # init_token_op = rep_op.get_init_tokens_op()
        # chief_queue_runner = rep_op.get_chief_queue_runner()

        with tf.name_scope('Accuracy'):
            # accuracy
            correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        # create a summary for our cost and accuracy
        tf.summary.scalar("cost", cross_entropy)
        tf.summary.scalar("accuracy", accuracy)

        # merge all summaries into a single "operation" which we can execute in a session
        summary_op = tf.summary.merge_all()
        init_op = tf.global_variables_initializer()
        print("Variables initialized ...")

    # worker 0 is the chief and initializes the variables; the others wait for it
    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             global_step=global_step,
                             init_op=init_op)

    begin_time = time.time()
    frequency = 100
    with sv.prepare_or_wait_for_session(server.target) as sess:
        # Synchronous mode only: the chief starts the queue runner.
        # if FLAGS.task_index == 0:
        #     sv.start_queue_runners(sess, [chief_queue_runner])
        #     sess.run(init_token_op)

        # create log writer object (this will log on every machine)
        writer = tf.summary.FileWriter(logs_path, graph=tf.get_default_graph())

        # perform training cycles
        start_time = time.time()
        for epoch in range(training_epochs):

            # number of batches in one epoch
            batch_count = int(mnist.train.num_examples / batch_size)

            count = 0
            for i in range(batch_count):
                batch_x, batch_y = mnist.train.next_batch(batch_size)

                # perform the operations we defined earlier on batch
                _, cost, summary, step = sess.run(
                    [train_op, cross_entropy, summary_op, global_step],
                    feed_dict={x: batch_x, y_: batch_y})
                writer.add_summary(summary, step)

                count += 1
                if count % frequency == 0 or i + 1 == batch_count:
                    elapsed_time = time.time() - start_time
                    start_time = time.time()
                    print("Step: %d," % (step + 1),
                          "Epoch: %2d," % (epoch + 1),
                          "Batch: %3d of %3d," % (i + 1, batch_count),
                          "Cost: %.4f," % cost,
                          "AvgTime: %3.2fms" % float(elapsed_time * 1000 / frequency))
                    count = 0

        print("Test-Accuracy: %2.2f" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
        print("Total Time: %3.2fs" % float(time.time() - begin_time))
        print("Final Cost: %.4f" % cost)

    sv.stop()
    print("done")
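In this example, tf.train.replica_device_setter decides where things live. Variables go to the ps tasks, and every other op goes to worker_device. By default, it assigns variables to the ps tasks in round-robin order as they are created. The sketch below imitates that default placement in plain Python, so you can see which ps task would hold each variable. It is an illustration, not TensorFlow's code. The variable names are the ones the example creates, listed in creation order.

```python
def round_robin_placement(variable_names, num_ps):
    """Assign each variable to a ps task in turn, in creation order.

    Imitates the default round-robin strategy of
    tf.train.replica_device_setter (illustration only).
    """
    return {name: "/job:ps/task:%d" % (i % num_ps)
            for i, name in enumerate(variable_names)}


# Variables created in the example above, in creation order.
names = ["global_step", "W1", "W2", "b1", "b2"]

# With the single ps in the example, everything lands on /job:ps/task:0.
print(round_robin_placement(names, num_ps=1))
# With two ps tasks, the variables would alternate between them.
print(round_robin_placement(names, num_ps=2))
```

With one ps the spreading makes no difference. With several ps tasks, it is what lets the parameter storage and update traffic be shared across machines.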