Celery、RabbitMQ 消息不断发送
Celery, RabbitMQ messages keep getting sent
这是设置 - django 项目,使用 celery 和 CloudAMQP rabbitMQ worker 进行消息代理。
我的 Celery/RabbitMQ 设置:
# RabbitMQ & Celery settings
BROKER_URL = 'ampq://guest:guest@localhost:5672/' # Understandably fake
BROKER_POOL_LIMIT = 1
BROKER_CONNECTION_TIMEOUT = 30
BROKER_HEARTBEAT = 30
CELERY_SEND_EVENTS = False
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
一个docker容器运行芹菜,使用以下命令:
bash -c 'cd django && celery -A pkm_main worker -E -l info --concurrency=3'
shared_task定义:
from __future__ import absolute_import
from celery import shared_task
@shared_task
def push_notification(user_id, message):
logging.critical('Push notifications sent')
return {'status': 'success'}
我实际上是在发生某些事情时调用它(我省略了一些代码,因为它似乎不相关):
from notificatons.tasks import push_notification
def like_this(self, **args):
# Do like stuff and then do .delay()
push_notification.delay(media.user.id, request.user.username + ' has liked your item')
所以当这是 运行 - 一切看起来都很好 - 输出看起来像这样:
worker_1 | [2016-03-25 09:03:34,888: INFO/MainProcess] Received task: notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516]
worker_1 | [2016-03-25 09:03:35,333: CRITICAL/Worker-1] Push notifications sent
worker_1 | [2016-03-25 09:03:35,336: INFO/MainProcess] Task notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516] succeeded in 0.444933412999s: {'status': 'success'}
据我所知,任务已经 运行 并正确执行,消息应该停止,RabbitMQ 应该停止。
但在我的 RabbitMQ 管理中,我看到消息不断发布和传递:
所以我从中收集到的信息是 RabbitMQ 正在尝试发送某种确认并失败并重试?有没有办法真正关闭这种行为?
热烈欢迎所有帮助和建议。
编辑:忘记提及一些重要的事情 - 在我调用 push_notification.delay() 之前,消息选项卡是空的,除了每 30 秒来来去去的心跳。只有在我调用 .delay() 之后才会发生这种情况。
编辑 2:CELERYBEAT_SCHEDULE 设置(我试过 运行 并且没有它们 - 没有区别但添加它们以防万一)
CELERYBEAT_SCHEDULE = {
"minutely_process_all_notifications": {
'task': 'transmissions.tasks.process_all_notifications',
'schedule': crontab(minute='*')
}
}
编辑 3:添加了查看代码。我也没有使用 CELERYBEAT_SCHEDULE。我只是将配置保留在代码中以供将来的计划任务使用
from notifications.tasks import push_notification
class MediaLikesView(BaseView):
def post(self, request, media_id):
media = self.get_object(media_id)
data = {}
data['media'] = media.id
data['user'] = request.user.id
serializer = MediaLikeSerializer(data=data)
if serializer.is_valid():
like = serializer.save()
push_notification.delay(media.user.id, request.user.username + ' has liked your item')
serializer = MediaGetLikeSerializer(like)
return self.get_mocked_pagination_response(status=status.HTTP_204_NO_CONTENT)
return self.get_mocked_pagination_response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
是芹菜的八卦。通过将 --without-gossip --without-mingle --without-heartbeat
添加到命令行参数来禁用。
此外,当您在命令行上禁用心跳时,不要忘记设置 BROKER_HEARTBEAT = None
,否则您将在 30 秒后断开连接。依赖 TCP keepalive 通常比 AMQP 心跳更好,或者更糟的是,Celery 自己的心跳。
这是设置 - django 项目,使用 celery 和 CloudAMQP rabbitMQ worker 进行消息代理。
我的 Celery/RabbitMQ 设置:
# RabbitMQ & Celery settings
BROKER_URL = 'ampq://guest:guest@localhost:5672/' # Understandably fake
BROKER_POOL_LIMIT = 1
BROKER_CONNECTION_TIMEOUT = 30
BROKER_HEARTBEAT = 30
CELERY_SEND_EVENTS = False
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
一个docker容器运行芹菜,使用以下命令:
bash -c 'cd django && celery -A pkm_main worker -E -l info --concurrency=3'
shared_task定义:
from __future__ import absolute_import
from celery import shared_task
@shared_task
def push_notification(user_id, message):
logging.critical('Push notifications sent')
return {'status': 'success'}
我实际上是在发生某些事情时调用它(我省略了一些代码,因为它似乎不相关):
from notificatons.tasks import push_notification
def like_this(self, **args):
# Do like stuff and then do .delay()
push_notification.delay(media.user.id, request.user.username + ' has liked your item')
所以当这是 运行 - 一切看起来都很好 - 输出看起来像这样:
worker_1 | [2016-03-25 09:03:34,888: INFO/MainProcess] Received task: notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516]
worker_1 | [2016-03-25 09:03:35,333: CRITICAL/Worker-1] Push notifications sent
worker_1 | [2016-03-25 09:03:35,336: INFO/MainProcess] Task notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516] succeeded in 0.444933412999s: {'status': 'success'}
据我所知,任务已经 运行 并正确执行,消息应该停止,RabbitMQ 应该停止。
但在我的 RabbitMQ 管理中,我看到消息不断发布和传递:
所以我从中收集到的信息是 RabbitMQ 正在尝试发送某种确认并失败并重试?有没有办法真正关闭这种行为?
热烈欢迎所有帮助和建议。
编辑:忘记提及一些重要的事情 - 在我调用 push_notification.delay() 之前,消息选项卡是空的,除了每 30 秒来来去去的心跳。只有在我调用 .delay() 之后才会发生这种情况。
编辑 2:CELERYBEAT_SCHEDULE 设置(我试过 运行 并且没有它们 - 没有区别但添加它们以防万一)
CELERYBEAT_SCHEDULE = {
"minutely_process_all_notifications": {
'task': 'transmissions.tasks.process_all_notifications',
'schedule': crontab(minute='*')
}
}
编辑 3:添加了查看代码。我也没有使用 CELERYBEAT_SCHEDULE。我只是将配置保留在代码中以供将来的计划任务使用
from notifications.tasks import push_notification
class MediaLikesView(BaseView):
def post(self, request, media_id):
media = self.get_object(media_id)
data = {}
data['media'] = media.id
data['user'] = request.user.id
serializer = MediaLikeSerializer(data=data)
if serializer.is_valid():
like = serializer.save()
push_notification.delay(media.user.id, request.user.username + ' has liked your item')
serializer = MediaGetLikeSerializer(like)
return self.get_mocked_pagination_response(status=status.HTTP_204_NO_CONTENT)
return self.get_mocked_pagination_response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
是芹菜的八卦。通过将 --without-gossip --without-mingle --without-heartbeat
添加到命令行参数来禁用。
此外,当您在命令行上禁用心跳时,不要忘记设置 BROKER_HEARTBEAT = None
,否则您将在 30 秒后断开连接。依赖 TCP keepalive 通常比 AMQP 心跳更好,或者更糟的是,Celery 自己的心跳。