Python multiprocessing.Process 行为不确定

Python multiprocessing.Process behaves non deterministic

以下代码显示了一个简单的 multiprocessing.Process 管道,其中包含一个共享的列表字典和一个用于不同消费者的任务队列:

import multiprocessing

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_dict):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_dict = result_dict

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))

            # Do something with the next_task
            l = self.result_dict[5]
            l.append(3)
            self.result_dict[5] = l
            # alternative, but same problem
            #self.result_dict[5] += [3]

            self.task_queue.task_done()
        return

def provide_tasks(tasks, num_worker):
    low = [ 
        ['w1', 'w2'],
        ['w3'],
        ['w4', 'w5']
    ]
    for el in low:
        tasks.put(el)
    # Add a poison pill for each worker
    for i in range(num_worker):
        tasks.put(None)

if __name__ == '__main__':
    num_worker = 3
    tasks = multiprocessing.JoinableQueue()
    manager = multiprocessing.Manager()

    results = manager.dict()
    lists = [manager.list() for i in range(1, 11)]
    for i in range(1, 11):
        results[i] = lists[i - 1]

    worker = [Consumer(tasks, results) for i in range(num_worker)]
    for w in worker:
        w.start()

    p = multiprocessing.Process(target=provide_tasks, args=(tasks,num_worker))
    p.start()
    # Wait for all of the tasks to finish
    p.join()
    print(results)

当您 运行 此示例与 Python3.x 时,您将收到结果字典的不同输出。我实际上希望结果字典看起来像

{1: [], 2: [], 3: [], 4: [], 5: [3, 3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}

但对于某些处决,它看起来像这样:

{1: [], 2: [], 3: [], 4: [], 5: [3, 3], 6: [], 7: [], 8: [], 9: [], 10: []}

谁能给我解释一下这种行为?为什么少了一个数字?

根据建议的答案更新解决方法:

if next_task is None:
    with lock:
        self.result_dict.update(self.local_dict)
        [...]

其中锁是 manager.Lock() 而 self.local_dict 是 defaultdict(list).

根据答案评论移动了锁。还添加了一个不适用于锁的版本。

# Works
with lock:
    l = self.result_dict[x]
    l.append(3)
    self.result_dict[x] = l
self.task_queue.task_done()

# Doesn't work. Even if I move the lock out of the loop. 
for x in range(1, 10):
    with lock:
        l = self.result_dict[x]
        l.append(3)
        self.result_dict[x] = l

为了使第二个示例正常工作,我们也需要对所有 worker 调用 join

获取列表的本地副本,对其进行修改,然后将其重新分配给管理器字典不是原子操作,因此创建了追加操作可以获得 "lost".

描述于this python bug report

l = self.result_dict[5]  # <-- race begins
l.append(3)
self.result_dict[5] = l  # <-- race ends