芹菜重试的消息保留在同一个工人中
Celery retried messages remain in same worker
我的印象是,如果 celery worker 得到一个任务,并且它被重试 - 它保留在 worker 的记忆中(与 eta) - 而不是 return到队列。
导致如果重试 celery 任务并且工作人员正忙于处理不同的任务,并且该任务 eta 到达 - 它必须等到它完成处理其他任务。
我尝试在文档中查找与我记忆中的内容一致的内容,但我找不到任何内容。
我尝试检查它的方法是创建两个任务。
@app.task(bind=True, name='task_that_holds_worker', rate_limit='4/m',
default_retry_delay=5 * 60,
max_retries=int(60 * 60 * 24 * 1 / (60 * 5)))
def task_that_holds_worker(self, *args, **kwargs):
import time
time.sleep(50000)
@app.task(bind=True, name='retried_task', rate_limit='2/m',
default_retry_delay=10 * 60,
max_retries=int(60 * 60 * 24 * 1 / (60 * 10)))
def retried_task(self, *args, **kwargs):
self.retry()
最简单的任务,只是检查一个任务是否正忙于其他任务 - 重试的任务没有被另一个工作人员处理。
然后我启动了一个 worker - 并通过以下方式触发了这两个任务:
from some_app import tasks
from some_app.celery_app import app
current_app = app.tasks
async_result = tasks.retried_task.delay()
import time
time.sleep(20)
async_result = tasks.task_that_holds_worker.delay()
工作人员处理了重试任务,并重试了它,
然后转移到睡觉的任务。
然后我启动了另一个工人,我可以看到它没有得到 'retried' 任务,只有第一个工人。
每个启动的 worker 都是用 --prefetch-multiplier=1 --concurrency=1
启动的
我复制这个的方式有问题吗?
或者这是芹菜重试任务的行为方式?
提前致谢!
- 芹菜:4.1.2
- Python: 3.6.2
- Rabbitmq 图片:rabbitmq:3.6.9-管理
您的复制方式有误。除非你有一个特殊的经纪人,芹菜将 总是 重新排队任务重试请求返回给经纪人。工作人员不会保留他们尝试执行哪个任务的任何记忆,并且没有添加到重试请求的数据允许 celery 将任务请求路由回同一个工作人员。无法保证或保证同一个工人会重试它之前见过的任务。您可以在 celery/app.task.py
中的芹菜代码中确认这一点
<i># get the signature of the task <b>as called</b></i>
S = self.signature_from_request(
request, args, kwargs,
countdown=countdown, eta=eta, retries=retries,
**options
)
<br>if max_retries is not None and retries > max_retries:
if exc:
# On Py3: will augment any current exception with
# the exc' argument provided (raise exc from orig)
raise_with_context(exc)
raise self.MaxRetriesExceededError(
"Can't retry {0}[{1}] args:{2} kwargs:{3}".format(
self.name, request.id, S.args, S.kwargs))
<br>ret = Retry(exc=exc, when=eta or countdown)
if is_eager:
# if task was executed eagerly using apply(),
# then the retry must also be executed eagerly.
S.apply().get()
if throw:
raise ret
return ret
<br><b>try:
S.apply_async()
except Exception as exc:
raise Reject(exc, requeue=False)
if throw:
raise ret
return ret</b></p>
<p></pre>
我将您可以看到重试如何工作的部分加粗了。 Celery 获取任务请求签名(这包括任务名称和任务参数,并设置 eta、倒计时和重试)。然后 celery 将简单地调用 apply_async
,它在幕后只会将一个新的任务请求排队给代理。
您的示例没有工作,因为 celery worker 通常会从代理中拉出多个任务请求,因此可能发生的情况是第一个 worker 在第二个 worker 上线之前从 broker 抓取了任务。
这似乎是 eta 任务的问题。第一个可用的工作人员倒计时直到任务 eta 并且不会将其释放回队列。 (预取计数增加并被忽略)
我的印象是,如果 celery worker 得到一个任务,并且它被重试 - 它保留在 worker 的记忆中(与 eta) - 而不是 return到队列。 导致如果重试 celery 任务并且工作人员正忙于处理不同的任务,并且该任务 eta 到达 - 它必须等到它完成处理其他任务。
我尝试在文档中查找与我记忆中的内容一致的内容,但我找不到任何内容。
我尝试检查它的方法是创建两个任务。
@app.task(bind=True, name='task_that_holds_worker', rate_limit='4/m',
default_retry_delay=5 * 60,
max_retries=int(60 * 60 * 24 * 1 / (60 * 5)))
def task_that_holds_worker(self, *args, **kwargs):
import time
time.sleep(50000)
@app.task(bind=True, name='retried_task', rate_limit='2/m',
default_retry_delay=10 * 60,
max_retries=int(60 * 60 * 24 * 1 / (60 * 10)))
def retried_task(self, *args, **kwargs):
self.retry()
最简单的任务,只是检查一个任务是否正忙于其他任务 - 重试的任务没有被另一个工作人员处理。
然后我启动了一个 worker - 并通过以下方式触发了这两个任务:
from some_app import tasks
from some_app.celery_app import app
current_app = app.tasks
async_result = tasks.retried_task.delay()
import time
time.sleep(20)
async_result = tasks.task_that_holds_worker.delay()
工作人员处理了重试任务,并重试了它, 然后转移到睡觉的任务。 然后我启动了另一个工人,我可以看到它没有得到 'retried' 任务,只有第一个工人。
每个启动的 worker 都是用 --prefetch-multiplier=1 --concurrency=1
启动的
我复制这个的方式有问题吗?
或者这是芹菜重试任务的行为方式?
提前致谢!
- 芹菜:4.1.2
- Python: 3.6.2
- Rabbitmq 图片:rabbitmq:3.6.9-管理
您的复制方式有误。除非你有一个特殊的经纪人,芹菜将 总是 重新排队任务重试请求返回给经纪人。工作人员不会保留他们尝试执行哪个任务的任何记忆,并且没有添加到重试请求的数据允许 celery 将任务请求路由回同一个工作人员。无法保证或保证同一个工人会重试它之前见过的任务。您可以在 celery/app.task.py
<i># get the signature of the task <b>as called</b></i> S = self.signature_from_request( request, args, kwargs, countdown=countdown, eta=eta, retries=retries, **options ) <br>if max_retries is not None and retries > max_retries: if exc: # On Py3: will augment any current exception with # the exc' argument provided (raise exc from orig) raise_with_context(exc) raise self.MaxRetriesExceededError( "Can't retry {0}[{1}] args:{2} kwargs:{3}".format( self.name, request.id, S.args, S.kwargs)) <br>ret = Retry(exc=exc, when=eta or countdown) if is_eager: # if task was executed eagerly using apply(), # then the retry must also be executed eagerly. S.apply().get() if throw: raise ret return ret <br><b>try: S.apply_async() except Exception as exc: raise Reject(exc, requeue=False) if throw: raise ret return ret</b></p> <p></pre>
我将您可以看到重试如何工作的部分加粗了。 Celery 获取任务请求签名(这包括任务名称和任务参数,并设置 eta、倒计时和重试)。然后 celery 将简单地调用
apply_async
,它在幕后只会将一个新的任务请求排队给代理。您的示例没有工作,因为 celery worker 通常会从代理中拉出多个任务请求,因此可能发生的情况是第一个 worker 在第二个 worker 上线之前从 broker 抓取了任务。
这似乎是 eta 任务的问题。第一个可用的工作人员倒计时直到任务 eta 并且不会将其释放回队列。 (预取计数增加并被忽略)