来自 joblib 的多处理不并行化?

Multiprocessing from joblib doesn't parallelize?

自从我从 python3.5 移动到 3.6 后,使用 joblib 的并行计算并没有减少计算时间。 以下是库安装的版本: - python:3.6.3 - 作业库:0.11 - 麻木的:1.14.0

基于一个众所周知的例子,我在下面给出了一个示例代码来重现该问题:

import time
import numpy as np
from joblib import Parallel, delayed

def square_int(i):
    return i * i

ndata = 1000000 
ti = time.time()
results = []    
for i in range(ndata):
    results.append(square_int(i))

duration = np.round(time.time() - ti,4)
print(f"standard computation: {duration} s" )

for njobs in [1,2,3,4] :
    ti = time.time()  
    results = []
    results = Parallel(n_jobs=njobs, backend="multiprocessing")\
        (delayed(square_int)(i) for i in range(ndata))
    duration = np.round(time.time() - ti,4)
    print(f"{njobs} jobs computation: {duration} s" )

我得到以下输出:

虽然我将 ndata 的数量增加了 10 倍并删除了 1 个核心计算,但我得到了这些结果:

有人知道我应该调查哪个方向吗?

我认为主要原因是并行的开销超过了收益。换句话说,您的 square_int 太简单了,无法通过并行获得任何性能提升。 square_int 非常简单,以至于在进程之间传递输入和输出可能比执行函数 square_int 花费更多的时间。

我通过创建 square_int_batch 函数修改了您的代码。它大大减少了计算时间,尽管它仍然比串行实现要多。

import time
import numpy as np
from joblib import Parallel, delayed

def square_int(i):
    return i * i

def square_int_batch(a,b):
    results=[]
    for i in range(a,b):
        results.append(square_int(i))
    return results

ndata = 1000000 
ti = time.time()
results = []    
for i in range(ndata):
    results.append(square_int(i))

# results = [square_int(i) for i in range(ndata)]

duration = np.round(time.time() - ti,4)
print(f"standard computation: {duration} s" )

batch_num = 3
batch_size=int(ndata/batch_num)

for njobs in [2,3,4] :
    ti = time.time()  
    results = []
    a = list(range(ndata))
#     results = Parallel(n_jobs=njobs, )(delayed(square_int)(i) for i in range(ndata))
#     results = Parallel(n_jobs=njobs, backend="multiprocessing")(delayed(
    results = Parallel(n_jobs=njobs)(delayed(
        square_int_batch)(i*batch_size,(i+1)*batch_size) for i in range(batch_num))
    duration = np.round(time.time() - ti,4)
    print(f"{njobs} jobs computation: {duration} s" )

计算时间为

standard computation: 0.3184 s
2 jobs computation: 0.5079 s
3 jobs computation: 0.6466 s
4 jobs computation: 0.4836 s

其他一些有助于缩短时间的建议。

  1. 在您的特定情况下使用列表推导results = [square_int(i) for i in range(ndata)] 替换for 循环,速度更快。我测试了。
  2. batch_num设置为合理的大小。这个值越大,开销越大。在我的例子中,当 batch_num 超过 1000 时,速度开始明显变慢。
  3. 我使用了默认后端 loky 而不是 multiprocessing。它稍微快一点,至少对我来说是这样。

从其他几个 SO 问题中,我读到多处理适用于 cpu-繁重的 任务,对此我没有官方定义。你可以自己探索。