如何使用 dask.distributed 并行化嵌套循环?

How to parallelize a nested loop with dask.distributed?

我正在尝试使用看起来像这样的 dask distribute 并行化嵌套循环:

@dask.delayed
def delayed_a(e):
    a = do_something_with(e)
    return something

@dask.delayed
def delayed_b(element):
    computations = []
    for e in element:
        computations.add(delayed_a(e))

    b = dask.compute(*computations, scheduler='distributed',
                    num_workers=4)
    return b

list = [some thousands of elements here]
computations = []
for element in list:
    computations.append(delayed_b(element))
    results = dask.compute(*computations, scheduler='distributed',
                           num_workers=4)

如您所见,我正在使用 distributed 调度程序。首先,我创建了一个 computations 列表,其中包含一个惰性 delayed_b 函数,该函数将 list 中的一个元素作为参数。然后,delayed_b 创建一组新的 computations 调用 delayed_a 函数,所有内容都在分布式中执行。这个伪代码正在运行,但我发现如果 delayed_a 不存在,它会更快。那么我的问题是——执行分布式并行 for 循环的正确方法是什么?

在历史结束时我想做的是:

list = [some thousands of elements here]
for element in list:
    for e in element:
        do_something_with(e)

对于使用 dask.distributed 完成嵌套循环的最佳方法的任何建议,我将不胜感激。

简单:

something = dask.delayed(do_something_with_e
list = [some thousands of elements here]

# this could be written as a one-line comprehension
computations = []
for element in list:
    part = []
    computations.append(part)
    for e in element:
        part.append(something(e))

results = dask.compute(*computations, scheduler='distributed',
                       num_workers=4)

您应该永远不要调用延迟函数或compute()在延迟函数中调用。

(请注意,默认情况下会使用分布式调度程序,只要您已创建客户端)