Why don't messages received from an asyncio.Queue keep the order they were sent in?

I am struggling to keep the messages I receive from an asyncio.Queue in the correct order.

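In my case, multiple publishers (which put messages on the queue) send messages through a single asyncio.Queue to a consumer (which gets messages from the queue). In the example code below, the assertion in consumer.process fails, i.e. the order in which the messages are received is violated: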

import asyncio
import random


async def publish(publisher_id, consumer_queue, messages):
    assert messages == sorted(messages)  # sent messages are ordered
    for message in messages:
        if random.random() < 0.1:  # simulate publisher taking a short break
            await asyncio.sleep(random.random())
        # publish message to queue
        asyncio.create_task(consumer_queue.put((publisher_id, message)))
    # signal that this publisher is finished
    asyncio.create_task(consumer_queue.put((publisher_id, None)))


class Consumer:
    def __init__(self, num_publishers):
        self.publishers = {publisher: [] for publisher in range(num_publishers)}
        self.queue = asyncio.Queue(maxsize=1)
        self.finished = asyncio.Event()

    async def consume_loop(self):
        while True:
            publisher_id, message = await self.queue.get()
            # delegate processing of the message
            asyncio.create_task(self.process(publisher_id, message))

    async def process(self, publisher_id, message):
        if message is None:  # publisher is finished
            received = self.publishers.pop(publisher_id)
            # assert that if messages sent by a publisher are ordered,
            # then the same messages received by the consumer are also ordered
            assert received == sorted(received), f'{publisher_id}: {received}'
            if not self.publishers:
                # all publishers are finished
                self.finished.set()
        else:
            # assert that the publisher is still publishing
            assert publisher_id in self.publishers, publisher_id
            self.publishers[publisher_id].append(message)


async def main():
    num_publishers = 100
    consumer = Consumer(num_publishers)
    # consumer begins listening for new messages
    asyncio.create_task(consumer.consume_loop())
    # create publishers that send messages to the consumer
    for publisher_id in range(num_publishers):
        messages = list(range(100))
        asyncio.create_task(publish(publisher_id, consumer.queue, messages))
    # wait for the consumer to receive all messages
    await consumer.finished.wait()

if __name__ == '__main__':
    asyncio.run(main())

Observations:

Question (the important part in bold):

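asyncio.Queue is first-in, first-out (FIFO), which means that items are dequeued in the same order in which they were enqueued. The problem with your code is that the consumer doesn't await the processing code; instead, it spawns it with create_task() to run in the background. Likewise, the producers don't await the enqueuing; they also run it in the background.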

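The result is that no backpressure is applied: although the queue is technically bounded, items are dequeued as soon as they appear and moved onto the (unbounded) queue of tasks that the event loop maintains internally. You also lose ordering, because tasks started with create_task() are not guaranteed to complete in any particular order: while several put() tasks are waiting for space in the full queue, a newly created put() can take the free slot before a task that has been waiting longer.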
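
The reordering can be reproduced at a small scale. The sketch below (the demo() coroutine is hypothetical, not from the question) uses fire-and-forget put() tasks on a queue with maxsize=1, like the question's publish(): put(1) blocks on the full queue, and a newer put(2) task is scheduled just before the slot frees up, so put(2) takes the slot first. The exact interleaving depends on asyncio's scheduling; the output shown reflects CPython's asyncio.Queue implementation.

```python
import asyncio


async def demo():
    queue = asyncio.Queue(maxsize=1)
    tasks = []  # keep references so the background tasks are not garbage-collected

    # Fire-and-forget puts, as in the question's publish()
    tasks.append(asyncio.create_task(queue.put(0)))
    tasks.append(asyncio.create_task(queue.put(1)))
    await asyncio.sleep(0)  # put(0) fills the queue; put(1) blocks waiting for space

    # A new background put is scheduled just before a slot frees up
    tasks.append(asyncio.create_task(queue.put(2)))
    received = [queue.get_nowait()]  # frees the slot and wakes the blocked put(1)

    while len(received) < 3:
        received.append(await queue.get())
    await asyncio.gather(*tasks)
    return received


print(asyncio.run(demo()))  # [0, 2, 1] with CPython's asyncio
```

When the woken put(1) finally runs, it finds the queue full again and waits behind any newer waiters, which is how one publisher's messages can end up out of order.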

To fix this, you should:

  • make your queue unbounded,
  • change asyncio.create_task(consumer_queue.put(...)) to await consumer_queue.put(...), and
  • change asyncio.create_task(self.process(...)) to await self.process(...).
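
Applying the three changes above to the question's code gives roughly the following. This is a sketch, not the answerer's exact code: the received dict, the num_messages parameter and the shortened sleeps are hypothetical additions so the result can be inspected and the script finishes quickly.

```python
import asyncio
import random


async def publish(publisher_id, consumer_queue, messages):
    assert messages == sorted(messages)  # sent messages are ordered
    for message in messages:
        if random.random() < 0.1:  # simulate publisher taking a short break
            await asyncio.sleep(random.random() * 0.01)  # shortened for the sketch
        # awaited, so this publisher's messages enter the queue in order
        await consumer_queue.put((publisher_id, message))
    # signal that this publisher is finished
    await consumer_queue.put((publisher_id, None))


class Consumer:
    def __init__(self, num_publishers):
        self.publishers = {publisher: [] for publisher in range(num_publishers)}
        self.received = {}  # hypothetical: finished lists kept for inspection
        self.queue = asyncio.Queue()  # unbounded
        self.finished = asyncio.Event()

    async def consume_loop(self):
        while True:
            publisher_id, message = await self.queue.get()
            # handled inline: the next get() waits until this message is processed
            await self.process(publisher_id, message)

    async def process(self, publisher_id, message):
        if message is None:  # publisher is finished
            received = self.publishers.pop(publisher_id)
            assert received == sorted(received), f'{publisher_id}: {received}'
            self.received[publisher_id] = received
            if not self.publishers:
                # all publishers are finished
                self.finished.set()
        else:
            # assert that the publisher is still publishing
            assert publisher_id in self.publishers, publisher_id
            self.publishers[publisher_id].append(message)


async def main(num_publishers=100, num_messages=100):
    consumer = Consumer(num_publishers)
    consume_task = asyncio.create_task(consumer.consume_loop())
    # keep references to the publisher tasks so they are not garbage-collected
    publisher_tasks = [
        asyncio.create_task(
            publish(publisher_id, consumer.queue, list(range(num_messages))))
        for publisher_id in range(num_publishers)
    ]
    await consumer.finished.wait()
    await asyncio.gather(*publisher_tasks)
    consume_task.cancel()  # the consume loop never exits on its own
    return consumer.received


if __name__ == '__main__':
    results = asyncio.run(main())
    print(all(r == list(range(100)) for r in results.values()))  # True
```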

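If you decide that you really do need backpressure, bound the queue again; because put() is now awaited, a full queue makes the publishers wait instead of piling up background tasks.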
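
A minimal sketch of the bounded variant, assuming a simplified consumer (the consume() coroutine and the parameter defaults below are hypothetical): with maxsize=1 and awaited put() calls, each publisher has at most one pending put at a time, so its own messages still arrive in order while the full queue makes the publishers wait.

```python
import asyncio


async def publish(publisher_id, queue, messages):
    for message in messages:
        # Awaiting put() blocks while the queue is full: this is the backpressure.
        await queue.put((publisher_id, message))
    await queue.put((publisher_id, None))  # this publisher is finished


async def consume(queue, num_publishers):
    received = {p: [] for p in range(num_publishers)}
    remaining = num_publishers
    while remaining:
        publisher_id, message = await queue.get()
        if message is None:
            remaining -= 1
        else:
            received[publisher_id].append(message)  # handled inline, not spawned
    return received


async def main(num_publishers=5, num_messages=20):
    queue = asyncio.Queue(maxsize=1)  # bounded again
    publisher_tasks = [
        asyncio.create_task(publish(p, queue, list(range(num_messages))))
        for p in range(num_publishers)
    ]
    received = await consume(queue, num_publishers)
    await asyncio.gather(*publisher_tasks)
    return received


if __name__ == '__main__':
    received = asyncio.run(main())
    print(all(r == list(range(20)) for r in received.values()))  # True
```

Messages from different publishers may still interleave in any order; only the order within each publisher is preserved, which is what the question's assertion checks.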