Python 异步队列未更新

Python asyncio queue not updated

以下代码实例化一个 asyncio.Queue 对象,并尝试从两个不同的协程填充和使用该队列,分别为 arrival()server()

loop = asyncio.get_event_loop()
q = asyncio.Queue()

async def arrival(q):
    print('ARRIVAL - Queue id:', id(q))

    while True:
        await asyncio.sleep(1)
        item = random.choice(['item1', 'item2'..., 'item100'])
        q.put(item)

        print('ARRIVAL - added {}, qsize is now {}'.format(item, q.qsize()))


async def server(q):
    print('SERVER - Queue id:', id(q))

    while True:
        item = await q.get()
        print('SERVER - taking {}, qsize is now {}'.format(item, q.qsize()))

        await asyncio.sleep(1.8)
        print('SERVER - finished processing {}'.format(item))


tasks = [loop.create_task(arrival(q)), loop.create_task(server(q))]
loop.run_until_complete(asyncio.gather(*tasks))

原理如下:

预期输出为:

SERVER - Queue id: 12345678
ARRIVAL - Queue id: 12345678
ARRIVAL - added item1, qsize is now 1
SERVER - taking item1, qsize is now 0
ARRIVAL - added item2, qsize is now 1
SERVER - finished processing item1
SERVER - taking item2, qsize is now 0
ARRIVAL - added item3, qsize is now 1
ARRIVAL - added item4, qsize is now 2
SERVER - finished processing item2
SERVER - taking item3, qsize is now 1
ARRIVAL - added item5, qsize is now 2
ARRIVAL - added item6, qsize is now 3
SERVER - finished processing item3
SERVER - taking item4, qsize is now 2

但是当我运行上面的代码时,server()中的while True循环中的元素从不执行,q.qsize()始终为0,输出为:

SERVER - Queue id: 12345678
ARRIVAL - Queue id: 12345678
ARRIVAL - added item1, qsize is now 0
ARRIVAL - added item2, qsize is now 0
ARRIVAL - added item3, qsize is now 0
ARRIVAL - added item4, qsize is now 0
ARRIVAL - added item5, qsize is now 0
...

似乎 q 对象从未被 arrival() 更新(q.qsize() 始终为 0)因此 server() 永远不知道 arrival().

我按照你想要的方式得到了这个运行:

import asyncio
import random

random.seed(31415)  # get reproducible runs

ITEMS = ['item{}'.format(i) for i in range(100)]

async def arrival(q):
    queue_object_id = id(q)
    print('ARRIVAL - Queue id:', queue_object_id)
    while True:
        await asyncio.sleep(1)
        item = random.choice(ITEMS)
        await q.put(item)
        size = q.qsize()
        print('ARRIVAL - added {}, qsize is now {}'.format(item, size))

async def server(q):
    queue_object_id = id(q)
    print('SERVER - Queue id:', queue_object_id)

    while True:
        item = await q.get()
        size = q.qsize()
        print('SERVER - taking {}, qsize is now {}'.format(item, size))
        await asyncio.sleep(1.8)
        print('SERVER - finished processing {}'.format(item))

loop = asyncio.get_event_loop()
q = asyncio.Queue()
cors = asyncio.wait([arrival(q), server(q)])
loop.run_until_complete(cors)

不幸的是,我没有跟踪我必须进行的所有更改...抱歉。但我相信您会发现其中的差异以及它们为何会有所不同。

这会产生输出:

SERVER - Queue id: 140540011741592
ARRIVAL - Queue id: 140540011741592
ARRIVAL - added item75, qsize is now 1
SERVER - taking item75, qsize is now 0
ARRIVAL - added item36, qsize is now 1
SERVER - finished processing item75
SERVER - taking item36, qsize is now 0
ARRIVAL - added item57, qsize is now 1
ARRIVAL - added item5, qsize is now 2
SERVER - finished processing item36
SERVER - taking item57, qsize is now 1
ARRIVAL - added item69, qsize is now 2
ARRIVAL - added item67, qsize is now 3
SERVER - finished processing item57
SERVER - taking item5, qsize is now 2
ARRIVAL - added item53, qsize is now 3
ARRIVAL - added item16, qsize is now 4
SERVER - finished processing item5
SERVER - taking item69, qsize is now 3
ARRIVAL - added item91, qsize is now 4
...

TL&DR:确保在使用异步队列时同时等待 put() 和 get()。

理性:asyncio队列中的get()和put()函数都是协程,需要等待。

示例

await q.put(item)
await q.get(item)