当我 运行 芹菜任务时如何取消确认消息?
How to unacknowledge message when I run a celery task?
现在我有一些同步作业,它们是有状态的,所以如果任务失败,我必须取消确认消息,然后让它们转到 RabbitMQ
的前面。但是当我试图抛出错误时,我发现celery仍然确认了这个消息,并且队列已经被清除了。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
raise ValueError
而且我发现 celery 任务有一个方法叫做 retry
,但是它会将任务添加到队列的后面。这不是我想要的。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
try:
raise ValueError
except Exception:
self.retry(countdown=15)
即使我不能用终止信号做到这一点:
os.kill(os.getpid(), signal.SIGKILL)
我该怎么办? celery 是否提供了一些错误,所以我可以引发此错误以通知 celery 不确认我的消息?
在文档中 https://docs.celeryproject.org/en/stable/userguide/configuration.html 我发现:
task_acks_on_failure_or_timeout
默认为 enabled
.
所以我认为你应该尝试
的组合
task_acks_late=True
+ task_acks_on_failure_or_timeout=False
实现NO acknowledgement when a task fails
.
根据文档,芹菜能够 RabbitMQs priority queues. https://docs.celeryproject.org/en/latest/faq.html#does-celery-support-task-priorities
因此,您应该能够通过将重试的任务设置为高于常规任务的优先级,将它们推到队列的前面。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
try:
raise ValueError
except Exception:
self.retry(countdown=15, priority=9)
或
根据这个github issue,您还可以将重试的任务分配给一个新的专用队列,并分配您的资源来优先处理该队列。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
try:
raise ValueError
except Exception:
self.retry(countdown=15, queue='prioritized_queue_name')
现在我有一些同步作业,它们是有状态的,所以如果任务失败,我必须取消确认消息,然后让它们转到 RabbitMQ
的前面。但是当我试图抛出错误时,我发现celery仍然确认了这个消息,并且队列已经被清除了。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
raise ValueError
而且我发现 celery 任务有一个方法叫做 retry
,但是它会将任务添加到队列的后面。这不是我想要的。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
try:
raise ValueError
except Exception:
self.retry(countdown=15)
即使我不能用终止信号做到这一点:
os.kill(os.getpid(), signal.SIGKILL)
我该怎么办? celery 是否提供了一些错误,所以我可以引发此错误以通知 celery 不确认我的消息?
在文档中 https://docs.celeryproject.org/en/stable/userguide/configuration.html 我发现:
task_acks_on_failure_or_timeout
默认为 enabled
.
所以我认为你应该尝试
的组合task_acks_late=True
+ task_acks_on_failure_or_timeout=False
实现NO acknowledgement when a task fails
.
根据文档,芹菜能够 RabbitMQs priority queues. https://docs.celeryproject.org/en/latest/faq.html#does-celery-support-task-priorities
因此,您应该能够通过将重试的任务设置为高于常规任务的优先级,将它们推到队列的前面。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
try:
raise ValueError
except Exception:
self.retry(countdown=15, priority=9)
或
根据这个github issue,您还可以将重试的任务分配给一个新的专用队列,并分配您的资源来优先处理该队列。
@celery.task(bind=True)
def my_task(self, *args, **kwargs):
try:
raise ValueError
except Exception:
self.retry(countdown=15, queue='prioritized_queue_name')