使用 dask-distributed 如何从队列提供的长 运行 任务生成期货

With dask-distributed how to generate futures from long running tasks fed by queues

我正在按照本例 http://matthewrocklin.com/blog/work/2017/02/11/dask-tensorflow 的思路使用磁盘分布式长 运行 任务,其中长 运行 工作任务从队列中获取输入,如tensorflow 示例并将其结果传递到输出队列。 (我没有在最新版本的 dask 中看到示例中使用的通道)。

我可以看到如何分散列表并应用映射来生成将输入数据推送到工作人员输入队列的期货列表。

def transfer_dask_to_worker(batch):
    worker = get_worker()
    worker.tensorflow_queue.put(batch)

data = [1,2,3,4] 

future_data = e.scatter(data)

tasks = e.map(transfer_dask_to_worker, future_data ,
     workers=dask_spec['worker'], pure=False)

现在,如果我们等待工作人员处理任务,所有结果都将在工作人员的输出队列中。我们可以使用

将其全部拉回来
def transfer_worker_to_dask(arg):
    worker = get_worker()
    return worker.output_queue.get()

results = e.map(transfer_worker_to_dask,range(len(tasks)))

只要我们通过等待所有工作任务完成后再回调它们来手动处理排序,这就可以正常工作。

我们如何 link 输出期货是输入的下游?有没有办法让 运行 长任务在可以收集回调度程序任务的工人身上创建未来?

我试着让 transfer_dask_to_worker(batch) 也查询输出队列和 return 结果:

def transfer_dask_to_worker_and_return(batch):
    worker = get_worker()
    worker.tensorflow_queue.put(batch)
    return worker.output_queue.get()

这适用于短名单,但在取消期货约 1000 件时开始失败。

提前致谢。

注意:该博文是实验性的。这里有几种方法,我不会局限于那种模式

让我们从这个具体问题开始:

我们如何 link 输出期货是输入的下游?有没有办法让 运行 长任务在 worker 上创建可以收集回调度程序任务的 futures?

这里最简单的解决办法大概就是将本地数据打散,然后放到一个Dask distributed Queue中。因此,如果您有一个 TensorFlow 代码在产生某些结果时调用的函数,那么该函数可能会将本地数据分散到未来(这实际上并没有移动数据,它只是让 Dask worker 开始跟踪它)然后将其放入未来进入分布式队列。将未来放在队列中允许 Dask 中的其他客户端和工作人员知道数据的存在,并在必要时将其拉下

from dask.distributed import Queue
results_q = Queue()

def tf_result_ready(result):
    future = get_worker().scatter(result)
    results_q.put(future)

然后您可以坐在您的客户端代码中,并在结果可用时从该队列中提取结果:

for _ in range(n_blocks):
    future = results_q.get()
    # do stuff with future like submit or gather