如何在 django 应用程序中用 azure 服务总线替换 celery 任务?

How to replace celery task with azure service bus in a django application?

我被要求在 Django 应用程序中使用 azure 服务总线而不是 celery。

阅读提供的文档但没有清楚地了解使用服务总线而不是 celery 任务。提供的任何建议都会有很大帮助。

在开始之前,我想强调一下 Azure Service Bus 和 Celery 之间的区别。

Azure 服务总线:

Microsoft Azure 服务总线是一个完全托管的企业集成消息代理。

您可以参考this了解更多关于服务总线的信息

芹菜:

分布式任务队列。 Celery 是一个基于分布式消息传递的异步任务 queue/job 队列。

对于你的情况,我可以想到 2 种可能性:

  1. 您想将 Service Bus 与 Celery 一起使用,而不是其他 消息代理。
  2. 用服务总线替换 Celery

1:您想将服务总线与 Celery 一起使用来代替其他消息代理。

您可以参考 understand why celery needs a message broker。 我不确定您目前使用的是哪个消息代理,但您可以使用 Kombu library 来满足您的要求。

Azure 服务总线参考: https://docs.celeryproject.org/projects/kombu/en/stable/reference/kombu.transport.azureservicebus.html

其他人参考: https://docs.celeryproject.org/projects/kombu/en/stable/reference/index.html

2:用Service Bus完全替换Celery 满足您的要求:

考虑

  • 消息发送者是生产者
  • 消息接收者是消费者

这是您必须处理的两个不同的应用程序。

您可以参考以下内容以获取更多示例代码。

https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples

解释:

  • 每次你想执行的动作,你可以发送 从生产者客户端到主题的消息。
  • 消费者客户端 - 正在侦听的应用程序将接收消息并进行处理。您可以将您的自定义流程附加到它 - 这样,只要在消费者客户端收到消息,您的自定义流程就会执行。

接收客户端示例如下:

from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()

        
Receiving = True

#Topic 1 receiver : 
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver =  SubsClient.get_receiver()

async def receive_message_from1():
    await receiver.open()
    print("Opening the Receiver for Topic1")
    async with receiver:
      while(Receiving):
        msgs =  await receiver.fetch_next()
        for m in msgs:
            print("Received the message from topic 1.....")
            ##### - Your code to execute when a message is received - ########
            print(str(m))
            ##### - Your code to execute when a message is received - ########
            await m.complete()
            
            
loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())

下行之间的部分是每次收到消息时都会执行的指令。

##### - Your code to execute when a message is received - ########