如何处理 celery 中的错误 Task.map
How to handle errors in celery's Task.map
假设我有两个 celery 任务:
@celery.task
def run_flakey_things(*args, **kwargs):
return run_flakey_and_synchronous_thing.map(
xrange(10)
).apply_async()
@celery.task
def run_flakey_and_synchronous_thing(a):
if a % 5:
return a
raise RuntimeError(a)
因此,当您转到 运行 run_flakey_things
时,它会立即失败,因为序列中的第一项会引发异常。我想要的是 运行 序列中所有项目的任务 按顺序 就像地图所做的那样,但在异常时继续 运行 ,引发新的异常一旦所有这些都完成了。
理想的情况是我可以在应用 xmap
对象之前添加一个 on_failure,但是 xmap
似乎不是一个完整的任务对象。
我没用过芹菜。但是,您可以向子任务添加一个 __call__
方法吗?类似于:
class MyTask:
def __call__(self, *args, **kwargs):
try:
super(self, MyTask).__call__(*args, **kwargs)
except Exception, exc:
# store exc to be raised later in the main task
@celery.task(base=MyTask):
def run_flakey_and_synchronous_thing(a):
# ...
然后对于主要 run_flakey_things
任务,可能重写 apply_async()
以检索存储的异常,如 How to override __call__ in celery on main?.
您可以更改 return 值来指示和传播错误。像这样:
import traceback
@celery.task
def run_flakey_things(*args, **kwargs):
return run_flakey_and_synchronous_thing.map(
xrange(10)
).apply_async()
@celery.task
def run_flakey_and_synchronous_thing(a):
d = {'value': None, 'error': None}
try:
if a % 5:
d['value'] = a
except:
d['error'] = traceback.format_exc()
return d
然后您可以做一些事情:
1) 更改您的 run_flakey_things 以将有错误和没有错误的事物分组,return 没有错误的事物并报告有错误的事物。
2) 在任何调用中处理该行为 run_flakey_things
假设我有两个 celery 任务:
@celery.task
def run_flakey_things(*args, **kwargs):
return run_flakey_and_synchronous_thing.map(
xrange(10)
).apply_async()
@celery.task
def run_flakey_and_synchronous_thing(a):
if a % 5:
return a
raise RuntimeError(a)
因此,当您转到 运行 run_flakey_things
时,它会立即失败,因为序列中的第一项会引发异常。我想要的是 运行 序列中所有项目的任务 按顺序 就像地图所做的那样,但在异常时继续 运行 ,引发新的异常一旦所有这些都完成了。
理想的情况是我可以在应用 xmap
对象之前添加一个 on_failure,但是 xmap
似乎不是一个完整的任务对象。
我没用过芹菜。但是,您可以向子任务添加一个 __call__
方法吗?类似于:
class MyTask:
def __call__(self, *args, **kwargs):
try:
super(self, MyTask).__call__(*args, **kwargs)
except Exception, exc:
# store exc to be raised later in the main task
@celery.task(base=MyTask):
def run_flakey_and_synchronous_thing(a):
# ...
然后对于主要 run_flakey_things
任务,可能重写 apply_async()
以检索存储的异常,如 How to override __call__ in celery on main?.
您可以更改 return 值来指示和传播错误。像这样:
import traceback
@celery.task
def run_flakey_things(*args, **kwargs):
return run_flakey_and_synchronous_thing.map(
xrange(10)
).apply_async()
@celery.task
def run_flakey_and_synchronous_thing(a):
d = {'value': None, 'error': None}
try:
if a % 5:
d['value'] = a
except:
d['error'] = traceback.format_exc()
return d
然后您可以做一些事情:
1) 更改您的 run_flakey_things 以将有错误和没有错误的事物分组,return 没有错误的事物并报告有错误的事物。
2) 在任何调用中处理该行为 run_flakey_things