如何有效地使用多处理来加速大量的小任务?
how to efficiently use multiprocessing to speed up huge amount of tiny tasks?
我在 Python multiprocessing.Pool
中遇到了一些麻烦。我有两个 numpy
数组列表 a
和 b
,其中
a.shape=(10000,3)
和
b.shape=(1000000000,3)
然后我有一个函数可以像
那样做一些计算
def role(array, point):
sub = array-point
return (1/(np.sqrt(np.min(np.sum(sub*sub, axis=-1)))+0.001)**2)
接下来,我需要计算
[role(a, point) for point in b]
为了加快速度,我尝试使用
cpu_num = 4
m = multiprocessing.Pool(cpu_num)
cost_list = m.starmap(role, [(a, point) for point in b])
m.close
整个过程大约需要70秒,但如果我设置cpu_num = 1
,处理时间会减少到60秒...我的笔记本电脑是6核的,供参考。
这里我有两个问题:
- 我在
multiprocessing.Pool
上做错了什么吗?为什么我设置 cpu_num = 4
会增加处理时间?
- 对于这样的任务(每个for循环都是一个非常小的过程),我应该使用多处理来加速吗?我觉得每次 python 填写
Pool
比处理函数 role
... 花费的时间更长
非常欢迎任何建议。
如果任务太小,那么多处理开销将成为您的瓶颈,您将一事无成。
如果每个任务的数据量您必须传递给工作人员或工作人员必须return那么您也不会赢得很多(甚至一无所获)
如果您有 10,000 个小任务,那么我建议创建一个元任务列表。
每个元任务将包括执行例如 20 个小任务。
meta_tasks = []
for idx in range(0, len(tiny_tasks), 20):
meta_tasks.append(tiny_tasks[idx:idx+20])
然后将元任务传递给您的工作池。
多处理会带来一些开销(创建新进程),这就是为什么当你有很多小任务时它不是一个很好的选择,在这些任务中创建进程的开销可能超过并行化的好处。
您是否考虑过矢量化您的问题?
特别是,如果你广播变量 b
你会到达那里:
sub = a - b[::,np.newaxis] # broadcast b
1./(np.sqrt(np.min(np.sum(sub**2, axis=2), axis=-1))+0.001)**2
我相信您仍然可以稍微降低最后一个表达式的复杂性,因为您正在创建平方根的平方,这似乎是多余的(请注意,我假设 0.001 常数值只是存在以避免一些不明智的操作,如除以零)。
我在 Python multiprocessing.Pool
中遇到了一些麻烦。我有两个 numpy
数组列表 a
和 b
,其中
a.shape=(10000,3)
和
b.shape=(1000000000,3)
然后我有一个函数可以像
那样做一些计算def role(array, point):
sub = array-point
return (1/(np.sqrt(np.min(np.sum(sub*sub, axis=-1)))+0.001)**2)
接下来,我需要计算
[role(a, point) for point in b]
为了加快速度,我尝试使用
cpu_num = 4
m = multiprocessing.Pool(cpu_num)
cost_list = m.starmap(role, [(a, point) for point in b])
m.close
整个过程大约需要70秒,但如果我设置cpu_num = 1
,处理时间会减少到60秒...我的笔记本电脑是6核的,供参考。
这里我有两个问题:
- 我在
multiprocessing.Pool
上做错了什么吗?为什么我设置cpu_num = 4
会增加处理时间? - 对于这样的任务(每个for循环都是一个非常小的过程),我应该使用多处理来加速吗?我觉得每次 python 填写
Pool
比处理函数role
... 花费的时间更长
非常欢迎任何建议。
如果任务太小,那么多处理开销将成为您的瓶颈,您将一事无成。
如果每个任务的数据量您必须传递给工作人员或工作人员必须return那么您也不会赢得很多(甚至一无所获)
如果您有 10,000 个小任务,那么我建议创建一个元任务列表。 每个元任务将包括执行例如 20 个小任务。
meta_tasks = []
for idx in range(0, len(tiny_tasks), 20):
meta_tasks.append(tiny_tasks[idx:idx+20])
然后将元任务传递给您的工作池。
多处理会带来一些开销(创建新进程),这就是为什么当你有很多小任务时它不是一个很好的选择,在这些任务中创建进程的开销可能超过并行化的好处。
您是否考虑过矢量化您的问题?
特别是,如果你广播变量 b
你会到达那里:
sub = a - b[::,np.newaxis] # broadcast b
1./(np.sqrt(np.min(np.sum(sub**2, axis=2), axis=-1))+0.001)**2
我相信您仍然可以稍微降低最后一个表达式的复杂性,因为您正在创建平方根的平方,这似乎是多余的(请注意,我假设 0.001 常数值只是存在以避免一些不明智的操作,如除以零)。