如何处理 Dask 中的优雅失败?

How to handle graceful failure in Dask?

我是 运行 一个小时的计算,它获取外部 API,处理它并保存到数据帧。 API 正在使用 Python 的请求库。

通过调整请求库,我设法避免了与重试和读取错误相关的问题,但当然并不是所有可能的问题都得到了处理。

每次 API 失败时,我的计算就会停止,我会损失一小时的工作。

我这样称呼 dask:

dd = daskDataFrame.from_pandas(result, npartitions=20)
future = dd.compute()

如果失败,是否有任何方法可以从点重新启动 Dask?

通过阅读文档,现在有 Client.retry() 函数: https://distributed.dask.org/en/latest/api.html#distributed.Client.retry

我不知道如何在我的代码中使用它。

重试功能是解决方案吗?如果有,如何使用?

我也在 SO 中找到了这个相关问题:

但我不知道我是否需要在答案中实现建议的代码,或者只使用 retries 参数调用我的 compute() 函数。

通过 运行ning .compute 在 dask 数据帧上,您将其转换为内存中的 pandas 数据帧。如果你想要一个未来的对象,那么你可以 运行:

future = client.compute(dd)
# or
future = client.persist(dd)

如果上述其中一项失败,您将在未来的 status 属性 中看到 error

print(future.status)
# will print error if the dataframe could not be computed

不清楚什么是适合您的用例的最佳方法,但一种选择是有一个循环来检查状态,如果是错误,则 retries/restarts 进行计算,如下所示:

from dask.distributed import wait
while future.status!="finished":
    wait(future)
    if future.status == 'error':
        future.retry()