Why are workers creating copies of the objects from the main thread?
I'm trying a simple worker pool in Python, where the objective is to get values from an iterator that lives on the main thread, updating that iterator in the process. (The purpose is to parallelize the iterator while consuming its results on the main thread.)
import multiprocessing as mp
pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))
#details are given below
But for some reason, Pool seems to be creating a copy of the iterator for each thread, instead of simply updating it on the main thread. (The questions are at the end of the post.)
The iterator
So, this is the iterator I want to parallelize. I made sure it's safe to get items from it in parallel, and that the updated value is not used when getting items:
import time

class TrySeq(object):
    def __init__(self):
        print('created iterator')
        self.gotten = 0 #a simple counter informing how many items were gotten

    def __len__(self):
        return 10

    def __getitem__(self, i):
        time.sleep(3) #simulate a heavy operation
        #must update the gotten count, but this value won't affect the values of the items
        self.gotten += 1
        print('Iterator: got item', i, ' - gotten total: ', self.gotten)
        return (i, i)
The generator to be parallelized
Now, this is a generator that wraps that iterator to parallelize it "invisibly". It works well and does exactly what I expect, except for updating the gotten value. (I know it waits for synchronization at each epoch; that's not what this question is about.)
import multiprocessing as mp
import numpy as np

#A generator that wraps an iterator and loads items asynchronously
def ParallelIterator(iterator, epochs, shuffle, workers = 4, queue_size = 10):
    sourceQueue = mp.Queue()                    #queue for getting batch indices
    batchQueue = mp.Queue(maxsize = queue_size) #queue for getting actual batches
    indices = np.arange(len(iterator))          #array of indices to be shuffled

    #fills the batch indices queue (called when sourceQueue is empty -> a few batches before an epoch ends)
    def fillSource():
        #print("Iterator: fill source - source qsize = ", sourceQueue.qsize())
        if shuffle == True:
            np.random.shuffle(indices)
        #puts the indices in the indices queue
        for i in indices:
            sourceQueue.put(i)

    #function that will load batches from the iterator
    def worker(indicesQueue, destinationQueue, itera):
        while True:
            index = indicesQueue.get(block = True) #get an index from the queue
            item = itera[index] #get the item from the iterator
            destinationQueue.put((index, item), block = True) #put the item in the batch queue

    #creates the thread pool that will work automatically as we get from the batch queue
    pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))

    #generation loop
    for epoch in range(epochs):
        fillSource()
        for batch in range(len(iterator)):
            #yields batches for the outside loop that is using this generator
            originalIndex, batchItems = batchQueue.get(block = True)
            yield epoch, batch, originalIndex, batchItems

    pool.close()
    sourceQueue.close()
    batchQueue.close()
    del pool
    del sourceQueue
    del batchQueue
It seems that Pool is simply copying the iterator for each thread, but I want all the threads to be updating the same iterator on the main thread.
Using the generator:
The idea is to use it very simply, like this:
#outside loop:
for e, b, oB, xAndY in ParallelIterator(TrySeq(), 3, True, workers = 3):
    time.sleep(1)
    #print('yield e:', e, " - b:", b, " origB: ", oB, "- data:", xAndY)
Current output:
Now, when I run this, I see one gotten value per worker, instead of the single main gotten value I expected:
created iterator
Iterator: got item 8 - gotten total: 1
Iterator: got item 2 - gotten total: 1
Iterator: got item 0 - gotten total: 1
Iterator: got item 1 - gotten total: 2
Iterator: got item 7 - gotten total: 2
Iterator: got item 6 - gotten total: 2
Iterator: got item 9 - gotten total: 3
Iterator: got item 5 - gotten total: 3
Iterator: got item 3 - gotten total: 3
Iterator: got item 4 - gotten total: 4
Iterator: got item 4 - gotten total: 4
Iterator: got item 2 - gotten total: 5
Iterator: got item 3 - gotten total: 4
Iterator: got item 6 - gotten total: 5
Iterator: got item 7 - gotten total: 5
Iterator: got item 5 - gotten total: 6
Iterator: got item 1 - gotten total: 6
Iterator: got item 9 - gotten total: 7
Iterator: got item 0 - gotten total: 6
Iterator: got item 8 - gotten total: 7
Iterator: got item 7 - gotten total: 8
Iterator: got item 8 - gotten total: 7
Iterator: got item 2 - gotten total: 8
Iterator: got item 3 - gotten total: 8
Iterator: got item 9 - gotten total: 9
Iterator: got item 1 - gotten total: 9
Iterator: got item 6 - gotten total: 9
Iterator: got item 4 - gotten total: 10
Iterator: got item 0 - gotten total: 10
Iterator: got item 5 - gotten total: 10
finished
Questions

- Why is this happening?
- How can I update ParallelIterator so that it acts on the main iterator, instead of creating a copy of it for each thread?
You didn't show your imports, but I'm guessing you have:

import multiprocessing as mp

at the top of your file. multiprocessing is not backed by threads; it's backed by forked or spawned processes, each of which has independent memory and independent variables. Your initialization pickles the values (importantly, iterator), then unpickles a new copy of each of them, used separately in each worker process (note: with forked rather than spawned workers, pickling may not be involved, but the effect is the same; the original data is "snapshotted" at the moment of the fork, and each worker process inherits its own independent snapshot of the data, with no remaining connection to the data in any other process).
If you intend to use threads, change the import to:

import multiprocessing.dummy as mp

which changes the backing implementation to a thread-based Pool rather than a process-based one. Thread-based pools live in a single, shared memory space; no pickling/unpickling or inter-process communication of any kind is involved. The downside is that parallelism on the CPython reference interpreter will be limited by the GIL, and the greater degree of sharing means more synchronization is needed to guard against race conditions.
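Re-running the same invented sketch with only the import changed shows the difference; again this is an illustration rather than verified output, and as noted above an unsynchronized += across threads would want a lock in real code:

import multiprocessing.dummy as mp  #thread-backed Pool, same API as multiprocessing

class Counter:
    def __init__(self):
        self.n = 0

def bump(c):
    c.n += 1   #threads share one address space: this hits the one real object
    return c.n

if __name__ == '__main__':
    c = Counter()
    with mp.Pool(4) as pool:
        pool.map(bump, [c] * 4)
    print(c.n)  #4 - all four increments landed on the same Counter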
If you want processes, it's going to be much more painful: you'd essentially have to implement a proxy wrapper for your iterator type to make it multiprocessing.Manager compliant.
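For completeness, here is a rough sketch of what that Manager route might look like; SeqManager is a name invented for this sketch, TrySeq is trimmed from the question, and the key detail is exposing the dunder methods the workers actually call:

from multiprocessing.managers import BaseManager

class TrySeq(object):          #the question's iterator (prints/sleep trimmed)
    def __init__(self):
        self.gotten = 0
    def __len__(self):
        return 10
    def __getitem__(self, i):
        self.gotten += 1
        return (i, i)

class SeqManager(BaseManager): #hypothetical manager subclass for this sketch
    pass

#expose the dunder methods the workers need; the generated proxy forwards
#each call to the single real TrySeq living in the manager's server process
SeqManager.register('TrySeq', TrySeq, exposed=['__len__', '__getitem__'])

if __name__ == '__main__':
    manager = SeqManager()
    manager.start()
    shared_seq = manager.TrySeq()          #a picklable proxy to the one real object
    print(len(shared_seq), shared_seq[3])  #both calls round-trip to the server
    manager.shutdown()

Passing shared_seq (instead of a plain TrySeq) to the Pool initializer would make every itera[index] in a worker hit the same object, so gotten would be updated in one place. Note that the proxy does not expose plain attribute reads like .gotten; you'd also need an exposed accessor method to read the count back.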