How to speed up the application of the following for loop and function?

I have the following for loop:

for j in range(len(list_list_int)):
    arr_1_, arr_2_, arr_3_ = foo(bar, list_list_int[j])
    arr_1[j,:] = arr_1_.data.numpy()
    arr_2[j,:] = arr_2_.data.numpy()
    arr_3[j,:] = arr_3_.data.numpy()

I'd like to apply foo with multiprocessing, mainly because it takes a long time to complete. I tried processing it in batches using funcy's chunks method:

for j in chunks(1000, list_list_int):
    arr_1_, arr_2_, arr_3_ = foo(bar, list_list_int[j])
    arr_1[j,:] = arr_1_.data.numpy()
    arr_2[j,:] = arr_2_.data.numpy()
    arr_3[j,:] = arr_3_.data.numpy()

However, I get `list object cannot be interpreted as an integer`. What is the correct way to apply foo using multiprocessing?

list_list_int = [1,2,3,4,5,6]
for j in chunks(2, list_list_int):
  for i in j:
    avg_, max_, last_ = foo(bar, i)

I don't have chunks installed, but from its docs I suspect that, for chunks of size 2, it produces groups like this:
alist = [[1,2],[3,4],[5,6],[7,8]]                                     
j = [[1,2],[3,4]]
j = [[5,6],[7,8]]   

This produces the error:

In [116]: alist[j]                                                              
TypeError: list indices must be integers or slices, not list
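A quick stand-in for chunks (a hypothetical reimplementation sketched from its documented behavior, not funcy's actual source) confirms that each yielded j is itself a list of sublists, which is why it cannot be used as a list index:

```python
def chunks(n, seq):
    # Yield successive lists of up to n items from seq,
    # mimicking the chunking behavior described above.
    for i in range(0, len(seq), n):
        yield seq[i:i + n]

alist = [[1, 2], [3, 4], [5, 6], [7, 8]]
for j in chunks(2, alist):
    print(j)
# [[1, 2], [3, 4]]
# [[5, 6], [7, 8]]
```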

If your foo can't handle the full list of lists, I don't see how it would handle that list split into chunks. Apparently it can only handle one sublist at a time.

If you're looking to perform parallel operations on a numpy array, then I would use Dask.

With just a few lines of code, your operation should be able to run across multiple processes, and the well-developed Dask scheduler will balance the load for you. A huge benefit of Dask compared to other parallel libraries like joblib is that it maintains the native numpy API.

import dask.array as da

# Setting up a random array with dimensions 10K rows and 10 columns
# This data is stored distributed across 10 chunks, and the columns are kept together (1_000, 10)
x = da.random.random((10_000, 10), chunks=(1_000, 10))
x = x.persist()  # Allow the entire array to persist in memory to speed up calculation


def foo(x):
    return x / 10


# Apply foo to each column (axis 0) in parallel, using Dask's
# version of numpy's apply_along_axis
result_foo = da.apply_along_axis(foo, 0, x)

# View original contents
x[0:10].compute()

# View sample of results
result_foo = result_foo.compute()
result_foo[0:10]