使用 RabbitMQ 更新 Celery 中的任务
Update tasks in Celery with RabbitMQ
我在我的 django 项目中使用 Celery 创建任务以在将来的特定时间发送电子邮件。用户可以使用 notify_on
日期时间字段创建通知实例。然后我将 notify_on
的值作为 eta
.
传递
class Notification(models.Model):
...
notify_on = models.DateTimeField()
def notification_post_save(instance, *args, **kwargs):
send_notification.apply_async((instance,), eta=instance.notify_on)
signals.post_save.connect(notification_post_save, sender=Notification)
该方法的问题在于,如果 notify_on
将被用户更改,他将收到两个(或更多)通知而不是一个。
问题是我如何更新与特定通知关联的任务,或者以某种方式删除旧的并创建新的。
首先,使用post_save
,我们无法获取旧数据。所以,在这里我重写了 Notification
模型的 save()
方法。除此之外,创建一个字段来存储芹菜task_id。
<b>from celery.task.control import revoke</b>
class Notification(models.Model):
...
notify_on = models.DateTimeField()
<b>celery_task_id = models.CharField(max_length=100)
def save(self, *args, **kwargs):
pre_notify_on = Notification.objects.get(pk=self.pk).notify_on
super().save(*args, **kwargs)
post_notify_on = self.notify_on
if not self.celery_task_id: # initial task creation
task_object = send_notification.apply_async((self,), eta=self.notify_on)
Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)
elif pre_notify_on != post_notify_on:
# revoke the old task
revoke(self.celery_task_id, terminate=True)
task_object = send_notification.apply_async((self,), eta=self.notify_on)
Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)</b>
参考
- Cancel an already executing task with Celery?
- Django: How to access original (unmodified) instance in post_save signal
我觉得之前的任务没有必要删除。您只需验证正在执行的任务是最后一个任务。为此,创建一个名为 checksum 的新字段,它是一个 UUID 字段,每次更改 notify_on 时更新该字段。在发送电子邮件的任务中检查此校验和。
class Notification(models.Model):
checksum = models.UUIDField(default=uuid.uuid4)
notify_on = models.DateTimeField()
def notification_post_save(instance, *args, **kwargs):
send_notification.apply_async((instance.id, str(instance.checksum)),eta=instance.notify_on)
signals.post_save.connect(notification_post_save, sender=Notification)
@shared_task
def send_notification(notification_id, checksum):
notification = Notification.objects.get(id=notification_id)
if str(notification.checksum) != checksum:
return False
#send email
另外,请不要每次都在通知对象上发送信号,只需在 notify_on 更改时发送即可。你也可以检查这个
我在我的 django 项目中使用 Celery 创建任务以在将来的特定时间发送电子邮件。用户可以使用 notify_on
日期时间字段创建通知实例。然后我将 notify_on
的值作为 eta
.
class Notification(models.Model):
...
notify_on = models.DateTimeField()
def notification_post_save(instance, *args, **kwargs):
send_notification.apply_async((instance,), eta=instance.notify_on)
signals.post_save.connect(notification_post_save, sender=Notification)
该方法的问题在于,如果 notify_on
将被用户更改,他将收到两个(或更多)通知而不是一个。
问题是我如何更新与特定通知关联的任务,或者以某种方式删除旧的并创建新的。
首先,使用post_save
,我们无法获取旧数据。所以,在这里我重写了 Notification
模型的 save()
方法。除此之外,创建一个字段来存储芹菜task_id。
<b>from celery.task.control import revoke</b>
class Notification(models.Model):
...
notify_on = models.DateTimeField()
<b>celery_task_id = models.CharField(max_length=100)
def save(self, *args, **kwargs):
pre_notify_on = Notification.objects.get(pk=self.pk).notify_on
super().save(*args, **kwargs)
post_notify_on = self.notify_on
if not self.celery_task_id: # initial task creation
task_object = send_notification.apply_async((self,), eta=self.notify_on)
Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)
elif pre_notify_on != post_notify_on:
# revoke the old task
revoke(self.celery_task_id, terminate=True)
task_object = send_notification.apply_async((self,), eta=self.notify_on)
Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)</b>
参考
- Cancel an already executing task with Celery?
- Django: How to access original (unmodified) instance in post_save signal
我觉得之前的任务没有必要删除。您只需验证正在执行的任务是最后一个任务。为此,创建一个名为 checksum 的新字段,它是一个 UUID 字段,每次更改 notify_on 时更新该字段。在发送电子邮件的任务中检查此校验和。
class Notification(models.Model):
checksum = models.UUIDField(default=uuid.uuid4)
notify_on = models.DateTimeField()
def notification_post_save(instance, *args, **kwargs):
send_notification.apply_async((instance.id, str(instance.checksum)),eta=instance.notify_on)
signals.post_save.connect(notification_post_save, sender=Notification)
@shared_task
def send_notification(notification_id, checksum):
notification = Notification.objects.get(id=notification_id)
if str(notification.checksum) != checksum:
return False
#send email
另外,请不要每次都在通知对象上发送信号,只需在 notify_on 更改时发送即可。你也可以检查这个