Dask:构建和执行高效的管道

Dask: build and execute efficient pipeline

我正在使用 dask 对数千张图像应用多种转换。 对于每个图像,需要按顺序进行 5 次转换。 我想在 HPC 集群上分发此管道。 我有 200 个可用 CPU 所以我希望能够执行

input_files = [a list of 2000 files]
futures = client.map(transforms, input_files)

Dask 分布式理想情况下一次 运行 200 次转换。处理完成后立即写入所需的输出等。

然而,它似乎并不是这样工作的。 我观察到 dask 倾向于启动 200 个任务,但只启动前 3 个步骤,然后例如50 个任务或等效的随机处理。

使用 dask 分布式的好方法是什么? 例如,我应该执行 client.map 10 次(每次 200 个任务)吗? 有没有办法在启动新的 dask 之前“强制”dask 从头到尾执行一段代码?

What would be the good way of using dask distributed? Should I for example execute client.map 10 times (200 tasks each time)? Is there a way to "force" dask to execute a code from start to end before launching a new dask ?

一种选择是为较早的任务指定较高的优先级。 client.map 分配单个优先级值,因此要为每个任务指定特定优先级,请使用 client.submit:

futures = []
for n,f in enumerate(input_files):
    futures.append(client.submit, transforms, f, priority=-n)

在这种情况下,后面的任务将具有较低的优先级,因此应在较高优先级的任务完成后完成。由于您有多个转换步骤,因此您还需要为后面的转换函数分配更高的优先级值。