Assertion Error when using multiprocessing in Python 3.4
I'm fairly new to Python and completely new to parallel processing.
I've been writing code to analyze punctate image data (think PALM lite) and I'm trying to speed up my analysis code with the multiprocessing module.
For small data sets I see a pretty decent speed-up with up to four cores. For large data sets I start getting an AssertionError. I tried to make a boiled-down example that produces the same error, see below:
import numpy as np
import multiprocessing as mp
import os

class TestClass(object):
    def __init__(self, data):
        super().__init__()
        self.data = data

    def top_level_function(self, nproc = 1):
        if nproc > os.cpu_count():
            nproc = os.cpu_count()

        if nproc == 1:
            sums = [self._sub_function() for i in range(10)]
        elif 1 < nproc:
            print('multiprocessing engaged with {} cores'.format(nproc))
            with mp.Pool(nproc) as p:
                sums = [p.apply_async(self._sub_function) for i in range(10)]
                sums = [pp.get() for pp in sums]

        self.sums = sums
        return sums

    def _sub_function(self):
        return self.data.sum(0)

if __name__ == "__main__":
    t = TestClass(np.zeros((126,512,512)))
    ans = t.top_level_function()
    print(len(ans))
    ans = t.top_level_function(4)
    print(len(ans))

    t = TestClass(np.zeros((126,2048,2048)))
    ans = t.top_level_function()
    print(len(ans))
    ans = t.top_level_function(4)
    print(len(ans))
Output:
10
multiprocessing engaged with 4 cores
10
10
multiprocessing engaged with 4 cores
Process SpawnPoolWorker-6:
Traceback (most recent call last):
File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstrap
self.run()
File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Anaconda3\lib\multiprocessing\queues.py", line 355, in get
res = self._reader.recv_bytes()
File "C:\Anaconda3\lib\multiprocessing\connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "C:\Anaconda3\lib\multiprocessing\connection.py", line 318, in _recv_bytes
return self._get_more_data(ov, maxsize)
File "C:\Anaconda3\lib\multiprocessing\connection.py", line 337, in _get_more_data
assert left > 0
AssertionError
Process SpawnPoolWorker-8:
Traceback (most recent call last):
File "C:\Anaconda3\lib\multiprocessing\process.py", line 254, in _bootstrap
self.run()
File "C:\Anaconda3\lib\multiprocessing\process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "C:\Anaconda3\lib\multiprocessing\pool.py", line 108, in worker
task = get()
File "C:\Anaconda3\lib\multiprocessing\queues.py", line 355, in get
res = self._reader.recv_bytes()
File "C:\Anaconda3\lib\multiprocessing\connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "C:\Anaconda3\lib\multiprocessing\connection.py", line 318, in _recv_bytes
return self._get_more_data(ov, maxsize)
File "C:\Anaconda3\lib\multiprocessing\connection.py", line 337, in _get_more_data
assert left > 0
AssertionError
Traceback (most recent call last):
File "test.py", line 41, in <module>
ans = t.top_level_function(4)
File "test.py", line 21, in top_level_function
sums = [pp.get() for pp in sums]
File "test.py", line 21, in <listcomp>
sums = [pp.get() for pp in sums]
File "C:\Anaconda3\lib\multiprocessing\pool.py", line 599, in get
raise self._value
File "C:\Anaconda3\lib\multiprocessing\pool.py", line 383, in _handle_tasks
put(task)
File "C:\Anaconda3\lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "C:\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
OSError: [WinError 87] The parameter is incorrect
So the first example runs fine, but the later example (the larger data set) crashes.
I have no idea where this error is coming from or how to fix it. Any help would be greatly appreciated.
When you do

sums = [p.apply_async(self._sub_function) for i in range(10)]

what happens is that self._sub_function is pickled 10 times and sent to the worker processes. To pickle an instance method, the whole instance (including the data attribute) has to be pickled along with it. A quick check shows that np.zeros((126,2048,2048)) takes 4227858596 bytes when pickled, and you are sending that 10 times to 10 different processes.
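A quick sketch of that size check might look like the following (the pickling line itself needs several gigabytes of free memory, so treat it as illustrative):

import pickle
import numpy as np

data = np.zeros((126, 2048, 2048))
print(data.nbytes)  # 4227858432 bytes of raw float64 data
print(len(pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)))  # slightly larger once pickled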
You are getting the error during _send_bytes, which means the transfer to the worker process is being interrupted, my guess is because you are hitting a memory limit.
You should probably rethink your design; multiprocessing usually works best when each worker can work on a part of the problem without needing access to the whole data.
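As a sketch of that idea (illustrative only; sum_chunk and parallel_sum are made-up names, and this still ships roughly one copy of the data across the pipes in total rather than ten):

import numpy as np
import multiprocessing as mp

def sum_chunk(chunk):
    # Module-level function: only this chunk is pickled and sent to the worker,
    # not an instance holding the full data set.
    return chunk.sum(0)

def parallel_sum(data, nproc=4):
    # Split along axis 0 so each task carries roughly 1/nproc of the data.
    chunks = np.array_split(data, nproc, axis=0)
    with mp.Pool(nproc) as pool:
        partial_sums = pool.map(sum_chunk, chunks)
    # Summing the per-chunk results reproduces data.sum(0).
    return np.sum(partial_sums, axis=0)

if __name__ == "__main__":
    data = np.zeros((126, 512, 512))
    print(parallel_sum(data).shape)  # (512, 512)

For data too large even for that, one option is to pass each worker only a file name plus an index range and let the worker load its own slice (for example via np.memmap), so nothing large has to cross the pipe at all.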