使用多处理创建一个集合

Create a set with multiprocessing

我有一大堆项目,还有一些辅助数据。对于列表中的每个项目和数据中的元素,我计算一些东西,并将所有东西添加到输出集中(可能有很多重复项)。在代码中:

def process_list(myList, data):
    ret = set()
    for item in myList:
        for foo in data:
            thing = compute(item, foo)
            ret.add(thing)
    return ret

if __name__ == "__main__":
    data = create_data()
    myList = create_list()
    what_I_Want = process_list(myList, data)

因为 myList 很大并且 compute(item, foo) 很昂贵,所以我需要使用 multiprocessing。现在这就是我所拥有的:

from multiprocessing import Pool

initialize_worker(bar):
    global data
    data = bar

def process_item(item):
    ret = set()
    for foo in data:
        thing = compute(item, foo)
        ret.add(thing)
    return ret

if __name__ == "__main__":
    data = create_data()
    myList = create_list()
    p = Pool(nb_proc, initializer = initialize_worker, initiargs = (data))
    ret = p.map(process_item, myList)
    what_I_Want = set().union(*ret)

我不喜欢的是 ret 可能很大。我正在考虑 3 个选项:

1) 将 myList 分成块并将它们传递给工作人员,他们将在每个块上使用 process_list(因此在该步骤将删除一些重复项),然后联合所有获得的集合以删除最后一个副本。

问题:有什么优雅的方法吗?我们可以指定 Pool.map 它应该将块传递给工作人员而不是块中的每个项目吗?我知道我可以自己砍名单,但这太丑了。

2) 在所有进程之间有一个共享集。

问题:为什么multiprocessing.manager没有feature set()? (我知道它有 dict(),但仍然..)如果我使用 manager.dict(),进程和管理器之间的通信不会显着减慢速度吗?

3) 共享 multiprocessing.Queue()。每个工作人员将其计算的东西放入队列中。另一个工作人员进行合并,直到找到一些停止项(我们将其放入 p.map 之后的队列)

问题:这是一个愚蠢的想法吗?进程与 multiprocessing.Queue 之间的通信是否比那些与 manager.dict() 之间的通信更快?另外,我怎样才能取回执行联合的工人计算的集合?

一件小事:initiargs需要一个元组。

如果您想避免在将结果缩减为 set 之前创建所有结果,您可以使用具有一定块大小的 Pool.imap_unordered()。这将在每个工作人员可用时生成块大小结果。

如果你想改变process_item直接接受块,你必须手动完成。 toolz.partition_all 可用于对初始数据集进行分区。

最后,托管数据结构必然具有更高的同步开销。我会尽量避开它们。

选择 imap_unordered 看看是否足够好;如果不是,则分区;如果你忍不住有超过几个重复的总数,请使用托管字典。