如何有效地使用多处理来加速大量的小任务?

how to efficiently use multiprocessing to speed up huge amount of tiny tasks?

我在 Python multiprocessing.Pool 中遇到了一些麻烦。我有两个 numpy 数组列表 ab,其中

a.shape=(10000,3)b.shape=(1000000000,3)

然后我有一个函数可以像

那样做一些计算
def role(array, point):
    sub = array-point
    return (1/(np.sqrt(np.min(np.sum(sub*sub, axis=-1)))+0.001)**2)

接下来,我需要计算

[role(a, point) for point in b]

为了加快速度,我尝试使用

    cpu_num = 4
    m = multiprocessing.Pool(cpu_num)
    cost_list = m.starmap(role, [(a, point) for point in b])
    m.close

整个过程大约需要70秒,但如果我设置cpu_num = 1,处理时间会减少到60秒...我的笔记本电脑是6核的,供参考。

这里我有两个问题:

  1. 我在 multiprocessing.Pool 上做错了什么吗?为什么我设置 cpu_num = 4 会增加处理时间?
  2. 对于这样的任务(每个for循环都是一个非常小的过程),我应该使用多处理来加速吗?我觉得每次 python 填写 Pool 比处理函数 role...
  3. 花费的时间更长

非常欢迎任何建议。

如果任务太小,那么多处理开销将成为您的瓶颈,您将一事无成。

如果每个任务的数据量您必须传递给工作人员或工作人员必须return那么您也不会赢得很多(甚至一无所获)

如果您有 10,000 个小任务,那么我建议创建一个元任务列表。 每个元任务将包括执行例如 20 个小任务。

meta_tasks = []
for idx in range(0, len(tiny_tasks), 20):
    meta_tasks.append(tiny_tasks[idx:idx+20])

然后将元任务传递给您的工作池。

多处理会带来一些开销(创建新进程),这就是为什么当你有很多小任务时它不是一个很好的选择,在这些任务中创建进程的开销可能超过并行化的好处。

您是否考虑过矢量化您的问题? 特别是,如果你广播变量 b 你会到达那里:

sub = a - b[::,np.newaxis]  # broadcast b
1./(np.sqrt(np.min(np.sum(sub**2, axis=2), axis=-1))+0.001)**2

我相信您仍然可以稍微降低最后一个表达式的复杂性,因为您正在创建平方根的平方,这似乎是多余的(请注意,我假设 0.001 常数值只是存在以避免一些不明智的操作,如除以零)。