覆盖 dask scheduler 以同时在多个 worker 上加载数据
Override dask scheduler to concurrently load data on multiple workers
我想在我的分布式集群上 运行 graphs/futures,它们都有一个 'load data' 根任务,然后是一堆 运行 在该数据上的训练任务。简化版本如下所示:
from dask.distributed import Client
client = Client(scheduler_ip)
load_data_future = client.submit(load_data_func, 'path/to/data/')
train_task_futures = [client.submit(train_func, load_data_future, params)
for params in train_param_set]
运行 如上所述,调度程序让一个工作人员读取文件,然后将该数据溢出到磁盘以与其他工作人员共享。然而,加载数据通常是从一个大的 HDF5 文件中读取,这可以同时完成,所以我想知道是否有办法强制所有工作人员同时读取这个文件(他们都计算根任务)而不是让他们等待一名工人完成,然后慢慢地从该工人那里传输数据。
我知道可以使用 client.run()
方法让所有工作人员同时读取文件,但是您如何才能将已读取的数据提供给下游任务?
我不能使用 dask 数据基元并发读取 HDF5 文件,因为我需要诸如多索引和多列分组之类的东西。
截至今天(distributed.__version__ == 1.20.2
),您所要求的是不可能的。最接近的是计算一次然后显式复制数据
future = client.submit(load, path)
wait(future)
client.replicate(future)
提出此功能请求
重新审视了这个问题并找到了一个相对简单的解决方案,尽管它使用内部 API 方法并涉及对 client.run()
的阻塞调用。使用与问题中相同的变量:
from distributed import get_worker
client_id = client.id
def load_dataset():
worker = get_worker()
data = {'load_dataset-0': load_data_func('path/to/data')}
info = worker.update_data(data=data, report=False)
worker.scheduler.update_data(who_has={key: [worker.address] for key in data},
nbytes=info['nbytes'], client=client_id)
client.run(load_dataset)
现在,如果您 运行 client.has_what()
,您应该会看到每个工人都持有密钥 load_dataset-0
。要在下游计算中使用它,您只需为密钥创建一个未来:
from distributed import Future
load_data_future = Future('load_dataset-0', client=client)
这可以像往常一样与 client.compute()
或 dask.delayed
一起使用。事实上,问题示例的最后一行可以正常工作:
train_task_futures = [client.submit(train_func, load_data_future, params)
for params in train_param_set]
请记住,它使用内部 API 方法 Worker.update_data
和 Scheduler.update_data
,从 distributed.__version__ == 1.21.6
开始工作正常,但在未来的版本中可能会有所变化。
我想在我的分布式集群上 运行 graphs/futures,它们都有一个 'load data' 根任务,然后是一堆 运行 在该数据上的训练任务。简化版本如下所示:
from dask.distributed import Client
client = Client(scheduler_ip)
load_data_future = client.submit(load_data_func, 'path/to/data/')
train_task_futures = [client.submit(train_func, load_data_future, params)
for params in train_param_set]
运行 如上所述,调度程序让一个工作人员读取文件,然后将该数据溢出到磁盘以与其他工作人员共享。然而,加载数据通常是从一个大的 HDF5 文件中读取,这可以同时完成,所以我想知道是否有办法强制所有工作人员同时读取这个文件(他们都计算根任务)而不是让他们等待一名工人完成,然后慢慢地从该工人那里传输数据。
我知道可以使用 client.run()
方法让所有工作人员同时读取文件,但是您如何才能将已读取的数据提供给下游任务?
我不能使用 dask 数据基元并发读取 HDF5 文件,因为我需要诸如多索引和多列分组之类的东西。
截至今天(distributed.__version__ == 1.20.2
),您所要求的是不可能的。最接近的是计算一次然后显式复制数据
future = client.submit(load, path)
wait(future)
client.replicate(future)
提出此功能请求
重新审视了这个问题并找到了一个相对简单的解决方案,尽管它使用内部 API 方法并涉及对 client.run()
的阻塞调用。使用与问题中相同的变量:
from distributed import get_worker
client_id = client.id
def load_dataset():
worker = get_worker()
data = {'load_dataset-0': load_data_func('path/to/data')}
info = worker.update_data(data=data, report=False)
worker.scheduler.update_data(who_has={key: [worker.address] for key in data},
nbytes=info['nbytes'], client=client_id)
client.run(load_dataset)
现在,如果您 运行 client.has_what()
,您应该会看到每个工人都持有密钥 load_dataset-0
。要在下游计算中使用它,您只需为密钥创建一个未来:
from distributed import Future
load_data_future = Future('load_dataset-0', client=client)
这可以像往常一样与 client.compute()
或 dask.delayed
一起使用。事实上,问题示例的最后一行可以正常工作:
train_task_futures = [client.submit(train_func, load_data_future, params)
for params in train_param_set]
请记住,它使用内部 API 方法 Worker.update_data
和 Scheduler.update_data
,从 distributed.__version__ == 1.21.6
开始工作正常,但在未来的版本中可能会有所变化。