为什么从 asyncio.Queue 收到的消息不保持它们发送的顺序?
Why messages received from an asyncio.Queue don't keep the order they were sent in?
我正在努力保持从 asyncio.Queue
收到的消息的正确顺序。
在我的例子中,多个发布者(put
向队列发送消息)通过单个 asyncio.Queue
将消息发送给消费者(get
来自队列的消息) .在下面的示例代码中,consumer.process
中的断言不成立,即违反了接收消息的顺序:
import asyncio
import random
async def publish(publisher_id, consumer_queue, messages):
assert messages == sorted(messages) # sent messages are ordered
for message in messages:
if random.random() < 0.1: # simulate publisher taking a short break
await asyncio.sleep(random.random())
# publish message to queue
asyncio.create_task(consumer_queue.put((publisher_id, message)))
# signal that this publisher is finished
asyncio.create_task(consumer_queue.put((publisher_id, None)))
class Consumer:
def __init__(self, num_publishers):
self.publishers = {publisher: [] for publisher in range(num_publishers)}
self.queue = asyncio.Queue(maxsize=1)
self.finished = asyncio.Event()
async def consume_loop(self):
while True:
publisher_id, message = await self.queue.get()
# delegate processing of the message
asyncio.create_task(self.process(publisher_id, message))
async def process(self, publisher_id, message):
if message is None: # publisher is finished
received = self.publishers.pop(publisher_id)
# assert that if messages sent by a publisher are ordered,
# then the same messages received by the consumer are also ordered
assert received == sorted(received), f'{publisher_id}: {received}'
if not self.publishers:
# all publishers are finished
self.finished.set()
else:
# assert that the publisher is still publishing
assert publisher_id in self.publishers, publisher_id
self.publishers[publisher_id].append(message)
async def main():
num_publishers = 100
consumer = Consumer(num_publishers)
# consumer begins listening for new messages
asyncio.create_task(consumer.consume_loop())
# create publishers that send messages to the consumer
for publisher_id in range(num_publishers):
messages = list(range(100))
asyncio.create_task(publish(publisher_id, consumer.queue, messages))
# wait for the consumer to receive all messages
await consumer.finished.wait()
if __name__ == '__main__':
asyncio.run(main())
观察:
- 如果发布者不休息,断言就会成立。但是,在我的用例中,发布者不会立即发布所有消息,而是偶尔发布消息。
- 如果使用无界队列而不是有界队列,则断言成立。但是,我希望对队列进行限制以对发布者施加一些背压。
- 所有发布商的顺序都没有中断。
- 越publishers/messages,断言越有可能被破坏。
- 即使
consumer.process
不是异步的,断言也会中断。
问题(重要的加粗):
asyncio.Queue
代码中的消息排序是否有问题?
- 我认为在我的情况下,
asyncio.Queue
也应该对每个发布者按照 FIFO 原则运作吗?
- 此行为是否与
asyncio
代码中由于创建的任务数量而导致的一些排序问题有关?我认为不是,因为无界队列工作正常。
- 如何在保持队列有界的同时保持顺序?
- 如果问题是许多发布者使用同一个队列,是否有适用于多个发布者的队列实现?
asyncio.Queue
是先进先出,这意味着项目按照它们入队的相同顺序出队。您的代码的问题在于,消费者不等待处理代码,而是在后台将其生成到 运行 。此外,生产者不会等待入队,但 运行 也在后台等待。
结果是没有施加背压,因为虽然队列在技术上是有界的,但项目一出现就会出队,并且它们被转移到事件内部维护的(无界)任务队列环形。此外,您会丢失顺序,因为 create_task()
不能保证按特定顺序 运行。
要解决此问题,您应该:
- 让你的队列无界,
- 将
asyncio.create_task(consumer_queue.put(...))
更改为 await consumer_queue.put(...)
,并且
- 将
asyncio.create_task(self.process(...))
更改为 await self.process(...)
。
如果您决定确实需要背压,请限制通道。
我正在努力保持从 asyncio.Queue
收到的消息的正确顺序。
在我的例子中,多个发布者(put
向队列发送消息)通过单个 asyncio.Queue
将消息发送给消费者(get
来自队列的消息) .在下面的示例代码中,consumer.process
中的断言不成立,即违反了接收消息的顺序:
import asyncio
import random
async def publish(publisher_id, consumer_queue, messages):
assert messages == sorted(messages) # sent messages are ordered
for message in messages:
if random.random() < 0.1: # simulate publisher taking a short break
await asyncio.sleep(random.random())
# publish message to queue
asyncio.create_task(consumer_queue.put((publisher_id, message)))
# signal that this publisher is finished
asyncio.create_task(consumer_queue.put((publisher_id, None)))
class Consumer:
def __init__(self, num_publishers):
self.publishers = {publisher: [] for publisher in range(num_publishers)}
self.queue = asyncio.Queue(maxsize=1)
self.finished = asyncio.Event()
async def consume_loop(self):
while True:
publisher_id, message = await self.queue.get()
# delegate processing of the message
asyncio.create_task(self.process(publisher_id, message))
async def process(self, publisher_id, message):
if message is None: # publisher is finished
received = self.publishers.pop(publisher_id)
# assert that if messages sent by a publisher are ordered,
# then the same messages received by the consumer are also ordered
assert received == sorted(received), f'{publisher_id}: {received}'
if not self.publishers:
# all publishers are finished
self.finished.set()
else:
# assert that the publisher is still publishing
assert publisher_id in self.publishers, publisher_id
self.publishers[publisher_id].append(message)
async def main():
num_publishers = 100
consumer = Consumer(num_publishers)
# consumer begins listening for new messages
asyncio.create_task(consumer.consume_loop())
# create publishers that send messages to the consumer
for publisher_id in range(num_publishers):
messages = list(range(100))
asyncio.create_task(publish(publisher_id, consumer.queue, messages))
# wait for the consumer to receive all messages
await consumer.finished.wait()
if __name__ == '__main__':
asyncio.run(main())
观察:
- 如果发布者不休息,断言就会成立。但是,在我的用例中,发布者不会立即发布所有消息,而是偶尔发布消息。
- 如果使用无界队列而不是有界队列,则断言成立。但是,我希望对队列进行限制以对发布者施加一些背压。
- 所有发布商的顺序都没有中断。
- 越publishers/messages,断言越有可能被破坏。
- 即使
consumer.process
不是异步的,断言也会中断。
问题(重要的加粗):
asyncio.Queue
代码中的消息排序是否有问题?- 我认为在我的情况下,
asyncio.Queue
也应该对每个发布者按照 FIFO 原则运作吗? - 此行为是否与
asyncio
代码中由于创建的任务数量而导致的一些排序问题有关?我认为不是,因为无界队列工作正常。 - 如何在保持队列有界的同时保持顺序?
- 如果问题是许多发布者使用同一个队列,是否有适用于多个发布者的队列实现?
asyncio.Queue
是先进先出,这意味着项目按照它们入队的相同顺序出队。您的代码的问题在于,消费者不等待处理代码,而是在后台将其生成到 运行 。此外,生产者不会等待入队,但 运行 也在后台等待。
结果是没有施加背压,因为虽然队列在技术上是有界的,但项目一出现就会出队,并且它们被转移到事件内部维护的(无界)任务队列环形。此外,您会丢失顺序,因为 create_task()
不能保证按特定顺序 运行。
要解决此问题,您应该:
- 让你的队列无界,
- 将
asyncio.create_task(consumer_queue.put(...))
更改为await consumer_queue.put(...)
,并且 - 将
asyncio.create_task(self.process(...))
更改为await self.process(...)
。
如果您决定确实需要背压,请限制通道。