Tensorflow + joblib:限于8个进程?
Tensorflow + joblib: limited to 8 processes?
我使用 TensorFlow 创建了一个统计估计器。我遵循了 sklearn 的估计器,所以我有一个 class 打包了所有东西 包括导入 Tensorflow 和启动 TF 的会话 (如果我在 class 之外导入 TF 没有任何并行工作完全没有)。
我需要多次 运行 随机数据上的估计器来查看经验分布,所以我使用 joblib 并行化创建数据、创建估计器对象和 运行 的代码s 对数据的估计。
我正在 linux 服务器上工作,它有 64 个内核(和大量内存),我在其中 运行 比这大得多的工作,也使用 joblib。但是,当我尝试 运行ning 基于 TF 的代码时,我只能 运行 8 个进程。如果我尝试使用 9,那么 top
中只会显示 8 个,当这 8 个完成时,joblib 永远不会发送另一个 8 并且根本不会 returns 或者 returns 以下错误消息
"BrokenProcessPool: A process in the executor was terminated abruptly
while the future was running or pending."
如果我将进程限制为 8 个,那么一切正常。
我尝试将 joblib 的后端更改为 dask.parallel 并且我有相同的行为。我从后端获得了更多信息,不断有消息说
"distributed.nanny - WARNING - Worker process 7602 was killed by
unknown signal"
我希望能够 运行 超过 8 个进程。 问题是:这是硬限制还是我可以通过一些 TF 参数更改它?我能以任何方式解决这个问题吗? 我认为限制与 Tensorflow 相关,因为一旦 8 个进程 运行ning(并且它们需要几个小时)我不能 运行 来自 Tensorflow 的任何其他东西在那台机器上。
感谢您的帮助!!
以下代码重现了错误:
from sklearn.base import TransformerMixin
import numpy as np
from joblib import Parallel, delayed
class MyEstimator(TransformerMixin):
def __init__(self):
import tensorflow as tf
self._tf = tf
self._graph = tf.Graph()
with self._graph.as_default():
self.session = self._tf.Session()
A0 = np.eye(10, 2)
self.a_var = a_var = tf.Variable(A0, name='a_var', dtype=tf.float64)
self._x = x = tf.placeholder(dtype=tf.float64)
self._y = y= tf.placeholder(dtype=tf.float64)
w = tf.tensordot(a_var, x, axes=0)
self.f = tf.reduce_mean((y-w)**2)
def fit(self, x, y):
#self.session.run(
# self._tf.global_variables_initializer())
self._f = self.session.run(self.f, feed_dict={self._x:x, self._y: y, self.a_var:np.eye(10, 2)})
return self
def run_estimator():
my_est = MyEstimator()
x = np.random.normal(0,1,10)
y = np.random.normal(0,1,10)
my_est.fit(x,y)
Parallel(n_jobs=16)(delayed(run_estimator)() for _ in range(16))
我正在研究 Linux、Python 3.6.3、TensorFlow 1.7.0、joblib 0.12。
几个月后,我找到了一个 TensorFlow 服务器的解决方案,https://www.tensorflow.org/deploy/distributed
from sklearn.base import TransformerMixin
import numpy as np
from joblib import Parallel, delayed
class MyEstimator(TransformerMixin):
def __init__(self, target):
import tensorflow as tf
self._tf = tf
self._graph = tf.Graph()
with self._graph.as_default():
config = self._tf.ConfigProto(
intra_op_parallelism_threads=1,
inter_op_parallelism_threads=1,
device_count={"CPU":4},
use_per_session_threads=True)
config.graph_options.optimizer_options.global_jit_level = tf.OptimizerOptions.ON_1
pool = config.session_inter_op_thread_pool.add()
pool.num_threads = 1
self.session = self._tf.Session(target)
A0 = np.eye(10, 2)
self.a_var = a_var = tf.Variable(A0, name='a_var', dtype=tf.float64)
self._x = x = tf.placeholder(dtype=tf.float64)
self._y = y= tf.placeholder(dtype=tf.float64)
w = tf.tensordot(a_var, x, axes=0)
self.f = tf.reduce_mean((y-w)**2)
def fit(self, x, y):
#self.session.run(
# self._tf.global_variables_initializer())
self._f = self.session.run(self.f, feed_dict={self._x:x, self._y: y, self.a_var:np.eye(10, 2)})
return self
def run_estimator(target):
my_est = MyEstimator(target)
x = np.random.normal(0,1,10)
y = np.random.normal(0,1,10)
my_est.fit(x,y)
return 1
import tensorflow as tf
server = tf.train.Server.create_local_server()
Parallel(n_jobs=16)(delayed(run_estimator)(server.target) for _ in range(16))
我使用 TensorFlow 创建了一个统计估计器。我遵循了 sklearn 的估计器,所以我有一个 class 打包了所有东西 包括导入 Tensorflow 和启动 TF 的会话 (如果我在 class 之外导入 TF 没有任何并行工作完全没有)。
我需要多次 运行 随机数据上的估计器来查看经验分布,所以我使用 joblib 并行化创建数据、创建估计器对象和 运行 的代码s 对数据的估计。
我正在 linux 服务器上工作,它有 64 个内核(和大量内存),我在其中 运行 比这大得多的工作,也使用 joblib。但是,当我尝试 运行ning 基于 TF 的代码时,我只能 运行 8 个进程。如果我尝试使用 9,那么 top
中只会显示 8 个,当这 8 个完成时,joblib 永远不会发送另一个 8 并且根本不会 returns 或者 returns 以下错误消息
"BrokenProcessPool: A process in the executor was terminated abruptly while the future was running or pending."
如果我将进程限制为 8 个,那么一切正常。 我尝试将 joblib 的后端更改为 dask.parallel 并且我有相同的行为。我从后端获得了更多信息,不断有消息说
"distributed.nanny - WARNING - Worker process 7602 was killed by unknown signal"
我希望能够 运行 超过 8 个进程。 问题是:这是硬限制还是我可以通过一些 TF 参数更改它?我能以任何方式解决这个问题吗? 我认为限制与 Tensorflow 相关,因为一旦 8 个进程 运行ning(并且它们需要几个小时)我不能 运行 来自 Tensorflow 的任何其他东西在那台机器上。
感谢您的帮助!!
以下代码重现了错误:
from sklearn.base import TransformerMixin
import numpy as np
from joblib import Parallel, delayed
class MyEstimator(TransformerMixin):
def __init__(self):
import tensorflow as tf
self._tf = tf
self._graph = tf.Graph()
with self._graph.as_default():
self.session = self._tf.Session()
A0 = np.eye(10, 2)
self.a_var = a_var = tf.Variable(A0, name='a_var', dtype=tf.float64)
self._x = x = tf.placeholder(dtype=tf.float64)
self._y = y= tf.placeholder(dtype=tf.float64)
w = tf.tensordot(a_var, x, axes=0)
self.f = tf.reduce_mean((y-w)**2)
def fit(self, x, y):
#self.session.run(
# self._tf.global_variables_initializer())
self._f = self.session.run(self.f, feed_dict={self._x:x, self._y: y, self.a_var:np.eye(10, 2)})
return self
def run_estimator():
my_est = MyEstimator()
x = np.random.normal(0,1,10)
y = np.random.normal(0,1,10)
my_est.fit(x,y)
Parallel(n_jobs=16)(delayed(run_estimator)() for _ in range(16))
我正在研究 Linux、Python 3.6.3、TensorFlow 1.7.0、joblib 0.12。
几个月后,我找到了一个 TensorFlow 服务器的解决方案,https://www.tensorflow.org/deploy/distributed
from sklearn.base import TransformerMixin
import numpy as np
from joblib import Parallel, delayed
class MyEstimator(TransformerMixin):
def __init__(self, target):
import tensorflow as tf
self._tf = tf
self._graph = tf.Graph()
with self._graph.as_default():
config = self._tf.ConfigProto(
intra_op_parallelism_threads=1,
inter_op_parallelism_threads=1,
device_count={"CPU":4},
use_per_session_threads=True)
config.graph_options.optimizer_options.global_jit_level = tf.OptimizerOptions.ON_1
pool = config.session_inter_op_thread_pool.add()
pool.num_threads = 1
self.session = self._tf.Session(target)
A0 = np.eye(10, 2)
self.a_var = a_var = tf.Variable(A0, name='a_var', dtype=tf.float64)
self._x = x = tf.placeholder(dtype=tf.float64)
self._y = y= tf.placeholder(dtype=tf.float64)
w = tf.tensordot(a_var, x, axes=0)
self.f = tf.reduce_mean((y-w)**2)
def fit(self, x, y):
#self.session.run(
# self._tf.global_variables_initializer())
self._f = self.session.run(self.f, feed_dict={self._x:x, self._y: y, self.a_var:np.eye(10, 2)})
return self
def run_estimator(target):
my_est = MyEstimator(target)
x = np.random.normal(0,1,10)
y = np.random.normal(0,1,10)
my_est.fit(x,y)
return 1
import tensorflow as tf
server = tf.train.Server.create_local_server()
Parallel(n_jobs=16)(delayed(run_estimator)(server.target) for _ in range(16))