Asyncio: Fastapi with aio-pika, consumer 忽略 Await
Asyncio: Fastapi with aio-pika, consumer ignores Await
我正在尝试使用 rabbitmq (aio-pika) 连接我的 websocket 端点。目标是让该端点中的侦听器和队列中的任何新消息通过 websocket 将消息传递给浏览器客户端。
我在带有 asyncio 循环的脚本中使用 asyncio 测试了消费者。按照我遵循并使用 aio-pika 文档的方式工作。 (来源:https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/2-work-queues.html、worker.py)
但是,当我在 websockets 端点的 fastapi 中使用它时,我无法让它工作。不知何故听众:
await queue.consume(on_message)
完全忽略。
这是我的尝试(我把它全部放在一个函数中,所以它更具可读性):
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
print("Entering websockets")
await manager.connect(websocket)
print("got connection")
# params
queue_name = "task_events"
routing_key = "user_id.task"
con = "amqp://rabbitmq:rabbitmq@rabbit:5672/"
connection = await connect(con)
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
exchange = await channel.declare_exchange(
"topic_logs",
ExchangeType.TOPIC,
)
# Declaring queue
queue = await channel.declare_queue(queue_name)
# Binding the queue to the exchange
await queue.bind(exchange, routing_key)
async def on_message(message: IncomingMessage):
async with message.process():
# here will be the message passed over websockets to browser client
print("sent", message.body)
try:
######### Not working as expected ###########
# await does not await and websockets finishes, as there is no loop
await queue.consume(on_message)
#############################################
################ This Alternative code atleast receives some messages #############
# If I use this part, I atleast get some messages, when I trigger a backend task that publishes new messages to the queue.
# It seems like the messages are somehow stuck and new task releases all stucked messages, but does not release new one.
while True:
await queue.consume(on_message)
await asyncio.sleep(1)
################## one part #############
except WebSocketDisconnect:
manager.disconnect(websocket)
我对 python 中的异步还很陌生。我不确定问题出在哪里,并且在从 aio-pika 获得 worker.py 的启发时,我无法以某种方式实现异步消费循环。
解决方法很简单。
aio-pika queue.consume 尽管我们使用 await 是非阻塞的,所以
这样我们消费
consumer_tag = await queue.consume(on_message, no_ack=True)
并在连接结束时取消
await queue.cancel(consumer_tag)
对我来说解决方案的核心是做一些异步阻塞,所以我用了
consume
后的这部分代码
while True:
data = await websocket.receive_text()
x = await manager.send_message(data, websocket)
我不使用这段代码,但它很有用,因为这部分代码等待前端 websocket 响应。如果缺少这部分代码,那么客户端连接只是为了断开连接(websocket 端点已成功执行),因为没有任何阻塞
您可以使用异步迭代器,这是从队列中使用消息的第二种规范方式。
在您的情况下,这意味着:
async with queue.iterator() as iter:
async for message in iter:
async with message.process():
# do something with message
只要没有收到消息就会阻塞,处理完消息会再次挂起
我正在尝试使用 rabbitmq (aio-pika) 连接我的 websocket 端点。目标是让该端点中的侦听器和队列中的任何新消息通过 websocket 将消息传递给浏览器客户端。
我在带有 asyncio 循环的脚本中使用 asyncio 测试了消费者。按照我遵循并使用 aio-pika 文档的方式工作。 (来源:https://aio-pika.readthedocs.io/en/latest/rabbitmq-tutorial/2-work-queues.html、worker.py)
但是,当我在 websockets 端点的 fastapi 中使用它时,我无法让它工作。不知何故听众:
await queue.consume(on_message)
完全忽略。
这是我的尝试(我把它全部放在一个函数中,所以它更具可读性):
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
print("Entering websockets")
await manager.connect(websocket)
print("got connection")
# params
queue_name = "task_events"
routing_key = "user_id.task"
con = "amqp://rabbitmq:rabbitmq@rabbit:5672/"
connection = await connect(con)
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
exchange = await channel.declare_exchange(
"topic_logs",
ExchangeType.TOPIC,
)
# Declaring queue
queue = await channel.declare_queue(queue_name)
# Binding the queue to the exchange
await queue.bind(exchange, routing_key)
async def on_message(message: IncomingMessage):
async with message.process():
# here will be the message passed over websockets to browser client
print("sent", message.body)
try:
######### Not working as expected ###########
# await does not await and websockets finishes, as there is no loop
await queue.consume(on_message)
#############################################
################ This Alternative code atleast receives some messages #############
# If I use this part, I atleast get some messages, when I trigger a backend task that publishes new messages to the queue.
# It seems like the messages are somehow stuck and new task releases all stucked messages, but does not release new one.
while True:
await queue.consume(on_message)
await asyncio.sleep(1)
################## one part #############
except WebSocketDisconnect:
manager.disconnect(websocket)
我对 python 中的异步还很陌生。我不确定问题出在哪里,并且在从 aio-pika 获得 worker.py 的启发时,我无法以某种方式实现异步消费循环。
解决方法很简单。
aio-pika queue.consume 尽管我们使用 await 是非阻塞的,所以 这样我们消费
consumer_tag = await queue.consume(on_message, no_ack=True)
并在连接结束时取消
await queue.cancel(consumer_tag)
对我来说解决方案的核心是做一些异步阻塞,所以我用了 consume
后的这部分代码while True:
data = await websocket.receive_text()
x = await manager.send_message(data, websocket)
我不使用这段代码,但它很有用,因为这部分代码等待前端 websocket 响应。如果缺少这部分代码,那么客户端连接只是为了断开连接(websocket 端点已成功执行),因为没有任何阻塞
您可以使用异步迭代器,这是从队列中使用消息的第二种规范方式。
在您的情况下,这意味着:
async with queue.iterator() as iter:
async for message in iter:
async with message.process():
# do something with message
只要没有收到消息就会阻塞,处理完消息会再次挂起