Dask 不尊重工人 Directive/Constraint (workers='...', allow_other_workers=False)

Dask Not Respecting Worker Directive/Constraint (workers='...', allow_other_workers=False)

我注意到在提交作业和 运行 compute() 时,尽管试图将工作限制在 dask 分布式集群上的特定节点(使用 workers='...', allow_other_workers=False),任务似乎仍然由多个工人完成。

例如,让我们读入一个 CSV 文件,然后尝试汇总一列的内容:

import dask.dataframe as dd
dfut1 = client.submit(dd.read_csv, 
                     'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2009-01.csv', 
                     workers='w1', allow_other_workers=False)
df1 = client.gather(dfut1)
df1.Passenger_Count.sum().compute(workers='w1', allow_other_workers=None)

当运行最终喜欢(...sum...compute)时,只需查看 Dask Dashboard 上的 "Status" 选项卡,很明显两者都在进行计算集群上的工作人员,而不仅仅是 w1 按照指示。 (这由 client.has_what() 支持,它从字面上显示任务分布在集群中。)

我也试过 allow_other_workers=False 并简单地省略了参数,但所有组合都会导致跨集群完成工作。我还尝试用 ip:port 替换工人名称,并使用列表而不是字符串,但没有成功。 有没有办法真正强制 machine/node/worker 数据和任务执行的任务的亲和力?

作为背景,

我在我的第一台机器上启动了一个 dask scheduler

dask-scheduler

我在第二台机器上启动了一个 dask worker

dask-worker <schedulerip:port> --name w1

我在第三台机器上启动了另一个 dask worker

dask-worker <schedulerip:port> --name w2

这是在 python 3.6.

上使用 dask==1.2.2 和 distributed==1.28.0

您似乎在提交调用中调用了 dask 数据框函数。这很奇怪。您正在向特定工作人员的 运行 提交 Dask 代码。该 Dask 代码然后回调到集群 运行 事情。没有理由在 dask 函数上调用提交。

http://docs.dask.org/en/latest/delayed-best-practices.html#don-t-call-dask-delayed-on-other-dask-collections