在 Dask 中,如何根据全局(而非工作人员)资源约束来限制任务的调度?

In Dask, how can I limit scheduling of a task based on global (not worker) resource constraints?

我有一个使用 Dask 编写的大型数据提取作业,其中每个任务将从数十个数据库中的大量 table 中查询一个 table。对于每个数据库实例,我想限制一次连接的任务数量(即限制)。例如,我可能有 100 个连接到数据库 A 的任务,100 个连接到数据库 B,100 个连接到数据库 C,等等,我想确保在任何给定时间连接到任何数据库的任务不超过 20 个。

我看到 Dask 提供了基于 worker 资源(CPU、MEM、GPU 等)的约束,但是数据库资源是“全局的”,因此并不特定于任何 Dask worker。 Dask 是否提供任何对任务并发性约束进行建模的方法?

阅读文档几个小时后,我找到了自己问题的答案。 Dask 提供分布式信号量,可以限制对数据库等资源的并发访问。欲了解更多信息,请参阅:

https://docs.dask.org/en/latest/futures.html#id1

例子

import time
from dask.distributed import Client, Semaphore

client = Client(...)

def do_task(x, sem):
    with sem:
        time.sleep(5)
        return x

# allow no more than 5 tasks to run concurrently
sem = Semaphore(max_leases=5, name="Limiter")

# submit jobs that use the semaphore
futures = client.map(do_task, range(20), sem=sem)

# collect results
results = client.gather(futures)