如何使用 dask.distributed 并行化嵌套循环?
How to parallelize a nested loop with dask.distributed?
我正在尝试使用看起来像这样的 dask distribute 并行化嵌套循环:
@dask.delayed
def delayed_a(e):
a = do_something_with(e)
return something
@dask.delayed
def delayed_b(element):
computations = []
for e in element:
computations.add(delayed_a(e))
b = dask.compute(*computations, scheduler='distributed',
num_workers=4)
return b
list = [some thousands of elements here]
computations = []
for element in list:
computations.append(delayed_b(element))
results = dask.compute(*computations, scheduler='distributed',
num_workers=4)
如您所见,我正在使用 distributed
调度程序。首先,我创建了一个 computations
列表,其中包含一个惰性 delayed_b
函数,该函数将 list
中的一个元素作为参数。然后,delayed_b
创建一组新的 computations
调用 delayed_a
函数,所有内容都在分布式中执行。这个伪代码正在运行,但我发现如果 delayed_a
不存在,它会更快。那么我的问题是——执行分布式并行 for 循环的正确方法是什么?
在历史结束时我想做的是:
list = [some thousands of elements here]
for element in list:
for e in element:
do_something_with(e)
对于使用 dask.distributed
完成嵌套循环的最佳方法的任何建议,我将不胜感激。
简单:
something = dask.delayed(do_something_with_e
list = [some thousands of elements here]
# this could be written as a one-line comprehension
computations = []
for element in list:
part = []
computations.append(part)
for e in element:
part.append(something(e))
results = dask.compute(*computations, scheduler='distributed',
num_workers=4)
您应该永远不要调用延迟函数或compute()
在延迟函数中调用。
(请注意,默认情况下会使用分布式调度程序,只要您已创建客户端)
我正在尝试使用看起来像这样的 dask distribute 并行化嵌套循环:
@dask.delayed
def delayed_a(e):
a = do_something_with(e)
return something
@dask.delayed
def delayed_b(element):
computations = []
for e in element:
computations.add(delayed_a(e))
b = dask.compute(*computations, scheduler='distributed',
num_workers=4)
return b
list = [some thousands of elements here]
computations = []
for element in list:
computations.append(delayed_b(element))
results = dask.compute(*computations, scheduler='distributed',
num_workers=4)
如您所见,我正在使用 distributed
调度程序。首先,我创建了一个 computations
列表,其中包含一个惰性 delayed_b
函数,该函数将 list
中的一个元素作为参数。然后,delayed_b
创建一组新的 computations
调用 delayed_a
函数,所有内容都在分布式中执行。这个伪代码正在运行,但我发现如果 delayed_a
不存在,它会更快。那么我的问题是——执行分布式并行 for 循环的正确方法是什么?
在历史结束时我想做的是:
list = [some thousands of elements here]
for element in list:
for e in element:
do_something_with(e)
对于使用 dask.distributed
完成嵌套循环的最佳方法的任何建议,我将不胜感激。
简单:
something = dask.delayed(do_something_with_e
list = [some thousands of elements here]
# this could be written as a one-line comprehension
computations = []
for element in list:
part = []
computations.append(part)
for e in element:
part.append(something(e))
results = dask.compute(*computations, scheduler='distributed',
num_workers=4)
您应该永远不要调用延迟函数或compute()
在延迟函数中调用。
(请注意,默认情况下会使用分布式调度程序,只要您已创建客户端)