运行 在 numpy 块上并行重循环
Run in parallel a heavy loop on numpy chunks
我需要遍历一个巨大的 numpy 数组来构建三个列表,这取决于昂贵的 C 库调用的结果,它接受标量值并且不能被矢量化(或者至少我不知道如何去做) ).
这个循环可能需要几个小时到几天,我可以看到性能随着时间的推移而下降(我记录了进度,我可以看到到最后速度要慢得多)可能是由于列表的大小不断增加(??)。
代码如下(我省略了与打印进度和一些微优化相关的代码):
import numpy as np
import swig_c_lib
def build_indexes(large_numpy_array_1, large_numpy_array_2):
xs = []
ys = []
idxs = []
for (x, y), value in np.ndenumerate(large_numpy_array_1):
if not (value <= -1.0e+10):
try:
index = swig_c_lib.slow_computation(np.asscalar(large_numpy_array_2[x, y]), np.asscalar(large_numpy_array_1[x, y]))
except swig_lib.InternalError:
pass
else:
xs.append(x)
ys.append(y)
idxs.append(index)
return np.asarray(xs), np.asarray(ys), np.asarray(idxs)
可能,一种解决方案是将大型输入 numpy 数组拆分为 4 个子数组并使用多处理(但我不确定如何合并结果)。有人可以在这里提供帮助吗?
这是 dask
模块可以提供帮助的问题
让我们从创建两个数组 a1
和 a2
开始。它们可以具有任意形状,但在本例中,我们将通过 n
使它们 n
,其中 n=30
。我们将数组展平并将它们堆叠在一起,形成一个形状为 (2,900) 的大数组。 axis=1
维度上的每一对都是 a1
和 a2
上相同位置的一对项目:
In[1]:
import numpy as np
n = 30
a1 = np.random.rand(n, n)
a2 = np.random.rand(n, n)
a = np.stack((a1.flat, a2.flat))
a.shape
Out[1]:
(2, 900)
然后我们继续将数组分成块。我们选择 250 个块:
In[2]:
chunks = np.array_split(a, 250, axis=1)
len(chunks)
Out[2]:
250
In[3]:
chunks[0]
Out[3]:
array([[ 0.54631022, 0.8428954 , 0.11835531, 0.59720379],
[ 0.51184696, 0.64365038, 0.74471553, 0.67035977]])
我们现在将定义一个slow_function
,它将扮演问题中描述的慢速计算的角色。我们还定义了一种使用 numpy
对其中一个块应用慢速函数的方法。
In[4]:
def slow_function(pair):
return np.asscalar(pair[0]) + np.asscalar(pair[1])
def apply_on_chunk(chunk):
return np.apply_along_axis(slow_function, 0, chunk)
apply_on_chunk(chunks[0])
Out[4]:
array([ 1.05815717, 1.48654578, 0.86307085, 1.26756356])
在上面,请注意 apply_on_chunk
与块中 axis=1
的大小无关。换句话说,我们也可以继续调用 apply_on_chunk(a)
来计算整个初始数组的结果。
与 dask.bag
并行
我们现在展示如何使用 dask.bag
对象的 map
方法沿着块并行计算:
In[5]:
import dask.bag as db
mybag = db.from_sequence(chunks)
In[6]:
%time myresult = mybag.map(apply_on_chunk)
Out[6]:
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.62 ms
此时我们还没有计算出任何东西。我们已经向 dask
描述了我们希望如何计算结果。此步骤发生得相对较快,大约 1.6 毫秒
为了继续并触发实际计算,我们在 myresult
:
上调用 compute
方法
In[7]:
%time myresult = myresult.compute()
Out[7]:
CPU times: user 256 ms, sys: 24 ms, total: 280 ms
Wall time: 362 ms
上面的 运行 花费了 1/3 多一点的时间。我们得到的是一个数组列表,对应于 dask.bag
中每个元素调用 apply_on_chunk
的结果。我们检查其中的前五个:
In[8]:
myresult[:5]
Out[8]:
[array([ 1.05815717, 1.48654578, 0.86307085, 1.26756356]),
array([ 1.48913909, 1.25028145, 1.36707112, 1.04826167]),
array([ 0.90069768, 1.24921559, 1.23146726, 0.84963409]),
array([ 0.72292347, 0.87069598, 1.35893143, 1.02451637]),
array([ 1.16422966, 1.35559156, 0.9071381 , 1.17786002])]
如果我们真的想要最终形式的结果,我们必须调用 np.concatenate
将所有块的结果放在一起。我们在下面这样做,并展示计算的性能:
In[9]:
%time myresult = np.concatenate(\
db.from_sequence(\
np.array_split(np.stack((a1.flat, a2.flat)), 250, axis=1)\
).map(apply_on_chunk).compute())
Out[9]:
CPU times: user 232 ms, sys: 40 ms, total: 272 ms
Wall time: 342 ms
完整的计算,它给我们一个单一的结果数组,需要比 运行 多一点的 1/3 秒。
如果我们直接对整个数组进行计算,而不使用任何并行化,会怎样:
In[10]:
%time myresult_ = np.apply_along_axis(slow_function, 0, np.stack((a1.flat, a2.flat)))
Out[10]:
CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 12.9 ms
直接计算快很多。但这是因为 slow_function
目前并没有那么慢。它只是两个元素的总和,根本不需要太多时间。我们在 dask.bag
计算中看到的缓慢是并行化产生的开销。
让我们继续再试一次,但这次使用的是一个非常慢的函数,每次调用大约需要 20 毫秒:
In[11]:
n = 30
a1 = np.random.rand(n, n)
a2 = np.random.rand(n, n)
import time
def slow_function(pair):
time.sleep(0.02)
return np.asscalar(pair[0]) + np.asscalar(pair[1])
def apply_on_chunk(chunk):
return np.apply_along_axis(slow_function, 0, chunk)
让我们比较一下 dask
和 运行 直接对整个数组进行计算:
In[12]:
%time myresult = np.concatenate(\
db.from_sequence(\
np.array_split(np.stack((a1.flat, a2.flat)), 250, axis=1)\
).map(apply_on_chunk).compute())
Out[12]:
CPU times: user 236 ms, sys: 20 ms, total: 256 ms
Wall time: 4.75 s
In[13]:
%time myresult_ = np.apply_along_axis(slow_function, 0, np.stack((a1.flat, a2.flat)))
Out[13]:
CPU times: user 72 ms, sys: 16 ms, total: 88 ms
Wall time: 18.2 s
可以看出,dask
正在利用多处理来加速计算。我们得到大约 4 倍的加速。
为了完整起见,我们证明 dask
的结果与直接计算的结果是一致的:
In[14]:
np.testing.assert_array_equal(myresult, myresult_)
print("success")
Out[14]:
success
注意问题中的函数returns一个元组
np.asarray(xs), np.asarray(ys), np.asarray(idxs)
我们所描述的只是np.asarray(idxs)
的计算。如果知道原始 a1
和 a2
.
的形状,则可以轻松获得返回元组中的前两项。
我需要遍历一个巨大的 numpy 数组来构建三个列表,这取决于昂贵的 C 库调用的结果,它接受标量值并且不能被矢量化(或者至少我不知道如何去做) ). 这个循环可能需要几个小时到几天,我可以看到性能随着时间的推移而下降(我记录了进度,我可以看到到最后速度要慢得多)可能是由于列表的大小不断增加(??)。
代码如下(我省略了与打印进度和一些微优化相关的代码):
import numpy as np
import swig_c_lib
def build_indexes(large_numpy_array_1, large_numpy_array_2):
xs = []
ys = []
idxs = []
for (x, y), value in np.ndenumerate(large_numpy_array_1):
if not (value <= -1.0e+10):
try:
index = swig_c_lib.slow_computation(np.asscalar(large_numpy_array_2[x, y]), np.asscalar(large_numpy_array_1[x, y]))
except swig_lib.InternalError:
pass
else:
xs.append(x)
ys.append(y)
idxs.append(index)
return np.asarray(xs), np.asarray(ys), np.asarray(idxs)
可能,一种解决方案是将大型输入 numpy 数组拆分为 4 个子数组并使用多处理(但我不确定如何合并结果)。有人可以在这里提供帮助吗?
这是 dask
模块可以提供帮助的问题
让我们从创建两个数组 a1
和 a2
开始。它们可以具有任意形状,但在本例中,我们将通过 n
使它们 n
,其中 n=30
。我们将数组展平并将它们堆叠在一起,形成一个形状为 (2,900) 的大数组。 axis=1
维度上的每一对都是 a1
和 a2
上相同位置的一对项目:
In[1]:
import numpy as np
n = 30
a1 = np.random.rand(n, n)
a2 = np.random.rand(n, n)
a = np.stack((a1.flat, a2.flat))
a.shape
Out[1]:
(2, 900)
然后我们继续将数组分成块。我们选择 250 个块:
In[2]:
chunks = np.array_split(a, 250, axis=1)
len(chunks)
Out[2]:
250
In[3]:
chunks[0]
Out[3]:
array([[ 0.54631022, 0.8428954 , 0.11835531, 0.59720379],
[ 0.51184696, 0.64365038, 0.74471553, 0.67035977]])
我们现在将定义一个slow_function
,它将扮演问题中描述的慢速计算的角色。我们还定义了一种使用 numpy
对其中一个块应用慢速函数的方法。
In[4]:
def slow_function(pair):
return np.asscalar(pair[0]) + np.asscalar(pair[1])
def apply_on_chunk(chunk):
return np.apply_along_axis(slow_function, 0, chunk)
apply_on_chunk(chunks[0])
Out[4]:
array([ 1.05815717, 1.48654578, 0.86307085, 1.26756356])
在上面,请注意 apply_on_chunk
与块中 axis=1
的大小无关。换句话说,我们也可以继续调用 apply_on_chunk(a)
来计算整个初始数组的结果。
与 dask.bag
并行
我们现在展示如何使用 dask.bag
对象的 map
方法沿着块并行计算:
In[5]:
import dask.bag as db
mybag = db.from_sequence(chunks)
In[6]:
%time myresult = mybag.map(apply_on_chunk)
Out[6]:
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.62 ms
此时我们还没有计算出任何东西。我们已经向 dask
描述了我们希望如何计算结果。此步骤发生得相对较快,大约 1.6 毫秒
为了继续并触发实际计算,我们在 myresult
:
compute
方法
In[7]:
%time myresult = myresult.compute()
Out[7]:
CPU times: user 256 ms, sys: 24 ms, total: 280 ms
Wall time: 362 ms
上面的 运行 花费了 1/3 多一点的时间。我们得到的是一个数组列表,对应于 dask.bag
中每个元素调用 apply_on_chunk
的结果。我们检查其中的前五个:
In[8]:
myresult[:5]
Out[8]:
[array([ 1.05815717, 1.48654578, 0.86307085, 1.26756356]),
array([ 1.48913909, 1.25028145, 1.36707112, 1.04826167]),
array([ 0.90069768, 1.24921559, 1.23146726, 0.84963409]),
array([ 0.72292347, 0.87069598, 1.35893143, 1.02451637]),
array([ 1.16422966, 1.35559156, 0.9071381 , 1.17786002])]
如果我们真的想要最终形式的结果,我们必须调用 np.concatenate
将所有块的结果放在一起。我们在下面这样做,并展示计算的性能:
In[9]:
%time myresult = np.concatenate(\
db.from_sequence(\
np.array_split(np.stack((a1.flat, a2.flat)), 250, axis=1)\
).map(apply_on_chunk).compute())
Out[9]:
CPU times: user 232 ms, sys: 40 ms, total: 272 ms
Wall time: 342 ms
完整的计算,它给我们一个单一的结果数组,需要比 运行 多一点的 1/3 秒。
如果我们直接对整个数组进行计算,而不使用任何并行化,会怎样:
In[10]:
%time myresult_ = np.apply_along_axis(slow_function, 0, np.stack((a1.flat, a2.flat)))
Out[10]:
CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 12.9 ms
直接计算快很多。但这是因为 slow_function
目前并没有那么慢。它只是两个元素的总和,根本不需要太多时间。我们在 dask.bag
计算中看到的缓慢是并行化产生的开销。
让我们继续再试一次,但这次使用的是一个非常慢的函数,每次调用大约需要 20 毫秒:
In[11]:
n = 30
a1 = np.random.rand(n, n)
a2 = np.random.rand(n, n)
import time
def slow_function(pair):
time.sleep(0.02)
return np.asscalar(pair[0]) + np.asscalar(pair[1])
def apply_on_chunk(chunk):
return np.apply_along_axis(slow_function, 0, chunk)
让我们比较一下 dask
和 运行 直接对整个数组进行计算:
In[12]:
%time myresult = np.concatenate(\
db.from_sequence(\
np.array_split(np.stack((a1.flat, a2.flat)), 250, axis=1)\
).map(apply_on_chunk).compute())
Out[12]:
CPU times: user 236 ms, sys: 20 ms, total: 256 ms
Wall time: 4.75 s
In[13]:
%time myresult_ = np.apply_along_axis(slow_function, 0, np.stack((a1.flat, a2.flat)))
Out[13]:
CPU times: user 72 ms, sys: 16 ms, total: 88 ms
Wall time: 18.2 s
可以看出,dask
正在利用多处理来加速计算。我们得到大约 4 倍的加速。
为了完整起见,我们证明 dask
的结果与直接计算的结果是一致的:
In[14]:
np.testing.assert_array_equal(myresult, myresult_)
print("success")
Out[14]:
success
注意问题中的函数returns一个元组
np.asarray(xs), np.asarray(ys), np.asarray(idxs)
我们所描述的只是np.asarray(idxs)
的计算。如果知道原始 a1
和 a2
.