Why are workers creating copies of the objects from the main thread?

I am experimenting with a simple worker pool in Python whose job is to fetch values from an iterator living on the main thread, updating that iterator as it goes. (The goal is to parallelize the iterator while consuming its results on the main thread.)

 import multiprocessing as mp

 pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))
 #details are given below

But for some reason, Pool seems to be creating a copy of the iterator for each worker instead of simply updating the single one on the main thread. (The actual question is at the end of the post.)

The iterator

So, this is the iterator I want to parallelize. I made sure it is safe to fetch items from it in parallel, and that the updated counter never affects the values of the items:

import time

class TrySeq(object):
    def __init__(self):
        print('created iterator')
        self.gotten = 0 #a simple counter informing how many items were gotten
    def __len__(self):
        return 10
    def __getitem__(self, i):
        time.sleep(3) #simulate a heavy operation

        #must update the gotten count but this value won't affect the values of the items
        self.gotten += 1 
        print('Iterator: got item', i, ' - gotten total: ', self.gotten)
        return (i, i)

The generator to parallelize

Now, here is the generator that wraps that iterator to parallelize it "invisibly".

It works well and does exactly what I expect, except for updating the gotten value. (I know it waits for a synchronization point between epochs; that is not what this question is about.)

import multiprocessing as mp
import numpy as np

#A generator that wraps an iterator and loads items asynchronously
def ParallelIterator(iterator, epochs, shuffle, workers = 4, queue_size = 10):

    sourceQueue = mp.Queue()                     #queue for getting batch indices
    batchQueue = mp.Queue(maxsize = queue_size)  #queue for getting actual batches 
    indices = np.arange(len(iterator))     #array of indices to be shuffled

    #fills the batch indices queue (called when sourceQueue is empty -> a few batches before an epoch ends)
    def fillSource():
        #print("Iterator: fill source - source qsize = ", sourceQueue.qsize() )
        if shuffle == True:
            np.random.shuffle(indices)

        #puts the indices in the indices queue
        for i in indices:
            sourceQueue.put(i)

    #function that will load batches from the iterator
    def worker(indicesQueue, destinationQueue, itera):
        while True:
            index = indicesQueue.get(block = True) #get index from the queue
            item = itera[index] #get batch from the iterator
            destinationQueue.put((index,item), block=True) #puts batch in the batch queue


    #creates the thread pool that will work automatically as we get from the batch queue
    pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))

    #generation loop
    for epoch in range(epochs):
        fillSource()
        for batch in range(len(iterator)):

            #yields batches for the outside loop that is using this generator
            originalIndex, batchItems = batchQueue.get(block = True)
            yield epoch, batch, originalIndex, batchItems

    pool.close()
    sourceQueue.close()
    batchQueue.close()
    del pool
    del sourceQueue
    del batchQueue

It seems that Pool is simply copying the iterator for each worker, while I expected all workers to update the very same iterator on the main thread.

Using the generator:

The idea is to use it very simply, like this:

#outside loop: 
for e, b, oB, xAndY in ParallelIterator(TrySeq(), 3, True, workers = 3):
    time.sleep(1)
    #print('yield e:', e, " - b:", b, " origB: ", oB, "- data:", xAndY)

Current output:

Now, when I run this, I see a separate gotten value per worker, instead of the single main gotten count I expected:

created iterator
Iterator: got item 8  - gotten total:  1
Iterator: got item 2  - gotten total:  1
Iterator: got item 0  - gotten total:  1
Iterator: got item 1  - gotten total:  2
Iterator: got item 7  - gotten total:  2
Iterator: got item 6  - gotten total:  2
Iterator: got item 9  - gotten total:  3
Iterator: got item 5  - gotten total:  3
Iterator: got item 3  - gotten total:  3
Iterator: got item 4  - gotten total:  4
Iterator: got item 4  - gotten total:  4
Iterator: got item 2  - gotten total:  5
Iterator: got item 3  - gotten total:  4
Iterator: got item 6  - gotten total:  5
Iterator: got item 7  - gotten total:  5
Iterator: got item 5  - gotten total:  6
Iterator: got item 1  - gotten total:  6
Iterator: got item 9  - gotten total:  7
Iterator: got item 0  - gotten total:  6
Iterator: got item 8  - gotten total:  7
Iterator: got item 7  - gotten total:  8
Iterator: got item 8  - gotten total:  7
Iterator: got item 2  - gotten total:  8
Iterator: got item 3  - gotten total:  8
Iterator: got item 9  - gotten total:  9
Iterator: got item 1  - gotten total:  9
Iterator: got item 6  - gotten total:  9
Iterator: got item 4  - gotten total:  10
Iterator: got item 0  - gotten total:  10
Iterator: got item 5  - gotten total:  10
finished

Answer

You don't show your imports, but I'm guessing you have:

import multiprocessing as mp

at the top of the file. multiprocessing is not backed by threads; it is backed by forked or spawned processes, each of which has independent memory and independent variables. Your initializer arguments are pickled (importantly, including iterator), and a fresh copy of each is unpickled in every worker process, which then uses its copy separately from all the others. (Note: with fork rather than spawn workers, no pickling may be involved, but the effect is the same: the original data is "snapshotted" at fork time, and each worker process inherits its own independent snapshot of the data, with no remaining connection to the data in any other process.)
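As a minimal sketch of what that means (my own illustration, not code from the post), here a mutable object is handed to a process pool's initializer the same way the post does it; each worker process increments its own private copy, and the object in the main process is never touched:

import multiprocessing as mp

counter = {'gotten': 0}  #stand-in for the iterator's mutable state

def worker_loop(queue, state):
    #'state' is this process's private copy of 'counter', not the main-process dict
    while True:
        i = queue.get()
        if i is None:
            break
        state['gotten'] += 1
        print('worker process sees gotten =', state['gotten'])

if __name__ == '__main__':
    q = mp.Queue()
    pool = mp.Pool(3, worker_loop, (q, counter))  #same pattern as in the post
    for i in range(9):
        q.put(i)
    for _ in range(3):
        q.put(None)  #one stop sentinel per worker
    pool.close()
    pool.join()
    print('main process sees gotten =', counter['gotten'])  #still 0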

If you intend to use threads, change the import to:

import multiprocessing.dummy as mp

which changes the backing implementation to a thread-based pool instead of a process-based one. Thread-based pools run in a single shared memory space; no pickling/unpickling or inter-process communication of any kind is involved. The downside is that parallelism on the CPython reference interpreter is limited by the GIL, and the greater degree of sharing means more synchronization is needed to prevent race conditions.
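To illustrate the difference (again my own sketch, not from the answer), the thread-backed pool mutates the one shared object, so the main thread sees the updates; the explicit lock is there because += on shared state is a read-modify-write:

import multiprocessing.dummy as mp  #same Pool API, backed by threads
import threading

counter = {'gotten': 0}
lock = threading.Lock()  #shared state now needs explicit synchronization

def bump(i):
    with lock:  #without this, '+=' on shared state is a race
        counter['gotten'] += 1

pool = mp.Pool(3)
pool.map(bump, range(9))
pool.close()
pool.join()
print('main thread sees gotten =', counter['gotten'])  #9: one shared object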

If you want processes, it is going to be a world of pain, because you would essentially have to implement proxy wrappers for your iterator type to make it multiprocessing.Manager compliant.
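For completeness, here is a rough sketch of the Manager route (my own, with hypothetical names): the object lives in a single manager process, and every worker talks to it through a proxy, so there is only one gotten counter. Part of the pain alluded to above is that dunder methods such as __getitem__ are not exposed through the default auto-generated proxies, so keeping the iterator protocol working would need custom proxy types; this sketch sidesteps that with a plain method:

from multiprocessing.managers import BaseManager

class SharedSeq(object):  #hypothetical stand-in for TrySeq
    def __init__(self):
        self.gotten = 0
    def get_item(self, i):  #plain method; proxying __getitem__ needs a custom proxy type
        self.gotten += 1  #updates the single managed instance
        return (i, i), self.gotten

class SeqManager(BaseManager):
    pass

SeqManager.register('SharedSeq', SharedSeq)

if __name__ == '__main__':
    with SeqManager() as manager:
        shared = manager.SharedSeq()  #proxy; the real object lives in the manager process
        print(shared.get_item(3))  #((3, 3), 1)
        print(shared.get_item(7))  #((7, 7), 2) - every proxy holder shares one counter

Every call on such a proxy is a round trip to the manager process, so this also adds latency on top of the wrapping work.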