覆盖 dask scheduler 以同时在多个 worker 上加载数据

Override dask scheduler to concurrently load data on multiple workers

我想在我的分布式集群上 运行 graphs/futures,它们都有一个 'load data' 根任务,然后是一堆 运行 在该数据上的训练任务。简化版本如下所示:

from dask.distributed import Client
client = Client(scheduler_ip)
load_data_future = client.submit(load_data_func, 'path/to/data/')
train_task_futures = [client.submit(train_func, load_data_future, params) 
                      for params in train_param_set]

运行 如上所述,调度程序让一个工作人员读取文件,然后将该数据溢出到磁盘以与其他工作人员共享。然而,加载数据通常是从一个大的 HDF5 文件中读取,这可以同时完成,所以我想知道是否有办法强制所有工作人员同时读取这个文件(他们都计算根任务)而不是让他们等待一名工人完成,然后慢慢地从该工人那里传输数据。

我知道可以使用 client.run() 方法让所有工作人员同时读取文件,但是您如何才能将已读取的数据提供给下游任务?

我不能使用 dask 数据基元并发读取 HDF5 文件,因为我需要诸如多索引和多列分组之类的东西。

截至今天(distributed.__version__ == 1.20.2),您所要求的是不可能的。最接近的是计算一次然后显式复制数据

future = client.submit(load, path)
wait(future)
client.replicate(future)

您可能希望在 https://github.com/dask/distributed/issues/new

提出此功能请求

重新审视了这个问题并找到了一个相对简单的解决方案,尽管它使用内部 API 方法并涉及对 client.run() 的阻塞调用。使用与问题中相同的变量:

from distributed import get_worker
client_id = client.id
def load_dataset():
    worker = get_worker()
    data = {'load_dataset-0': load_data_func('path/to/data')}
    info = worker.update_data(data=data, report=False)
    worker.scheduler.update_data(who_has={key: [worker.address] for key in data}, 
                                 nbytes=info['nbytes'], client=client_id)
client.run(load_dataset)

现在,如果您 运行 client.has_what(),您应该会看到每个工人都持有密钥 load_dataset-0。要在下游计算中使用它,您只需为密钥创建一个未来:

from distributed import Future
load_data_future = Future('load_dataset-0', client=client)

这可以像往常一样与 client.compute()dask.delayed 一起使用。事实上,问题示例的最后一行可以正常工作:

train_task_futures = [client.submit(train_func, load_data_future, params) 
                      for params in train_param_set]

请记住,它使用内部 API 方法 Worker.update_dataScheduler.update_data,从 distributed.__version__ == 1.21.6 开始工作正常,但在未来的版本中可能会有所变化。