如何在 django 应用程序中用 azure 服务总线替换 celery 任务?
How to replace celery task with azure service bus in a django application?
我被要求在 Django 应用程序中使用 azure 服务总线而不是 celery。
阅读提供的文档但没有清楚地了解使用服务总线而不是 celery 任务。提供的任何建议都会有很大帮助。
在开始之前,我想强调一下 Azure Service Bus 和 Celery 之间的区别。
Azure 服务总线:
Microsoft Azure 服务总线是一个完全托管的企业集成消息代理。
您可以参考this了解更多关于服务总线的信息
芹菜:
分布式任务队列。 Celery 是一个基于分布式消息传递的异步任务 queue/job 队列。
对于你的情况,我可以想到 2 种可能性:
- 您想将 Service Bus 与 Celery 一起使用,而不是其他
消息代理。
- 用服务总线替换 Celery
1:您想将服务总线与 Celery 一起使用来代替其他消息代理。
您可以参考 understand why celery needs a message broker。
我不确定您目前使用的是哪个消息代理,但您可以使用 Kombu library 来满足您的要求。
Azure 服务总线参考: https://docs.celeryproject.org/projects/kombu/en/stable/reference/kombu.transport.azureservicebus.html
其他人参考: https://docs.celeryproject.org/projects/kombu/en/stable/reference/index.html
2:用Service Bus完全替换Celery
满足您的要求:
考虑
- 消息发送者是生产者
- 消息接收者是消费者
这是您必须处理的两个不同的应用程序。
您可以参考以下内容以获取更多示例代码。
https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples
解释:
- 每次你想执行的动作,你可以发送
从生产者客户端到主题的消息。
- 消费者客户端 - 正在侦听的应用程序将接收消息并进行处理。您可以将您的自定义流程附加到它 - 这样,只要在消费者客户端收到消息,您的自定义流程就会执行。
接收客户端示例如下:
from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()
Receiving = True
#Topic 1 receiver :
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver = SubsClient.get_receiver()
async def receive_message_from1():
await receiver.open()
print("Opening the Receiver for Topic1")
async with receiver:
while(Receiving):
msgs = await receiver.fetch_next()
for m in msgs:
print("Received the message from topic 1.....")
##### - Your code to execute when a message is received - ########
print(str(m))
##### - Your code to execute when a message is received - ########
await m.complete()
loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())
下行之间的部分是每次收到消息时都会执行的指令。
##### - Your code to execute when a message is received - ########
我被要求在 Django 应用程序中使用 azure 服务总线而不是 celery。
阅读提供的文档但没有清楚地了解使用服务总线而不是 celery 任务。提供的任何建议都会有很大帮助。
在开始之前,我想强调一下 Azure Service Bus 和 Celery 之间的区别。
Azure 服务总线:
Microsoft Azure 服务总线是一个完全托管的企业集成消息代理。
您可以参考this了解更多关于服务总线的信息
芹菜:
分布式任务队列。 Celery 是一个基于分布式消息传递的异步任务 queue/job 队列。
对于你的情况,我可以想到 2 种可能性:
- 您想将 Service Bus 与 Celery 一起使用,而不是其他 消息代理。
- 用服务总线替换 Celery
1:您想将服务总线与 Celery 一起使用来代替其他消息代理。
您可以参考 understand why celery needs a message broker。 我不确定您目前使用的是哪个消息代理,但您可以使用 Kombu library 来满足您的要求。
Azure 服务总线参考: https://docs.celeryproject.org/projects/kombu/en/stable/reference/kombu.transport.azureservicebus.html
其他人参考: https://docs.celeryproject.org/projects/kombu/en/stable/reference/index.html
2:用Service Bus完全替换Celery 满足您的要求:
考虑
- 消息发送者是生产者
- 消息接收者是消费者
这是您必须处理的两个不同的应用程序。
您可以参考以下内容以获取更多示例代码。
https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples
解释:
- 每次你想执行的动作,你可以发送 从生产者客户端到主题的消息。
- 消费者客户端 - 正在侦听的应用程序将接收消息并进行处理。您可以将您的自定义流程附加到它 - 这样,只要在消费者客户端收到消息,您的自定义流程就会执行。
接收客户端示例如下:
from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()
Receiving = True
#Topic 1 receiver :
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver = SubsClient.get_receiver()
async def receive_message_from1():
await receiver.open()
print("Opening the Receiver for Topic1")
async with receiver:
while(Receiving):
msgs = await receiver.fetch_next()
for m in msgs:
print("Received the message from topic 1.....")
##### - Your code to execute when a message is received - ########
print(str(m))
##### - Your code to execute when a message is received - ########
await m.complete()
loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())
下行之间的部分是每次收到消息时都会执行的指令。
##### - Your code to execute when a message is received - ########