在 dask 的单独进程中调用 scheduler.multiprocessing.get
Calling scheduler.multiprocessing.get in a separate process in dask
我正在使用大型文本语料库训练神经网络。每个文本都会生成一个相当大的矩阵,因为我使用的是卷积模型。由于我的数据不会占用我仍然很大的内存,我尝试对其进行流式传输,并使用 keras.models fit_generator.
为了给 keras 提供数据,我有一个由不同预处理步骤组成的管道,我用一个有很多分区的 dask 包安排了它。 dask 包读取磁盘上的文件。
即使是 dask 也没有以一种聪明的方式处理迭代(它只是计算()和迭代结果,在我的情况下会破坏内存),我要使用这样的东西:
def compute_partition_iter(collection, **kwargs):
"""A utility to compute a collection items after items
"""
get = kwargs.pop("get", None) or _globals['get']
if get is None:
get = collection.__dask_scheduler__
postcompute_func, postcompute_args = collection.__dask_postcompute__()
dsk = collection.__dask_graph__()
for key in collection.__dask_keys__():
yield from f([partition], *args)
此计算逐个分区 return 项,在我们跨越分区边界时计算下一个分区。
这种方法有一个问题:只有当我们从分区中命中最后一项时,我们才会引发下一个元素的计算,从而导致延迟到下一个元素。在此延迟内,keras 停滞不前,我们失去了宝贵的时间!
所以我想象 运行在一个单独的过程中将上述 compute_partition_iter
归功于 multiprocessing.Pool
,在 Queue
中提供分区,例如 2 个插槽,以便在发电机,我不会总是准备好一个分区。
但是dask.bag
好像不支持这个。我没有深入研究代码,但似乎使用了一些异步方法,或者我不知道是什么。
这是该问题的可重现代码。
首先是一个有效的代码,使用一个简单的范围。
import multiprocessing
import time
def put_q(n, q):
for i in range(n):
print(i, "<-")
q.put(i)
q.put(None)
q = multiprocessing.Queue(2)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
i = True
while i is not None:
print("zzz")
time.sleep(.5)
i = q.get()
if i is None:
break
print("-> ", i)
这输出
0 <-
1 <-
2 <-
zzz
3 <-
-> 0
zzz
-> 1
zzz
-> 2
zzz
-> 3
zzz
你可以看到,正如预期的那样,元素是在预期中计算的,一切正常。
现在让我们用 dask.bag
:
替换范围
import multiprocessing
import time
import dask.bag
def put_q(n, q):
for i in dask.bag.from_sequence(range(n), npartitions=2):
print(i, "<-")
q.put(i)
q.put(None)
q = multiprocessing.Queue(5)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
i = True
while i is not None:
print("zzz")
time.sleep(.5)
i = q.get()
if i is None:
break
print("-> ", i)
在 jupyter notebook 中,它会无限期地引发:
Process ForkPoolWorker-71:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.5/multiprocessing/pool.py", line 103, in worker
initializer(*initargs)
File "<ipython-input-3-e1e9ef9354a0>", line 8, in put_q
for i in dask.bag.from_sequence(range(n), npartitions=2):
File "/usr/local/lib/python3.5/dist-packages/dask/bag/core.py", line 1190, in __iter__
return iter(self.compute())
File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 154, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 407, in compute
results = get(dsk, keys, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/dask/multiprocessing.py", line 152, in get
initializer=initialize_worker_process)
File "/usr/lib/python3.5/multiprocessing/context.py", line 118, in Pool
context=self.get_context())
File "/usr/lib/python3.5/multiprocessing/pool.py", line 168, in __init__
self._repopulate_pool()
File "/usr/lib/python3.5/multiprocessing/pool.py", line 233, in _repopulate_pool
w.start()
File "/usr/lib/python3.5/multiprocessing/process.py", line 103, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
当主进程停止时,正在等待队列中的元素。
我也尝试过使用 ipyparallel 集群,但在这种情况下,主进程只是停滞了(没有异常痕迹)。
有谁知道这样做的正确方法吗?
有什么方法可以 运行 scheduler.get 与我的主代码并行?
最后我应该仔细看看异常!
Whosebug 给了我解决方案:Python Process Pool non-daemonic?
事实上,由于 bag 调度程序使用 Pool,因此不能在 pool 派生的进程中调用它。我的解决方案是简单地使用线程。 (注意错误及其解决方案取决于您使用的调度程序)。
所以我用 multiprocessing.Pool 代替了 multiprocessing.pool.ThreadPool,它就像一个魅力,无论是在普通笔记本中,还是在 using ipyparallel.
中
所以它是这样的:
import queue
from multiprocessing.pool import ThreadPool
import time
import dask.bag
def put_q(n, q):
b = dask.bag.from_sequence(range(n), npartitions=3)
for i in b:
print(i, "<-")
q.put(i)
q.put(None)
q = queue.Queue(2)
with ThreadPool(1, put_q, (6, q)) as pool:
i = True
while i is not None:
print("zzz")
time.sleep(.5)
i = q.get()
if i is None:
break
print("-> ", i)
输出:
zzz
0 <-
1 <-
2 <-
-> 0
zzz
3 <-
-> 1
zzz
4 <-
-> 5 <-
2
zzz
-> 3
zzz
-> 4
zzz
-> 5
zzz
我正在使用大型文本语料库训练神经网络。每个文本都会生成一个相当大的矩阵,因为我使用的是卷积模型。由于我的数据不会占用我仍然很大的内存,我尝试对其进行流式传输,并使用 keras.models fit_generator.
为了给 keras 提供数据,我有一个由不同预处理步骤组成的管道,我用一个有很多分区的 dask 包安排了它。 dask 包读取磁盘上的文件。
即使是 dask 也没有以一种聪明的方式处理迭代(它只是计算()和迭代结果,在我的情况下会破坏内存),我要使用这样的东西:
def compute_partition_iter(collection, **kwargs):
"""A utility to compute a collection items after items
"""
get = kwargs.pop("get", None) or _globals['get']
if get is None:
get = collection.__dask_scheduler__
postcompute_func, postcompute_args = collection.__dask_postcompute__()
dsk = collection.__dask_graph__()
for key in collection.__dask_keys__():
yield from f([partition], *args)
此计算逐个分区 return 项,在我们跨越分区边界时计算下一个分区。
这种方法有一个问题:只有当我们从分区中命中最后一项时,我们才会引发下一个元素的计算,从而导致延迟到下一个元素。在此延迟内,keras 停滞不前,我们失去了宝贵的时间!
所以我想象 运行在一个单独的过程中将上述 compute_partition_iter
归功于 multiprocessing.Pool
,在 Queue
中提供分区,例如 2 个插槽,以便在发电机,我不会总是准备好一个分区。
但是dask.bag
好像不支持这个。我没有深入研究代码,但似乎使用了一些异步方法,或者我不知道是什么。
这是该问题的可重现代码。
首先是一个有效的代码,使用一个简单的范围。
import multiprocessing
import time
def put_q(n, q):
for i in range(n):
print(i, "<-")
q.put(i)
q.put(None)
q = multiprocessing.Queue(2)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
i = True
while i is not None:
print("zzz")
time.sleep(.5)
i = q.get()
if i is None:
break
print("-> ", i)
这输出
0 <-
1 <-
2 <-
zzz
3 <-
-> 0
zzz
-> 1
zzz
-> 2
zzz
-> 3
zzz
你可以看到,正如预期的那样,元素是在预期中计算的,一切正常。
现在让我们用 dask.bag
:
import multiprocessing
import time
import dask.bag
def put_q(n, q):
for i in dask.bag.from_sequence(range(n), npartitions=2):
print(i, "<-")
q.put(i)
q.put(None)
q = multiprocessing.Queue(5)
with multiprocessing.Pool(1, put_q, (4, q)) as pool:
i = True
while i is not None:
print("zzz")
time.sleep(.5)
i = q.get()
if i is None:
break
print("-> ", i)
在 jupyter notebook 中,它会无限期地引发:
Process ForkPoolWorker-71:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.5/multiprocessing/pool.py", line 103, in worker
initializer(*initargs)
File "<ipython-input-3-e1e9ef9354a0>", line 8, in put_q
for i in dask.bag.from_sequence(range(n), npartitions=2):
File "/usr/local/lib/python3.5/dist-packages/dask/bag/core.py", line 1190, in __iter__
return iter(self.compute())
File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 154, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/dask/base.py", line 407, in compute
results = get(dsk, keys, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/dask/multiprocessing.py", line 152, in get
initializer=initialize_worker_process)
File "/usr/lib/python3.5/multiprocessing/context.py", line 118, in Pool
context=self.get_context())
File "/usr/lib/python3.5/multiprocessing/pool.py", line 168, in __init__
self._repopulate_pool()
File "/usr/lib/python3.5/multiprocessing/pool.py", line 233, in _repopulate_pool
w.start()
File "/usr/lib/python3.5/multiprocessing/process.py", line 103, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
当主进程停止时,正在等待队列中的元素。
我也尝试过使用 ipyparallel 集群,但在这种情况下,主进程只是停滞了(没有异常痕迹)。
有谁知道这样做的正确方法吗?
有什么方法可以 运行 scheduler.get 与我的主代码并行?
最后我应该仔细看看异常!
Whosebug 给了我解决方案:Python Process Pool non-daemonic?
事实上,由于 bag 调度程序使用 Pool,因此不能在 pool 派生的进程中调用它。我的解决方案是简单地使用线程。 (注意错误及其解决方案取决于您使用的调度程序)。
所以我用 multiprocessing.Pool 代替了 multiprocessing.pool.ThreadPool,它就像一个魅力,无论是在普通笔记本中,还是在 using ipyparallel.
中所以它是这样的:
import queue
from multiprocessing.pool import ThreadPool
import time
import dask.bag
def put_q(n, q):
b = dask.bag.from_sequence(range(n), npartitions=3)
for i in b:
print(i, "<-")
q.put(i)
q.put(None)
q = queue.Queue(2)
with ThreadPool(1, put_q, (6, q)) as pool:
i = True
while i is not None:
print("zzz")
time.sleep(.5)
i = q.get()
if i is None:
break
print("-> ", i)
输出:
zzz
0 <-
1 <-
2 <-
-> 0
zzz
3 <-
-> 1
zzz
4 <-
-> 5 <-
2
zzz
-> 3
zzz
-> 4
zzz
-> 5
zzz