芹菜重试的消息保留在同一个工人中

Celery retried messages remain in same worker

我的印象是,如果 celery worker 得到一个任务,并且它被重试 - 它保留在 worker 的记忆中(与 eta) - 而不是 return到队列。 导致如果重试 celery 任务并且工作人员正忙于处理不同的任务,并且该任务 eta 到达 - 它必须等到它完成处理其他任务。

我尝试在文档中查找与我记忆中的内容一致的内容,但我找不到任何内容。

我尝试检查它的方法是创建两个任务。

@app.task(bind=True, name='task_that_holds_worker', rate_limit='4/m',
          default_retry_delay=5 * 60,
          max_retries=int(60 * 60 * 24 * 1 / (60 * 5)))
def task_that_holds_worker(self, *args, **kwargs):
    import time
    time.sleep(50000)

@app.task(bind=True, name='retried_task', rate_limit='2/m',
          default_retry_delay=10 * 60,
          max_retries=int(60 * 60 * 24 * 1 / (60 * 10)))
def retried_task(self, *args, **kwargs):
    self.retry()

最简单的任务,只是检查一个任务是否正忙于其他任务 - 重试的任务没有被另一个工作人员处理。

然后我启动了一个 worker - 并通过以下方式触发了这两个任务:

from some_app import tasks
from some_app.celery_app import app

current_app = app.tasks

async_result = tasks.retried_task.delay()

import time
time.sleep(20)

async_result = tasks.task_that_holds_worker.delay()

工作人员处理了重试任务,并重试了它, 然后转移到睡觉的任务。 然后我启动了另一个工人,我可以看到它没有得到 'retried' 任务,只有第一个工人。

每个启动的 worker 都是用 --prefetch-multiplier=1 --concurrency=1 启动的 我复制这个的方式有问题吗? 或者这是芹菜重试任务的行为方式?

提前致谢!

您的复制方式有误。除非你有一个特殊的经纪人,芹菜将 总是 重新排队任务重试请求返回给经纪人。工作人员不会保留他们尝试执行哪个任务的任何记忆,并且没有添加到重试请求的数据允许 celery 将任务请求路由回同一个工作人员。无法保证或保证同一个工人会重试它之前见过的任务。您可以在 celery/app.task.py

中的芹菜代码中确认这一点


<i># get the signature of the task <b>as called</b></i>
S = self.signature_from_request(
    request, args, kwargs,
    countdown=countdown, eta=eta, retries=retries,
    **options
)
<br>if max_retries is not None and retries > max_retries:
    if exc:
        # On Py3: will augment any current exception with
        # the exc' argument provided (raise exc from orig)
        raise_with_context(exc)
    raise self.MaxRetriesExceededError(
        "Can't retry {0}[{1}] args:{2} kwargs:{3}".format(
            self.name, request.id, S.args, S.kwargs))
<br>ret = Retry(exc=exc, when=eta or countdown)
if is_eager:
    # if task was executed eagerly using apply(),
    # then the retry must also be executed eagerly.
    S.apply().get()
    if throw:
        raise ret
    return ret
<br><b>try:
    S.apply_async()
except Exception as exc:
    raise Reject(exc, requeue=False)
if throw:
    raise ret
return ret</b></p>

<p></pre>

我将您可以看到重试如何工作的部分加粗了。 Celery 获取任务请求签名(这包括任务名称和任务参数,并设置 eta、倒计时和重试)。然后 celery 将简单地调用 apply_async,它在幕后只会将一个新的任务请求排队给代理。

您的示例没有工作,因为 celery worker 通常会从代理中拉出多个任务请求,因此可能发生的情况是第一个 worker 在第二个 worker 上线之前从 broker 抓取了任务。

这似乎是 eta 任务的问题。第一个可用的工作人员倒计时直到任务 eta 并且不会将其释放回队列。 (预取计数增加并被忽略)

https://github.com/celery/celery/issues/2541