如何处理 celery 中的错误 Task.map

How to handle errors in celery's Task.map

假设我有两个 celery 任务:

@celery.task
def run_flakey_things(*args, **kwargs):
    return run_flakey_and_synchronous_thing.map(
        xrange(10)
    ).apply_async()


@celery.task
def run_flakey_and_synchronous_thing(a):
    if a % 5:
        return a
    raise RuntimeError(a)

因此,当您转到 运行 run_flakey_things 时,它会立即失败,因为序列中的第一项会引发异常。我想要的是 运行 序列中所有项目的任务 按顺序 就像地图所做的那样,但在异常时继续 运行 ,引发新的异常一旦所有这些都完成了。

理想的情况是我可以在应用 xmap 对象之前添加一个 on_failure,但是 xmap 似乎不是一个完整的任务对象。

我没用过芹菜。但是,您可以向子任务添加一个 __call__ 方法吗?类似于:

class MyTask:
    def __call__(self, *args, **kwargs): 
        try: 
            super(self, MyTask).__call__(*args, **kwargs) 
        except Exception, exc:
            # store exc to be raised later in the main task

@celery.task(base=MyTask):
def run_flakey_and_synchronous_thing(a):
    # ...

然后对于主要 run_flakey_things 任务,可能重写 apply_async() 以检索存储的异常,如 How to override __call__ in celery on main?.

您可以更改 return 值来指示和传播错误。像这样:

import traceback

@celery.task
def run_flakey_things(*args, **kwargs):
    return run_flakey_and_synchronous_thing.map(
        xrange(10)
    ).apply_async()


@celery.task
def run_flakey_and_synchronous_thing(a):
    d = {'value': None, 'error': None}
    try:
        if a % 5:
            d['value'] = a
    except:
        d['error'] = traceback.format_exc()
    return d

然后您可以做一些事情:

1) 更改您的 run_flakey_things 以将有错误和没有错误的事物分组,return 没有错误的事物并报告有错误的事物。

2) 在任何调用中处理该行为 run_flakey_things