为什么要使用 tf.train.Server 并行执行多个 tf.Session()?
Why should one use tf.train.Server to execute multiple tf.Session() in parallel?
并行执行多个tf.Session()
的官方方法是使用tf.train.Server
,如Distributed TensorFlow中所述
。另一方面,以下适用于 Keras,根据 ,可以在不使用 tf.train.Server 的情况下将其修改为 Tensorflow。
def _training_worker(train_params):
import keras
model = obtain_model(train_params)
model.fit(train_params)
send_message_to_main_process(...)
def train_new_model(train_params):
training_process = multiprocessing.Process(target=_training_worker, args = train_params)
training_process.start()
get_message_from_training_process(...)
training_process.join()
第一种方法比第二种方法快吗?我有一个以第二种方式编写的代码,并且由于我的算法 (AlphaZero) 的性质,单个 GPU 应该 运行 许多进程,每个进程都执行对小批量的预测。
tf.train.Server
专为 集群 中的分布式计算而设计,当需要在不同节点之间进行 通信 时。当训练分布在多台机器上或在某些情况下分布在一台机器上的多个 GPU 上时,这尤其有用。来自文档:
An in-process TensorFlow server, for use in distributed training.
A tf.train.Server
instance encapsulates a set of devices and a tf.Session
target that can participate in distributed training. A server belongs to a cluster (specified by a tf.train.ClusterSpec
), and corresponds to a particular task in a named job. The server can communicate with any other server in the same cluster.
使用 multiprocessing.Process
生成多个进程并不是 Tensorflow 意义上的集群,因为子进程之间没有交互。此方法更易于设置,但仅限于一台机器。既然你说你只有一台机器,这可能不是一个强有力的论据,但如果你打算扩展到一组机器,你将不得不重新设计整个方法。
tf.train.Server
因此是一种更通用和可扩展的解决方案。此外,它允许通过一些重要的通信来组织复杂的训练,例如异步梯度更新。训练速度是否更快在很大程度上取决于任务,我认为在一个共享 GPU 上不会有显着差异。
仅供参考,服务器的代码如下(图复制示例之间):
# specify the cluster's architecture
cluster = tf.train.ClusterSpec({
'ps': ['192.168.1.1:1111'],
'worker': ['192.168.1.2:1111',
'192.168.1.3:1111']
})
# parse command-line to specify machine
job_type = sys.argv[1] # job type: "worker" or "ps"
task_idx = sys.argv[2] # index job in the worker or ps list as defined in the ClusterSpec
# create TensorFlow Server. This is how the machines communicate.
server = tf.train.Server(cluster, job_name=job_type, task_index=task_idx)
# parameter server is updated by remote clients.
# will not proceed beyond this if statement.
if job_type == 'ps':
server.join()
else:
# workers only
with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:' + task_idx,
cluster=cluster)):
# build your model here as if you only were using a single machine
pass
with tf.Session(server.target):
# train your model here
pass
并行执行多个tf.Session()
的官方方法是使用tf.train.Server
,如Distributed TensorFlow中所述
。另一方面,以下适用于 Keras,根据
def _training_worker(train_params):
import keras
model = obtain_model(train_params)
model.fit(train_params)
send_message_to_main_process(...)
def train_new_model(train_params):
training_process = multiprocessing.Process(target=_training_worker, args = train_params)
training_process.start()
get_message_from_training_process(...)
training_process.join()
第一种方法比第二种方法快吗?我有一个以第二种方式编写的代码,并且由于我的算法 (AlphaZero) 的性质,单个 GPU 应该 运行 许多进程,每个进程都执行对小批量的预测。
tf.train.Server
专为 集群 中的分布式计算而设计,当需要在不同节点之间进行 通信 时。当训练分布在多台机器上或在某些情况下分布在一台机器上的多个 GPU 上时,这尤其有用。来自文档:
An in-process TensorFlow server, for use in distributed training.
A
tf.train.Server
instance encapsulates a set of devices and atf.Session
target that can participate in distributed training. A server belongs to a cluster (specified by atf.train.ClusterSpec
), and corresponds to a particular task in a named job. The server can communicate with any other server in the same cluster.
使用 multiprocessing.Process
生成多个进程并不是 Tensorflow 意义上的集群,因为子进程之间没有交互。此方法更易于设置,但仅限于一台机器。既然你说你只有一台机器,这可能不是一个强有力的论据,但如果你打算扩展到一组机器,你将不得不重新设计整个方法。
tf.train.Server
因此是一种更通用和可扩展的解决方案。此外,它允许通过一些重要的通信来组织复杂的训练,例如异步梯度更新。训练速度是否更快在很大程度上取决于任务,我认为在一个共享 GPU 上不会有显着差异。
仅供参考,服务器的代码如下(图复制示例之间):
# specify the cluster's architecture
cluster = tf.train.ClusterSpec({
'ps': ['192.168.1.1:1111'],
'worker': ['192.168.1.2:1111',
'192.168.1.3:1111']
})
# parse command-line to specify machine
job_type = sys.argv[1] # job type: "worker" or "ps"
task_idx = sys.argv[2] # index job in the worker or ps list as defined in the ClusterSpec
# create TensorFlow Server. This is how the machines communicate.
server = tf.train.Server(cluster, job_name=job_type, task_index=task_idx)
# parameter server is updated by remote clients.
# will not proceed beyond this if statement.
if job_type == 'ps':
server.join()
else:
# workers only
with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:' + task_idx,
cluster=cluster)):
# build your model here as if you only were using a single machine
pass
with tf.Session(server.target):
# train your model here
pass