当我 运行 芹菜任务时如何取消确认消息?

How to unacknowledge message when I run a celery task?

现在我有一些同步作业,它们是有状态的,所以如果任务失败,我必须取消确认消息,然后让它们转到 RabbitMQ 的前面。但是当我试图抛出错误时,我发现celery仍然确认了这个消息,并且队列已经被清除了。

@celery.task(bind=True)
def my_task(self, *args, **kwargs):
    raise ValueError

而且我发现 celery 任务有一个方法叫做 retry,但是它会将任务添加到队列的后面。这不是我想要的。

@celery.task(bind=True)
def my_task(self, *args, **kwargs):
    try:
        raise ValueError
    except Exception:
        self.retry(countdown=15)

即使我不能用终止信号做到这一点:

os.kill(os.getpid(), signal.SIGKILL)

我该怎么办? celery 是否提供了一些错误,所以我可以引发此错误以通知 celery 不确认我的消息?

在文档中 https://docs.celeryproject.org/en/stable/userguide/configuration.html 我发现:

task_acks_on_failure_or_timeout 默认为 enabled .


所以我认为你应该尝试

的组合

task_acks_late=True + task_acks_on_failure_or_timeout=False
实现NO acknowledgement when a task fails.

根据文档,芹菜能够 RabbitMQs priority queues. https://docs.celeryproject.org/en/latest/faq.html#does-celery-support-task-priorities

因此,您应该能够通过将重试的任务设置为高于常规任务的优先级,将它们推到队列的前面。

@celery.task(bind=True)
def my_task(self, *args, **kwargs):
    try:
        raise ValueError
    except Exception:
        self.retry(countdown=15, priority=9)

根据这个github issue,您还可以将重试的任务分配给一个新的专用队列,并分配您的资源来优先处理该队列。

@celery.task(bind=True)
def my_task(self, *args, **kwargs):
    try:
        raise ValueError
    except Exception:
        self.retry(countdown=15, queue='prioritized_queue_name')