使用多处理创建一个集合
Create a set with multiprocessing
我有一大堆项目,还有一些辅助数据。对于列表中的每个项目和数据中的元素,我计算一些东西,并将所有东西添加到输出集中(可能有很多重复项)。在代码中:
def process_list(myList, data):
ret = set()
for item in myList:
for foo in data:
thing = compute(item, foo)
ret.add(thing)
return ret
if __name__ == "__main__":
data = create_data()
myList = create_list()
what_I_Want = process_list(myList, data)
因为 myList 很大并且 compute(item, foo) 很昂贵,所以我需要使用 multiprocessing。现在这就是我所拥有的:
from multiprocessing import Pool
initialize_worker(bar):
global data
data = bar
def process_item(item):
ret = set()
for foo in data:
thing = compute(item, foo)
ret.add(thing)
return ret
if __name__ == "__main__":
data = create_data()
myList = create_list()
p = Pool(nb_proc, initializer = initialize_worker, initiargs = (data))
ret = p.map(process_item, myList)
what_I_Want = set().union(*ret)
我不喜欢的是 ret 可能很大。我正在考虑 3 个选项:
1) 将 myList 分成块并将它们传递给工作人员,他们将在每个块上使用 process_list(因此在该步骤将删除一些重复项),然后联合所有获得的集合以删除最后一个副本。
问题:有什么优雅的方法吗?我们可以指定 Pool.map 它应该将块传递给工作人员而不是块中的每个项目吗?我知道我可以自己砍名单,但这太丑了。
2) 在所有进程之间有一个共享集。
问题:为什么multiprocessing.manager没有feature set()? (我知道它有 dict(),但仍然..)如果我使用 manager.dict(),进程和管理器之间的通信不会显着减慢速度吗?
3) 共享 multiprocessing.Queue()。每个工作人员将其计算的东西放入队列中。另一个工作人员进行合并,直到找到一些停止项(我们将其放入 p.map 之后的队列)
问题:这是一个愚蠢的想法吗?进程与 multiprocessing.Queue 之间的通信是否比那些与 manager.dict() 之间的通信更快?另外,我怎样才能取回执行联合的工人计算的集合?
一件小事:initiargs
需要一个元组。
如果您想避免在将结果缩减为 set
之前创建所有结果,您可以使用具有一定块大小的 Pool.imap_unordered()
。这将在每个工作人员可用时生成块大小结果。
如果你想改变process_item
直接接受块,你必须手动完成。 toolz.partition_all
可用于对初始数据集进行分区。
最后,托管数据结构必然具有更高的同步开销。我会尽量避开它们。
选择 imap_unordered
看看是否足够好;如果不是,则分区;如果你忍不住有超过几个重复的总数,请使用托管字典。
我有一大堆项目,还有一些辅助数据。对于列表中的每个项目和数据中的元素,我计算一些东西,并将所有东西添加到输出集中(可能有很多重复项)。在代码中:
def process_list(myList, data):
ret = set()
for item in myList:
for foo in data:
thing = compute(item, foo)
ret.add(thing)
return ret
if __name__ == "__main__":
data = create_data()
myList = create_list()
what_I_Want = process_list(myList, data)
因为 myList 很大并且 compute(item, foo) 很昂贵,所以我需要使用 multiprocessing。现在这就是我所拥有的:
from multiprocessing import Pool
initialize_worker(bar):
global data
data = bar
def process_item(item):
ret = set()
for foo in data:
thing = compute(item, foo)
ret.add(thing)
return ret
if __name__ == "__main__":
data = create_data()
myList = create_list()
p = Pool(nb_proc, initializer = initialize_worker, initiargs = (data))
ret = p.map(process_item, myList)
what_I_Want = set().union(*ret)
我不喜欢的是 ret 可能很大。我正在考虑 3 个选项:
1) 将 myList 分成块并将它们传递给工作人员,他们将在每个块上使用 process_list(因此在该步骤将删除一些重复项),然后联合所有获得的集合以删除最后一个副本。
问题:有什么优雅的方法吗?我们可以指定 Pool.map 它应该将块传递给工作人员而不是块中的每个项目吗?我知道我可以自己砍名单,但这太丑了。
2) 在所有进程之间有一个共享集。
问题:为什么multiprocessing.manager没有feature set()? (我知道它有 dict(),但仍然..)如果我使用 manager.dict(),进程和管理器之间的通信不会显着减慢速度吗?
3) 共享 multiprocessing.Queue()。每个工作人员将其计算的东西放入队列中。另一个工作人员进行合并,直到找到一些停止项(我们将其放入 p.map 之后的队列)
问题:这是一个愚蠢的想法吗?进程与 multiprocessing.Queue 之间的通信是否比那些与 manager.dict() 之间的通信更快?另外,我怎样才能取回执行联合的工人计算的集合?
一件小事:initiargs
需要一个元组。
如果您想避免在将结果缩减为 set
之前创建所有结果,您可以使用具有一定块大小的 Pool.imap_unordered()
。这将在每个工作人员可用时生成块大小结果。
如果你想改变process_item
直接接受块,你必须手动完成。 toolz.partition_all
可用于对初始数据集进行分区。
最后,托管数据结构必然具有更高的同步开销。我会尽量避开它们。
选择 imap_unordered
看看是否足够好;如果不是,则分区;如果你忍不住有超过几个重复的总数,请使用托管字典。