Getting erratic runtime exceptions trying to access persistent data in multiprocessing.Pool worker processes
Inspired by this solution, I am trying to set up a multiprocessing pool of worker processes in Python. The idea is to pass some data to the worker processes before they actually start their work and to reuse it eventually. It is intended to minimize the amount of data which needs to be packed/unpacked for every call into a worker process (i.e. reducing inter-process communication overhead). My MCVE looks like this:
import multiprocessing as mp
import numpy as np

def create_worker_context():
    global context # create "global" context in worker process
    context = {}

def init_worker_context(worker_id, some_const_array, DIMS, DTYPE):
    context.update({
        'worker_id': worker_id,
        'some_const_array': some_const_array,
        'tmp': np.zeros((DIMS, DIMS), dtype = DTYPE),
        }) # store context information in global namespace of worker
    return True # return True, verifying that the worker process received its data

class data_analysis:
    def __init__(self):
        self.DTYPE = 'float32'
        self.CPU_LEN = mp.cpu_count()
        self.DIMS = 100
        self.some_const_array = np.zeros((self.DIMS, self.DIMS), dtype = self.DTYPE)
        # Init multiprocessing pool
        self.cpu_pool = mp.Pool(processes = self.CPU_LEN, initializer = create_worker_context) # create pool and context in workers
        pool_results = [
            self.cpu_pool.apply_async(
                init_worker_context,
                args = (core_id, self.some_const_array, self.DIMS, self.DTYPE)
                ) for core_id in range(self.CPU_LEN)
            ] # pass information to workers' context
        result_batches = [result.get() for result in pool_results] # check if they got the information
        if not all(result_batches): # raise an error if things did not work
            raise SyntaxError('Workers could not be initialized ...')

    @staticmethod
    def process_batch(batch_data):
        context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
        return context['tmp'] # return result

    def process_all(self):
        input_data = np.arange(0, self.DIMS ** 2, dtype = self.DTYPE).reshape(self.DIMS, self.DIMS)
        pool_results = [
            self.cpu_pool.apply_async(
                data_analysis.process_batch,
                args = (input_data,)
                ) for _ in range(self.CPU_LEN)
            ] # let workers actually work
        result_batches = [result.get() for result in pool_results]
        for batch in result_batches[1:]:
            np.add(result_batches[0], batch, out = result_batches[0]) # reduce batches
        print(result_batches[0]) # show result

if __name__ == '__main__':
    data_analysis().process_all()
I am running the above with CPython 3.6.6.
The strange thing is ... sometimes it works, sometimes it does not. If it does not work, the process_batch method throws an exception because it cannot find some_const_array as a key in context. The full traceback looks like this:
(env) me@box:/path> python so.py
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/path/so.py", line 37, in process_batch
context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
KeyError: 'some_const_array'
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/path/so.py", line 54, in <module>
data_analysis().process_all()
File "/path/so.py", line 48, in process_all
result_batches = [result.get() for result in pool_results]
File "/path/so.py", line 48, in <listcomp>
result_batches = [result.get() for result in pool_results]
File "/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
KeyError: 'some_const_array'
I am puzzled. What is going on here?
If my context dictionary contains an object of "higher type", e.g. a database driver or similar, I do not run into this kind of trouble. I can only reproduce it if my context dictionary contains basic Python data types, collections or numpy arrays.
(Is there a better way of achieving the same thing in a more reliable manner? I know my approach is considered a hack ...)
You need to relocate the content of init_worker_context into your initializer function create_worker_context.
Your assumption that every single worker process will run init_worker_context is what causes your confusion.
The tasks you submit to the pool get fed into one internal task queue, which all worker processes read from. What happens in your case is that some worker processes complete their task and compete again for new tasks. So it can happen that one worker process executes multiple tasks while another one does not get a single one. Keep in mind that the OS schedules runtime for the threads (of the worker processes).
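A minimal sketch of that relocation, under some assumptions: the constant data is shipped to every worker exactly once via the pool's initargs, and the initializer itself builds the per-worker context before any task is handled. The worker_id key from the original context is dropped here, because initargs are identical for every worker and give no per-worker handle; the names used below otherwise follow the MCVE.

    import multiprocessing as mp
    import numpy as np

    def create_worker_context(some_const_array, DIMS, DTYPE):
        # Runs exactly once in every worker process, before it picks up any task.
        global context
        context = {
            'some_const_array': some_const_array,
            'tmp': np.zeros((DIMS, DIMS), dtype = DTYPE), # per-worker scratch buffer
        }

    def process_batch(batch_data):
        context['tmp'][:, :] = context['some_const_array'] + batch_data
        return context['tmp'] # pickled on return, so callers get independent copies

    if __name__ == '__main__':
        DIMS, DTYPE = 100, 'float32'
        some_const_array = np.zeros((DIMS, DIMS), dtype = DTYPE)
        cpu_pool = mp.Pool(
            processes = mp.cpu_count(),
            initializer = create_worker_context,
            initargs = (some_const_array, DIMS, DTYPE), # passed to every worker once
        )
        input_data = np.arange(0, DIMS ** 2, dtype = DTYPE).reshape(DIMS, DIMS)
        pool_results = [
            cpu_pool.apply_async(process_batch, args = (input_data,))
            for _ in range(mp.cpu_count())
        ]
        result_batches = [result.get() for result in pool_results]
        for batch in result_batches[1:]:
            np.add(result_batches[0], batch, out = result_batches[0]) # reduce batches
        print(result_batches[0])

This way there is no race between initialization and the first real task: the initializer is guaranteed to run in each worker before that worker serves anything from the task queue, regardless of how the OS schedules the workers.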