如何处理 Dask 中的优雅失败?
How to handle graceful failure in Dask?
我是 运行 一个小时的计算,它获取外部 API,处理它并保存到数据帧。 API 正在使用 Python 的请求库。
通过调整请求库,我设法避免了与重试和读取错误相关的问题,但当然并不是所有可能的问题都得到了处理。
每次 API 失败时,我的计算就会停止,我会损失一小时的工作。
我这样称呼 dask:
dd = daskDataFrame.from_pandas(result, npartitions=20)
future = dd.compute()
如果失败,是否有任何方法可以从点重新启动 Dask?
通过阅读文档,现在有 Client.retry() 函数:
https://distributed.dask.org/en/latest/api.html#distributed.Client.retry
我不知道如何在我的代码中使用它。
重试功能是解决方案吗?如果有,如何使用?
我也在 SO 中找到了这个相关问题:
但我不知道我是否需要在答案中实现建议的代码,或者只使用 retries 参数调用我的 compute() 函数。
通过 运行ning .compute
在 dask 数据帧上,您将其转换为内存中的 pandas 数据帧。如果你想要一个未来的对象,那么你可以 运行:
future = client.compute(dd)
# or
future = client.persist(dd)
如果上述其中一项失败,您将在未来的 status
属性 中看到 error
:
print(future.status)
# will print error if the dataframe could not be computed
不清楚什么是适合您的用例的最佳方法,但一种选择是有一个循环来检查状态,如果是错误,则 retries/restarts 进行计算,如下所示:
from dask.distributed import wait
while future.status!="finished":
wait(future)
if future.status == 'error':
future.retry()
我是 运行 一个小时的计算,它获取外部 API,处理它并保存到数据帧。 API 正在使用 Python 的请求库。
通过调整请求库,我设法避免了与重试和读取错误相关的问题,但当然并不是所有可能的问题都得到了处理。
每次 API 失败时,我的计算就会停止,我会损失一小时的工作。
我这样称呼 dask:
dd = daskDataFrame.from_pandas(result, npartitions=20)
future = dd.compute()
如果失败,是否有任何方法可以从点重新启动 Dask?
通过阅读文档,现在有 Client.retry() 函数: https://distributed.dask.org/en/latest/api.html#distributed.Client.retry
我不知道如何在我的代码中使用它。
重试功能是解决方案吗?如果有,如何使用?
我也在 SO 中找到了这个相关问题:
但我不知道我是否需要在答案中实现建议的代码,或者只使用 retries 参数调用我的 compute() 函数。
通过 运行ning .compute
在 dask 数据帧上,您将其转换为内存中的 pandas 数据帧。如果你想要一个未来的对象,那么你可以 运行:
future = client.compute(dd)
# or
future = client.persist(dd)
如果上述其中一项失败,您将在未来的 status
属性 中看到 error
:
print(future.status)
# will print error if the dataframe could not be computed
不清楚什么是适合您的用例的最佳方法,但一种选择是有一个循环来检查状态,如果是错误,则 retries/restarts 进行计算,如下所示:
from dask.distributed import wait
while future.status!="finished":
wait(future)
if future.status == 'error':
future.retry()