运行 使用主题交换的多个 Celery 任务
Run multiple Celery tasks using a topic exchange
我正在用 Celery 替换一些自己开发的代码,但很难复制当前的行为。我想要的行为如下:
- 创建新用户时,应使用
user.created
路由键将消息发布到 tasks
交换。
- 此消息应触发两个 Celery 任务,即
send_user_activate_email
和 check_spam
。
我尝试通过定义带有 ignore_result=True
参数的 user_created
任务以及 send_user_activate_email
和 check_spam
的任务来实现此目的。
在我的配置中,我添加了以下路由和队列定义。当消息传递到 user_created
队列时,它不会传递到其他两个队列。
理想情况下,消息仅传递到 send_user_activate_email
和 check_spam
队列。当使用 vanilla RabbitMQ 时,消息被发布到交换器,队列可以绑定到交换器,但 Celery 似乎直接将消息传递到队列。
我将如何在 Celery 中实现上述行为?
CELERY_QUEUES = {
'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
}
CELERY_ROUTES = {
'user_created': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'send_user_activate_email': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'check_spam': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
}
听起来您希望 trigger/be 一条消息被两个队列使用,但这不是 Celery 的工作方式。一个 Exchange 将 post 一个任务发送给符合条件的队列,但是一旦它被消费,其他队列就会忽略该消息。每个要触发的任务都需要一条消息。
新的 Celery 用户经常会感到困惑,因为 "Queue" 在这个系统中有两种用途; Queue() 和文档引用的 Kombu 队列,以及直接保存消息并由工作人员使用的 AMQP 队列。当我们发布到队列时,我们想到的是 AMQP,这是不正确的。 (感谢下面链接的答案)。
回到你的问题,如果我理解正确的话,当 user_created 被消耗时,你希望它再产生两个任务; send_user_activate_email 和 check_spam。此外,它们不应相互依赖;他们可以 运行 在不同的机器上并行进行,并且不需要知道彼此的状态。
在这种情况下,您希望user_created到"apply_async"这两个新任务和return。这可以直接完成,或者您可以使用包含 check_spam 和 send_user_activate_email 的 Celery "Group" 来实现。该小组提供了一些很好的 shorthand 并为您的任务提供了一些结构,所以我个人会推动您朝这个方向发展。
#pseudocode
group(check_spam.s(... checkspam kwargs ...), send_user_activate_email.s(... active email kwargs ...)).apply_async()
此设置将创建四个消息;一个用于您要执行的每个任务,一个用于 Group(),它本身会有一个结果。
对于您的情况,我不确定 Exchange 或 ignore_result 是否必要,但我需要查看任务代码并进一步了解系统才能做出判断。
http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups
http://celery.readthedocs.org/en/v2.2.6/userguide/routing.html#exchanges-queues-and-routing-keys
Why do CELERY_ROUTES have both a "queue" and a "routing_key"?
(如果我跑题了,我会delete/remove回答...)
设计和解决问题的简单方法是使用 Celery 工作流。
但首先我会改变你的队列定义,为每个任务设置一个唯一的路由键和 exchange_type 和 'direct' 值。
根据 celery documentation, 直接交换通过精确路由键匹配 ,所以我们为所有自定义任务和消费者队列设置相同的交换,我们映射 routing_key(对于任务) 和 binding_key(对于队列)就像下一个片段:
CELERY_QUEUES = {
'user_created': {'binding_key':'user_created', 'exchange': 'tasks', 'exchange_type': 'direct'},
'send_user_activate_email': {'binding_key':'send_user_activate_email', 'exchange': 'tasks', 'exchange_type': 'direct'},
'check_spam': {'binding_key':'check_spam', 'exchange': 'tasks', 'exchange_type': 'direct'},
}
CELERY_ROUTES = {
'user_created': {
'queue': 'user_created',
'routing_key': 'user_created',
'exchange': 'tasks',
'exchange_type': 'direct',
},
'send_user_activate_email': {
'queue': 'send_user_activate_email',
'routing_key': 'send_user_activate_email',
'exchange': 'tasks',
'exchange_type': 'direct',
},
'check_spam': {
'queue': 'check_spam',
'routing_key': 'check_spam',
'exchange': 'tasks',
'exchange_type': 'direct',
},
}
完成此更改后,您需要对可用列表使用正确的工作流程 (http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives)。阅读您的问题我认为您需要一个链,因为需要保留顺序。
sequential_tasks = []
sequential_tasks.append(user_created.s(**user_created_kwargs))
sequential_tasks.append(send_user_activate_email.s(**send_user_activate_email_kwargs))
sequential_tasks.append(check_spam.s(**check_spam_kwargs))
#you can add more tasks to the chain
chain(*sequential_tasks)()
Celery 将透明地处理与队列相关的工作。
我正在用 Celery 替换一些自己开发的代码,但很难复制当前的行为。我想要的行为如下:
- 创建新用户时,应使用
user.created
路由键将消息发布到tasks
交换。 - 此消息应触发两个 Celery 任务,即
send_user_activate_email
和check_spam
。
我尝试通过定义带有 ignore_result=True
参数的 user_created
任务以及 send_user_activate_email
和 check_spam
的任务来实现此目的。
在我的配置中,我添加了以下路由和队列定义。当消息传递到 user_created
队列时,它不会传递到其他两个队列。
理想情况下,消息仅传递到 send_user_activate_email
和 check_spam
队列。当使用 vanilla RabbitMQ 时,消息被发布到交换器,队列可以绑定到交换器,但 Celery 似乎直接将消息传递到队列。
我将如何在 Celery 中实现上述行为?
CELERY_QUEUES = {
'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
}
CELERY_ROUTES = {
'user_created': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'send_user_activate_email': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'check_spam': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
}
听起来您希望 trigger/be 一条消息被两个队列使用,但这不是 Celery 的工作方式。一个 Exchange 将 post 一个任务发送给符合条件的队列,但是一旦它被消费,其他队列就会忽略该消息。每个要触发的任务都需要一条消息。
新的 Celery 用户经常会感到困惑,因为 "Queue" 在这个系统中有两种用途; Queue() 和文档引用的 Kombu 队列,以及直接保存消息并由工作人员使用的 AMQP 队列。当我们发布到队列时,我们想到的是 AMQP,这是不正确的。 (感谢下面链接的答案)。
回到你的问题,如果我理解正确的话,当 user_created 被消耗时,你希望它再产生两个任务; send_user_activate_email 和 check_spam。此外,它们不应相互依赖;他们可以 运行 在不同的机器上并行进行,并且不需要知道彼此的状态。
在这种情况下,您希望user_created到"apply_async"这两个新任务和return。这可以直接完成,或者您可以使用包含 check_spam 和 send_user_activate_email 的 Celery "Group" 来实现。该小组提供了一些很好的 shorthand 并为您的任务提供了一些结构,所以我个人会推动您朝这个方向发展。
#pseudocode
group(check_spam.s(... checkspam kwargs ...), send_user_activate_email.s(... active email kwargs ...)).apply_async()
此设置将创建四个消息;一个用于您要执行的每个任务,一个用于 Group(),它本身会有一个结果。
对于您的情况,我不确定 Exchange 或 ignore_result 是否必要,但我需要查看任务代码并进一步了解系统才能做出判断。
http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups http://celery.readthedocs.org/en/v2.2.6/userguide/routing.html#exchanges-queues-and-routing-keys Why do CELERY_ROUTES have both a "queue" and a "routing_key"?
(如果我跑题了,我会delete/remove回答...)
设计和解决问题的简单方法是使用 Celery 工作流。
但首先我会改变你的队列定义,为每个任务设置一个唯一的路由键和 exchange_type 和 'direct' 值。
根据 celery documentation, 直接交换通过精确路由键匹配 ,所以我们为所有自定义任务和消费者队列设置相同的交换,我们映射 routing_key(对于任务) 和 binding_key(对于队列)就像下一个片段:
CELERY_QUEUES = {
'user_created': {'binding_key':'user_created', 'exchange': 'tasks', 'exchange_type': 'direct'},
'send_user_activate_email': {'binding_key':'send_user_activate_email', 'exchange': 'tasks', 'exchange_type': 'direct'},
'check_spam': {'binding_key':'check_spam', 'exchange': 'tasks', 'exchange_type': 'direct'},
}
CELERY_ROUTES = {
'user_created': {
'queue': 'user_created',
'routing_key': 'user_created',
'exchange': 'tasks',
'exchange_type': 'direct',
},
'send_user_activate_email': {
'queue': 'send_user_activate_email',
'routing_key': 'send_user_activate_email',
'exchange': 'tasks',
'exchange_type': 'direct',
},
'check_spam': {
'queue': 'check_spam',
'routing_key': 'check_spam',
'exchange': 'tasks',
'exchange_type': 'direct',
},
}
完成此更改后,您需要对可用列表使用正确的工作流程 (http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives)。阅读您的问题我认为您需要一个链,因为需要保留顺序。
sequential_tasks = []
sequential_tasks.append(user_created.s(**user_created_kwargs))
sequential_tasks.append(send_user_activate_email.s(**send_user_activate_email_kwargs))
sequential_tasks.append(check_spam.s(**check_spam_kwargs))
#you can add more tasks to the chain
chain(*sequential_tasks)()
Celery 将透明地处理与队列相关的工作。