Limit the number of CPUs used by dask.compute
The code below takes approx. 1 second to execute on an 8-CPU system. How can I manually configure the number of CPUs used by dask.compute, e.g. 4 CPUs, so that the code below takes approx. 2 seconds to execute even on an 8-CPU system?
import dask
from time import sleep

def f(x):
    sleep(1)
    return x**2

objs = [dask.delayed(f)(x) for x in range(8)]
print(dask.compute(*objs))  # (0, 1, 4, 9, 16, 25, 36, 49)
There are several options:
- specify the number of workers at cluster creation
from dask.distributed import Client

# with threads_per_worker=1 each worker runs one task at a time;
# without it, tasks are executed on all of a worker's threads
client = Client(n_workers=4, threads_per_worker=1)
# the rest of your code is unchanged
- specify how many (and which) workers should execute the tasks
client = Client(n_workers=8, threads_per_worker=1)
list_workers = list(client.scheduler_info()['workers'])
# submit only to the first 4 workers
client.compute(objs, workers=list_workers[:4])
# note that the workers should still be single-threaded; the difference
# from option 1 is that you could in principle have additional workers
# sitting idle. The `workers` kwarg can also be passed to
# dask.compute rather than client.compute
- use a semaphore
from dask.distributed import Client, Semaphore

client = Client()
sem = Semaphore(max_leases=4, name="foo")

def fmodified(x, sem):
    # at most 4 leases can be held at once, so at most
    # 4 invocations of f run concurrently
    with sem:
        return f(x)

objs = [dask.delayed(fmodified)(x, sem) for x in range(8)]
print(dask.compute(*objs))  # (0, 1, 4, 9, 16, 25, 36, 49)
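The semaphore pattern is easiest to see without dask at all. Below is a stdlib-only sketch of the same idea (with shorter sleeps than the original, and illustrative names): 8 tasks share 4 leases, so they finish in two waves and take roughly twice the single-task time:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

sem = threading.Semaphore(4)  # at most 4 tasks hold a lease at once

def f(x):
    with sem:
        time.sleep(0.1)  # stand-in for the 1-second workload
        return x ** 2

start = time.perf_counter()
# 8 worker threads are available, but the semaphore caps concurrency at 4
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(f, range(8)))
elapsed = time.perf_counter() - start

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
# elapsed is roughly 0.2 s: two waves of 4 tasks
```

dask's distributed `Semaphore` plays exactly the role of `threading.Semaphore` here, except the leases are coordinated by the scheduler across processes and machines.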
Update: as @mdurant pointed out in the comments, if you run this in a script, you need an `if __name__ == "__main__":` guard to protect the relevant code from being executed by the worker processes. For example, the second option from the list above would look like this in a script:
#!/usr/bin/env python3
import dask
from dask.distributed import Client
from time import sleep

def f(x):
    sleep(1)
    return x**2

objs = [dask.delayed(f)(x) for x in range(8)]

if __name__ == "__main__":
    client = Client(n_workers=8, threads_per_worker=1)
    list_workers = list(client.scheduler_info()['workers'])
    # client.compute returns futures, so gather them to get the results
    futures = client.compute(objs, workers=list_workers[:4])
    print(client.gather(futures))
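One more option, not from the answer above but worth knowing: if the single-machine (local) schedulers are enough for your workload, `dask.compute` accepts a `num_workers` keyword that caps the pool size directly, with no `dask.distributed` cluster at all. A minimal sketch (the sleep is omitted to keep it fast):

```python
import dask

def f(x):
    return x ** 2

objs = [dask.delayed(f)(x) for x in range(8)]
# the local "threads" scheduler honours num_workers, so at most
# 4 tasks run concurrently
results = dask.compute(*objs, scheduler="threads", num_workers=4)
print(results)  # (0, 1, 4, 9, 16, 25, 36, 49)
```

With `sleep(1)` restored in `f`, this should also take approx. 2 seconds on an 8-CPU system, for the same reason as the options above.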