限制 dask 计算使用的 CPU 数量

limit number of CPUs used by dask compute

下面的代码使用 appx 1 秒在 8-CPU 系统上执行。如何手动配置 dask.compute 使用的 CPUs 的数量,例如 4 CPUs,这样即使在 8-CPU 系统上,下面的代码也将使用 appx 2 秒来执行?

import dask
from time import sleep

def f(x):
    sleep(1)
    return x**2

objs = [dask.delayed(f)(x) for x in range(8)]
print(dask.compute(*objs))  # (0, 1, 4, 9, 16, 25, 36, 49)

有几个选项:

  1. 指定集群创建时的工人数
from dask.distributed import Client

# without specifying unique thread, the function is executed
# on all threads
client = Client(n_workers=4, threads_per_worker=1)

# the rest of your code is not changed
  1. 指定有多少(以及哪些)工人应该执行任务

client = Client(n_workers=8, threads_per_worker=1)

list_workers = list(client.scheduler_info()['workers'])

client.compute(objs, workers=list_workers[:4]) 

# submit only to the first 4 workers
# note that workers should still be single-threaded, but the difference
# from option 1 is that you could in principle have more workers
# that are idle, also the `workers` kwarg can be passed to
# dask.compute rather than client.compute
  1. 指定信号量
from dask.distributed import Client, Semaphore

client = Client()
sem = Semaphore(max_leases=4, name="foo")

def fmodified(x, sem):
    with sem:
        return f(x)

objs = [dask.delayed(fmodified)(x, sem) for x in range(8)]
print(dask.compute(*objs))  # (0, 1, 4, 9, 16, 25, 36, 49)

更新:正如@mdurant 在评论中指出的那样,如果您在脚本中 运行 this,则需要 if __name__ == "main": 来保护相关代码不被 worker 执行。例如,上面列表中的第二个选项在脚本中看起来像这样:

#!/usr/bin/env python3
import dask
from dask.distributed import Client
from time import sleep

def f(x):
    sleep(1)
    return x**2

objs = [dask.delayed(f)(x) for x in range(8)]

if __name__ == "main":
    client = Client(n_workers=8, threads_per_worker=1)

    list_workers = list(client.scheduler_info()['workers'])

    results = client.compute(objs, workers=list_workers[:4])

    print(results)