Python 异步队列未更新
Python asyncio queue not updated
以下代码实例化一个 asyncio.Queue
对象,并尝试从两个不同的协程填充和使用该队列,分别为 arrival()
和 server()
。
loop = asyncio.get_event_loop()
q = asyncio.Queue()
async def arrival(q):
print('ARRIVAL - Queue id:', id(q))
while True:
await asyncio.sleep(1)
item = random.choice(['item1', 'item2'..., 'item100'])
q.put(item)
print('ARRIVAL - added {}, qsize is now {}'.format(item, q.qsize()))
async def server(q):
print('SERVER - Queue id:', id(q))
while True:
item = await q.get()
print('SERVER - taking {}, qsize is now {}'.format(item, q.qsize()))
await asyncio.sleep(1.8)
print('SERVER - finished processing {}'.format(item))
tasks = [loop.create_task(arrival(q)), loop.create_task(server(q))]
loop.run_until_complete(asyncio.gather(*tasks))
原理如下:
- 每 1 秒,一个项目添加到
q
- 只要服务器空闲,它就会获取队列中的下一个项目或等待它
- 服务器处理一个项目需要 1.8 秒
预期输出为:
SERVER - Queue id: 12345678
ARRIVAL - Queue id: 12345678
ARRIVAL - added item1, qsize is now 1
SERVER - taking item1, qsize is now 0
ARRIVAL - added item2, qsize is now 1
SERVER - finished processing item1
SERVER - taking item2, qsize is now 0
ARRIVAL - added item3, qsize is now 1
ARRIVAL - added item4, qsize is now 2
SERVER - finished processing item2
SERVER - taking item3, qsize is now 1
ARRIVAL - added item5, qsize is now 2
ARRIVAL - added item6, qsize is now 3
SERVER - finished processing item3
SERVER - taking item4, qsize is now 2
但是当我运行上面的代码时,server()
中的while True
循环中的元素从不执行,q.qsize()
始终为0,输出为:
SERVER - Queue id: 12345678
ARRIVAL - Queue id: 12345678
ARRIVAL - added item1, qsize is now 0
ARRIVAL - added item2, qsize is now 0
ARRIVAL - added item3, qsize is now 0
ARRIVAL - added item4, qsize is now 0
ARRIVAL - added item5, qsize is now 0
...
似乎 q
对象从未被 arrival()
更新(q.qsize()
始终为 0)因此 server()
永远不知道 arrival()
.
我按照你想要的方式得到了这个运行:
import asyncio
import random
random.seed(31415) # get reproducible runs
ITEMS = ['item{}'.format(i) for i in range(100)]
async def arrival(q):
queue_object_id = id(q)
print('ARRIVAL - Queue id:', queue_object_id)
while True:
await asyncio.sleep(1)
item = random.choice(ITEMS)
await q.put(item)
size = q.qsize()
print('ARRIVAL - added {}, qsize is now {}'.format(item, size))
async def server(q):
queue_object_id = id(q)
print('SERVER - Queue id:', queue_object_id)
while True:
item = await q.get()
size = q.qsize()
print('SERVER - taking {}, qsize is now {}'.format(item, size))
await asyncio.sleep(1.8)
print('SERVER - finished processing {}'.format(item))
loop = asyncio.get_event_loop()
q = asyncio.Queue()
cors = asyncio.wait([arrival(q), server(q)])
loop.run_until_complete(cors)
不幸的是,我没有跟踪我必须进行的所有更改...抱歉。但我相信您会发现其中的差异以及它们为何会有所不同。
这会产生输出:
SERVER - Queue id: 140540011741592
ARRIVAL - Queue id: 140540011741592
ARRIVAL - added item75, qsize is now 1
SERVER - taking item75, qsize is now 0
ARRIVAL - added item36, qsize is now 1
SERVER - finished processing item75
SERVER - taking item36, qsize is now 0
ARRIVAL - added item57, qsize is now 1
ARRIVAL - added item5, qsize is now 2
SERVER - finished processing item36
SERVER - taking item57, qsize is now 1
ARRIVAL - added item69, qsize is now 2
ARRIVAL - added item67, qsize is now 3
SERVER - finished processing item57
SERVER - taking item5, qsize is now 2
ARRIVAL - added item53, qsize is now 3
ARRIVAL - added item16, qsize is now 4
SERVER - finished processing item5
SERVER - taking item69, qsize is now 3
ARRIVAL - added item91, qsize is now 4
...
TL&DR:确保在使用异步队列时同时等待 put() 和 get()。
理性:asyncio队列中的get()和put()函数都是协程,需要等待。
示例:
await q.put(item)
await q.get(item)
以下代码实例化一个 asyncio.Queue
对象,并尝试从两个不同的协程填充和使用该队列,分别为 arrival()
和 server()
。
loop = asyncio.get_event_loop()
q = asyncio.Queue()
async def arrival(q):
print('ARRIVAL - Queue id:', id(q))
while True:
await asyncio.sleep(1)
item = random.choice(['item1', 'item2'..., 'item100'])
q.put(item)
print('ARRIVAL - added {}, qsize is now {}'.format(item, q.qsize()))
async def server(q):
print('SERVER - Queue id:', id(q))
while True:
item = await q.get()
print('SERVER - taking {}, qsize is now {}'.format(item, q.qsize()))
await asyncio.sleep(1.8)
print('SERVER - finished processing {}'.format(item))
tasks = [loop.create_task(arrival(q)), loop.create_task(server(q))]
loop.run_until_complete(asyncio.gather(*tasks))
原理如下:
- 每 1 秒,一个项目添加到
q
- 只要服务器空闲,它就会获取队列中的下一个项目或等待它
- 服务器处理一个项目需要 1.8 秒
预期输出为:
SERVER - Queue id: 12345678
ARRIVAL - Queue id: 12345678
ARRIVAL - added item1, qsize is now 1
SERVER - taking item1, qsize is now 0
ARRIVAL - added item2, qsize is now 1
SERVER - finished processing item1
SERVER - taking item2, qsize is now 0
ARRIVAL - added item3, qsize is now 1
ARRIVAL - added item4, qsize is now 2
SERVER - finished processing item2
SERVER - taking item3, qsize is now 1
ARRIVAL - added item5, qsize is now 2
ARRIVAL - added item6, qsize is now 3
SERVER - finished processing item3
SERVER - taking item4, qsize is now 2
但是当我运行上面的代码时,server()
中的while True
循环中的元素从不执行,q.qsize()
始终为0,输出为:
SERVER - Queue id: 12345678
ARRIVAL - Queue id: 12345678
ARRIVAL - added item1, qsize is now 0
ARRIVAL - added item2, qsize is now 0
ARRIVAL - added item3, qsize is now 0
ARRIVAL - added item4, qsize is now 0
ARRIVAL - added item5, qsize is now 0
...
似乎 q
对象从未被 arrival()
更新(q.qsize()
始终为 0)因此 server()
永远不知道 arrival()
.
我按照你想要的方式得到了这个运行:
import asyncio
import random
random.seed(31415) # get reproducible runs
ITEMS = ['item{}'.format(i) for i in range(100)]
async def arrival(q):
queue_object_id = id(q)
print('ARRIVAL - Queue id:', queue_object_id)
while True:
await asyncio.sleep(1)
item = random.choice(ITEMS)
await q.put(item)
size = q.qsize()
print('ARRIVAL - added {}, qsize is now {}'.format(item, size))
async def server(q):
queue_object_id = id(q)
print('SERVER - Queue id:', queue_object_id)
while True:
item = await q.get()
size = q.qsize()
print('SERVER - taking {}, qsize is now {}'.format(item, size))
await asyncio.sleep(1.8)
print('SERVER - finished processing {}'.format(item))
loop = asyncio.get_event_loop()
q = asyncio.Queue()
cors = asyncio.wait([arrival(q), server(q)])
loop.run_until_complete(cors)
不幸的是,我没有跟踪我必须进行的所有更改...抱歉。但我相信您会发现其中的差异以及它们为何会有所不同。
这会产生输出:
SERVER - Queue id: 140540011741592
ARRIVAL - Queue id: 140540011741592
ARRIVAL - added item75, qsize is now 1
SERVER - taking item75, qsize is now 0
ARRIVAL - added item36, qsize is now 1
SERVER - finished processing item75
SERVER - taking item36, qsize is now 0
ARRIVAL - added item57, qsize is now 1
ARRIVAL - added item5, qsize is now 2
SERVER - finished processing item36
SERVER - taking item57, qsize is now 1
ARRIVAL - added item69, qsize is now 2
ARRIVAL - added item67, qsize is now 3
SERVER - finished processing item57
SERVER - taking item5, qsize is now 2
ARRIVAL - added item53, qsize is now 3
ARRIVAL - added item16, qsize is now 4
SERVER - finished processing item5
SERVER - taking item69, qsize is now 3
ARRIVAL - added item91, qsize is now 4
...
TL&DR:确保在使用异步队列时同时等待 put() 和 get()。
理性:asyncio队列中的get()和put()函数都是协程,需要等待。
示例:
await q.put(item)
await q.get(item)