芹菜关闭工人并重试任务

Celery shutdown worker and retry the task

我需要在 celery 任务中实现以下逻辑:如果满足某些条件,关闭当前 worker 并重试任务。

在示例任务上测试:

@app.task(bind=True, max_retries=1)
def shutdown_and_retry(self, config):
    try:
        raise Exception('test exection')
    except Exception as exc:
        print('Retry {}/{}, task id {}'.format(self.request.retries, self.max_retries, self.request.id))
        app.control.shutdown(destination=[self.request.hostname])  # send shutdown signal to the current worker
        raise self.retry(exc=exc, countdown=5)
    print('Execute task id={} retries={}'.format(self.request.id, self.request.retries))
    return 'some result'

但是结果很奇怪,步骤:

  1. 运行 工人:celery worker -Q test_queue -A test_worker -E -c 1 -n test_worker_1.
  2. 将任务推送到 "test_queue" 队列。
  3. 工作人员发现并关闭。我在 RabbitMQ 中打开 'test_queue' 中的任务列表,看到:
    • 发布者提交的原始任务,重试次数 = 0(来自 app.control.shutdown() 调用);
    • 原始任务的副本(具有相同的 ID),重试次数 = 1(来自 self.retry() 调用)。
  4. 然后我启动了另一个工作人员到同一个队列,它也捕获了任务并关闭了。但是在 Broker 上,原始任务的另一个副本被推送到具有相同 ID 和重试次数 = 1 的队列中。因此,队列中有 3 个任务。接下来的所有 运行 工作人员都将 + 1 个新任务放入队列。条件 max_retries = 1 在这种情况下不起作用。

我尝试过的:

  1. 在celeryconfig.py和运行中设置task_reject_on_worker_lost = True相同的任务。结果:没有任何改变。
  2. 只在工作人员的任务中保留关闭调用。结果:每次尝试仅推回原始任务(没有任务重复),但不计算重试次数(始终设置为 0);
  3. 在关闭之前添加 app.control.revoke(self.request.id) 并在工作程序中重试调用(基于 )。结果:在第一次尝试后得到相同的结果(队列中有 2 个任务),但是当我 运行 第二个工作队列被刷新并且它没有 运行 任何东西时。因此,任务丢失且未重试。

有没有办法在 app.control.shutdown() 调用期间不将原始任务推回队列?看来这才是根本原因。或者您能否提出另一种解决方法,以实现上面指出的正确逻辑。

设置:RabbitMQ 3.8.2,芹菜 4.1.0,python3.5.4

celeryconfig.py中的设置:

task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = False
task_track_started = True
worker_prefetch_multiplier = 1
worker_disable_rate_limits = True

问题似乎出在您的配置文件中 task_acks_late。通过使用它,您说的是 "Only remove the task from the queue when I have finished running"。然后你杀了工人,所以它永远不会被确认(你会得到重复的任务)。