来自 joblib 的多处理不并行化?
Multiprocessing from joblib doesn't parallelize?
自从我从 python3.5 移动到 3.6 后,使用 joblib 的并行计算并没有减少计算时间。
以下是库安装的版本:
- python:3.6.3
- 作业库:0.11
- 麻木的:1.14.0
基于一个众所周知的例子,我在下面给出了一个示例代码来重现该问题:
import time
import numpy as np
from joblib import Parallel, delayed
def square_int(i):
return i * i
ndata = 1000000
ti = time.time()
results = []
for i in range(ndata):
results.append(square_int(i))
duration = np.round(time.time() - ti,4)
print(f"standard computation: {duration} s" )
for njobs in [1,2,3,4] :
ti = time.time()
results = []
results = Parallel(n_jobs=njobs, backend="multiprocessing")\
(delayed(square_int)(i) for i in range(ndata))
duration = np.round(time.time() - ti,4)
print(f"{njobs} jobs computation: {duration} s" )
我得到以下输出:
- 标准计算:0.2672 s
- 1 个作业计算:352.3113 秒
- 2 个作业计算:6.9662 秒
- 3 个作业计算:7.2556 秒
- 4 个作业计算:7.097 秒
虽然我将 ndata 的数量增加了 10 倍并删除了 1 个核心计算,但我得到了这些结果:
- 标准计算:2.4739 秒
- 2 个作业计算:77.8861 秒
- 3 个作业计算:79.9909 秒
- 4 个作业计算:83.1523 秒
有人知道我应该调查哪个方向吗?
我认为主要原因是并行的开销超过了收益。换句话说,您的 square_int
太简单了,无法通过并行获得任何性能提升。 square_int
非常简单,以至于在进程之间传递输入和输出可能比执行函数 square_int
花费更多的时间。
我通过创建 square_int_batch
函数修改了您的代码。它大大减少了计算时间,尽管它仍然比串行实现要多。
import time
import numpy as np
from joblib import Parallel, delayed
def square_int(i):
return i * i
def square_int_batch(a,b):
results=[]
for i in range(a,b):
results.append(square_int(i))
return results
ndata = 1000000
ti = time.time()
results = []
for i in range(ndata):
results.append(square_int(i))
# results = [square_int(i) for i in range(ndata)]
duration = np.round(time.time() - ti,4)
print(f"standard computation: {duration} s" )
batch_num = 3
batch_size=int(ndata/batch_num)
for njobs in [2,3,4] :
ti = time.time()
results = []
a = list(range(ndata))
# results = Parallel(n_jobs=njobs, )(delayed(square_int)(i) for i in range(ndata))
# results = Parallel(n_jobs=njobs, backend="multiprocessing")(delayed(
results = Parallel(n_jobs=njobs)(delayed(
square_int_batch)(i*batch_size,(i+1)*batch_size) for i in range(batch_num))
duration = np.round(time.time() - ti,4)
print(f"{njobs} jobs computation: {duration} s" )
计算时间为
standard computation: 0.3184 s
2 jobs computation: 0.5079 s
3 jobs computation: 0.6466 s
4 jobs computation: 0.4836 s
其他一些有助于缩短时间的建议。
- 在您的特定情况下使用列表推导
results = [square_int(i) for i in range(ndata)]
替换for 循环,速度更快。我测试了。
- 将
batch_num
设置为合理的大小。这个值越大,开销越大。在我的例子中,当 batch_num
超过 1000 时,速度开始明显变慢。
- 我使用了默认后端
loky
而不是 multiprocessing
。它稍微快一点,至少对我来说是这样。
从其他几个 SO 问题中,我读到多处理适用于 cpu-繁重的 任务,对此我没有官方定义。你可以自己探索。
自从我从 python3.5 移动到 3.6 后,使用 joblib 的并行计算并没有减少计算时间。 以下是库安装的版本: - python:3.6.3 - 作业库:0.11 - 麻木的:1.14.0
基于一个众所周知的例子,我在下面给出了一个示例代码来重现该问题:
import time
import numpy as np
from joblib import Parallel, delayed
def square_int(i):
return i * i
ndata = 1000000
ti = time.time()
results = []
for i in range(ndata):
results.append(square_int(i))
duration = np.round(time.time() - ti,4)
print(f"standard computation: {duration} s" )
for njobs in [1,2,3,4] :
ti = time.time()
results = []
results = Parallel(n_jobs=njobs, backend="multiprocessing")\
(delayed(square_int)(i) for i in range(ndata))
duration = np.round(time.time() - ti,4)
print(f"{njobs} jobs computation: {duration} s" )
我得到以下输出:
- 标准计算:0.2672 s
- 1 个作业计算:352.3113 秒
- 2 个作业计算:6.9662 秒
- 3 个作业计算:7.2556 秒
- 4 个作业计算:7.097 秒
虽然我将 ndata 的数量增加了 10 倍并删除了 1 个核心计算,但我得到了这些结果:
- 标准计算:2.4739 秒
- 2 个作业计算:77.8861 秒
- 3 个作业计算:79.9909 秒
- 4 个作业计算:83.1523 秒
有人知道我应该调查哪个方向吗?
我认为主要原因是并行的开销超过了收益。换句话说,您的 square_int
太简单了,无法通过并行获得任何性能提升。 square_int
非常简单,以至于在进程之间传递输入和输出可能比执行函数 square_int
花费更多的时间。
我通过创建 square_int_batch
函数修改了您的代码。它大大减少了计算时间,尽管它仍然比串行实现要多。
import time
import numpy as np
from joblib import Parallel, delayed
def square_int(i):
return i * i
def square_int_batch(a,b):
results=[]
for i in range(a,b):
results.append(square_int(i))
return results
ndata = 1000000
ti = time.time()
results = []
for i in range(ndata):
results.append(square_int(i))
# results = [square_int(i) for i in range(ndata)]
duration = np.round(time.time() - ti,4)
print(f"standard computation: {duration} s" )
batch_num = 3
batch_size=int(ndata/batch_num)
for njobs in [2,3,4] :
ti = time.time()
results = []
a = list(range(ndata))
# results = Parallel(n_jobs=njobs, )(delayed(square_int)(i) for i in range(ndata))
# results = Parallel(n_jobs=njobs, backend="multiprocessing")(delayed(
results = Parallel(n_jobs=njobs)(delayed(
square_int_batch)(i*batch_size,(i+1)*batch_size) for i in range(batch_num))
duration = np.round(time.time() - ti,4)
print(f"{njobs} jobs computation: {duration} s" )
计算时间为
standard computation: 0.3184 s
2 jobs computation: 0.5079 s
3 jobs computation: 0.6466 s
4 jobs computation: 0.4836 s
其他一些有助于缩短时间的建议。
- 在您的特定情况下使用列表推导
results = [square_int(i) for i in range(ndata)]
替换for 循环,速度更快。我测试了。 - 将
batch_num
设置为合理的大小。这个值越大,开销越大。在我的例子中,当batch_num
超过 1000 时,速度开始明显变慢。 - 我使用了默认后端
loky
而不是multiprocessing
。它稍微快一点,至少对我来说是这样。
从其他几个 SO 问题中,我读到多处理适用于 cpu-繁重的 任务,对此我没有官方定义。你可以自己探索。