Python multiprocessing.Process 行为不确定
Python multiprocessing.Process behaves non-deterministically
以下代码显示了一个简单的 multiprocessing.Process 管道,其中包含一个共享的列表字典和一个用于不同消费者的任务队列:
import multiprocessing
class Consumer(multiprocessing.Process):
    """Worker process that drains a task queue and records a result per task.

    For every task taken from ``task_queue`` the value ``3`` is appended to
    the list stored under key ``5`` of ``result_dict``. A ``None`` task is a
    poison pill: the worker acknowledges it and exits its loop.

    The update is a read-modify-write (fetch the list, append, store it
    back), which is not atomic across processes: two consumers can
    interleave and one append can be lost. Pass the same ``lock`` to every
    consumer to serialize the update.

    Args:
        task_queue: Queue providing ``get()`` and ``task_done()``.
        result_dict: Mapping shared between the consumers.
        lock: Optional context-manager lock shared by all consumers. When
            ``None`` (the default) the update is performed unguarded.
    """

    def __init__(self, task_queue, result_dict, lock=None):
        super().__init__()
        self.task_queue = task_queue
        self.result_dict = result_dict
        self.lock = lock

    def run(self):
        """Process tasks until a poison pill (``None``) is received."""
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            try:
                self._append_result(5, 3)
            finally:
                # Always acknowledge the task, even if the update failed, so
                # that a join() on the queue cannot block forever.
                self.task_queue.task_done()

    def _append_result(self, key, value):
        """Append ``value`` to the list under ``key``, holding the lock if set."""
        if self.lock is None:
            self._append_unguarded(key, value)
            return
        with self.lock:
            self._append_unguarded(key, value)

    def _append_unguarded(self, key, value):
        """Fetch the list under ``key``, append ``value`` and store it back."""
        current = self.result_dict[key]
        current.append(value)
        # Re-assign so the mapping sees the change even when the lookup
        # above handed back a copy rather than a live reference.
        self.result_dict[key] = current
def provide_tasks(tasks, num_worker):
    """Fill ``tasks`` with the demo work items followed by shutdown markers.

    Args:
        tasks: Queue-like object exposing ``put()``.
        num_worker: Number of consumers; one ``None`` sentinel is enqueued
            for each of them after the work items.
    """
    batches = (
        ['w1', 'w2'],
        ['w3'],
        ['w4', 'w5'],
    )
    for batch in batches:
        tasks.put(batch)

    # One poison pill per worker so that every consumer gets to shut down.
    for _ in range(num_worker):
        tasks.put(None)
if __name__ == '__main__':
    # Demo driver: start the consumers, feed them tasks from a separate
    # producer process, then print the shared result dictionary.
    num_worker = 3
    tasks = multiprocessing.JoinableQueue()
    manager = multiprocessing.Manager()
    results = manager.dict()
    # Keys 1..10 each map to their own manager list.
    lists = [manager.list() for i in range(1, 11)]
    for i in range(1, 11):
        results[i] = lists[i - 1]
    worker = [Consumer(tasks, results) for i in range(num_worker)]
    for w in worker:
        w.start()
    p = multiprocessing.Process(target=provide_tasks, args=(tasks, num_worker))
    p.start()
    # Joining the producer only guarantees that every task has been queued,
    # not that it has been processed.
    p.join()
    # Wait for the consumers themselves: each one exits after its poison
    # pill, so once they are all joined no update can still be in flight.
    for w in worker:
        w.join()
    print(results)
当您使用 Python 3.x 运行此示例时,会得到不同的结果字典输出。我实际上希望结果字典看起来像这样:
{1: [], 2: [], 3: [], 4: [], 5: [3, 3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}
但在某些运行中,它看起来像这样:
{1: [], 2: [], 3: [], 4: [], 5: [3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}
谁能给我解释一下这种行为?为什么少了一个数字?
根据建议的答案更新解决方法:
if next_task is None:
with lock:
self.result_dict.update(self.local_dict)
[...]
其中 lock 是 manager.Lock(),而 self.local_dict 是 defaultdict(list)。
根据答案下的评论移动了锁的位置。另外还添加了一个即使加锁也无法正常工作的版本。
# Works
with lock:
l = self.result_dict[x]
l.append(3)
self.result_dict[x] = l
self.task_queue.task_done()
# Doesn't work. Even if I move the lock out of the loop.
for x in range(1, 10):
with lock:
l = self.result_dict[x]
l.append(3)
self.result_dict[x] = l
为了使第二个示例正常工作,我们还需要对所有 worker 调用 join()。
获取列表的本地副本、对其进行修改、再将其重新赋值给管理器字典,这一系列操作不是原子操作,因此产生了竞态条件,追加操作可能会"丢失":
l = self.result_dict[5] # <-- race begins
l.append(3)
self.result_dict[5] = l # <-- race ends
以下代码显示了一个简单的 multiprocessing.Process 管道,其中包含一个共享的列表字典和一个用于不同消费者的任务队列:
import multiprocessing
class Consumer(multiprocessing.Process):
    """Worker process that drains a task queue and records a result per task.

    For every task taken from ``task_queue`` the value ``3`` is appended to
    the list stored under key ``5`` of ``result_dict``. A ``None`` task is a
    poison pill: the worker acknowledges it and exits its loop.

    The update is a read-modify-write (fetch the list, append, store it
    back), which is not atomic across processes: two consumers can
    interleave and one append can be lost. Pass the same ``lock`` to every
    consumer to serialize the update.

    Args:
        task_queue: Queue providing ``get()`` and ``task_done()``.
        result_dict: Mapping shared between the consumers.
        lock: Optional context-manager lock shared by all consumers. When
            ``None`` (the default) the update is performed unguarded.
    """

    def __init__(self, task_queue, result_dict, lock=None):
        super().__init__()
        self.task_queue = task_queue
        self.result_dict = result_dict
        self.lock = lock

    def run(self):
        """Process tasks until a poison pill (``None``) is received."""
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            try:
                self._append_result(5, 3)
            finally:
                # Always acknowledge the task, even if the update failed, so
                # that a join() on the queue cannot block forever.
                self.task_queue.task_done()

    def _append_result(self, key, value):
        """Append ``value`` to the list under ``key``, holding the lock if set."""
        if self.lock is None:
            self._append_unguarded(key, value)
            return
        with self.lock:
            self._append_unguarded(key, value)

    def _append_unguarded(self, key, value):
        """Fetch the list under ``key``, append ``value`` and store it back."""
        current = self.result_dict[key]
        current.append(value)
        # Re-assign so the mapping sees the change even when the lookup
        # above handed back a copy rather than a live reference.
        self.result_dict[key] = current
def provide_tasks(tasks, num_worker):
    """Fill ``tasks`` with the demo work items followed by shutdown markers.

    Args:
        tasks: Queue-like object exposing ``put()``.
        num_worker: Number of consumers; one ``None`` sentinel is enqueued
            for each of them after the work items.
    """
    batches = (
        ['w1', 'w2'],
        ['w3'],
        ['w4', 'w5'],
    )
    for batch in batches:
        tasks.put(batch)

    # One poison pill per worker so that every consumer gets to shut down.
    for _ in range(num_worker):
        tasks.put(None)
if __name__ == '__main__':
    # Demo driver: start the consumers, feed them tasks from a separate
    # producer process, then print the shared result dictionary.
    num_worker = 3
    tasks = multiprocessing.JoinableQueue()
    manager = multiprocessing.Manager()
    results = manager.dict()
    # Keys 1..10 each map to their own manager list.
    lists = [manager.list() for i in range(1, 11)]
    for i in range(1, 11):
        results[i] = lists[i - 1]
    worker = [Consumer(tasks, results) for i in range(num_worker)]
    for w in worker:
        w.start()
    p = multiprocessing.Process(target=provide_tasks, args=(tasks, num_worker))
    p.start()
    # Joining the producer only guarantees that every task has been queued,
    # not that it has been processed.
    p.join()
    # Wait for the consumers themselves: each one exits after its poison
    # pill, so once they are all joined no update can still be in flight.
    for w in worker:
        w.join()
    print(results)
当您使用 Python 3.x 运行此示例时,会得到不同的结果字典输出。我实际上希望结果字典看起来像这样:
{1: [], 2: [], 3: [], 4: [], 5: [3, 3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}
但在某些运行中,它看起来像这样:
{1: [], 2: [], 3: [], 4: [], 5: [3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}
谁能给我解释一下这种行为?为什么少了一个数字?
根据建议的答案更新解决方法:
if next_task is None:
with lock:
self.result_dict.update(self.local_dict)
[...]
其中 lock 是 manager.Lock(),而 self.local_dict 是 defaultdict(list)。
根据答案下的评论移动了锁的位置。另外还添加了一个即使加锁也无法正常工作的版本。
# Works
with lock:
l = self.result_dict[x]
l.append(3)
self.result_dict[x] = l
self.task_queue.task_done()
# Doesn't work. Even if I move the lock out of the loop.
for x in range(1, 10):
with lock:
l = self.result_dict[x]
l.append(3)
self.result_dict[x] = l
为了使第二个示例正常工作,我们还需要对所有 worker 调用 join()。
获取列表的本地副本、对其进行修改、再将其重新赋值给管理器字典,这一系列操作不是原子操作,因此产生了竞态条件,追加操作可能会"丢失":
l = self.result_dict[5] # <-- race begins
l.append(3)
self.result_dict[5] = l # <-- race ends