为什么 dask worker 由于 "small" 大小任务的 MemoryError 而失败? [Dask.bag]

Why does dask worker fails due to MemoryError on "small" size task? [Dask.bag]

我是 运行 多张图片的管道。管道包括从文件系统读取图像,对每个图像进行处理,然后将图像保存到文件系统。然而,dask worker 由于 MemoryError 而失败。 有没有办法确保 dask workers 不会在内存中加载太多图像?即等到工作人员有足够的space,然后在新图像上开始处理管道。

我有一个调度程序和 40 个 worker,4 个内核,15GB 内存和 运行 Centos7。我正在尝试批量处理 125 张图像;每个图像都相当大,但足够小以适合工人;整个过程大约需要 3GB。

我尝试处理少量图像,但效果很好。

已编辑

from dask.distributed import Client, LocalCluster

# LocalCluster is used to show the config of the workers on the actual cluster
client = Client(LocalCluster(n_workers=2, resources={'process': 1}))

paths = ['list', 'of', 'paths']

# Read the file data from each path
data = client.map(read, path, resources={'process': 1)

# Apply foo to the data n times
for _ in range(n):
    data = client.map(foo, x, resources={'process': 1)

# Save the processed data
data.map(save, x, resources={'process': 1)

# Retrieve results
client.gather(data)

我希望图像能够得到处理,因为 space 可以在工作人员上使用,但似乎所有图像都同时加载到不同的工作人员上。

编辑: 我的问题是所有任务都分配给了工人,他们没有足够的内存。我发现了如何限制工人在同一时刻处理的任务数量 [https://distributed.readthedocs.io/en/latest/resources.html#resources-are-applied-separately-to-each-worker-process](see 此处)。 然而,在这个限制下,当我执行我的任务时,他们都完成了读取步骤,然后是处理步骤,最后是保存步骤。这是一个问题,因为图像溢出到磁盘。

有没有办法让每项任务都完成后再开始新任务? 例如在 Worker-1 上:读取(img1)->处理(img1)->保存(img1)->读取(img2)->...

Dask 通常不知道任务需要多少内存,它只能知道输出的大小,而且只有在它们完成后才能知道。这是因为 Dask 只是执行一个 pthon 函数,然后等待它完成;但是所有事情都可以在 python 函数中发生。通常,您应该期望开始的任务数与您所发现的可用工作核数一样多。

如果您想要较小的总内存负载,那么您的解决方案应该很简单:拥有足够少的工作人员,这样如果他们都在使用您可以预期的最大内存,您仍然有一些空闲在系统中应对。

编辑:您可能想在提交前尝试 运行 优化图表(尽管我认为这无论如何都应该发生),因为听起来您的线性任务链应该是 "fused". http://docs.dask.org/en/latest/optimize.html