python asyncio asynchronously fetch data by key from a dict when the key becomes available
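As the title says, my use case is this:

I have an aiohttp server that accepts requests from clients. When I receive a request, I generate a unique request ID for it and send a {req_id: req_payload} dict to some workers (the workers are not written in Python; they run in another process). When the workers finish, I get the responses back and put them into a result dict like this: {req_id_1: res_1, req_id_2: res_2}.

Then I want my aiohttp server handler to await on the result dict, so that when a specific response becomes available (by req_id) it can send it back.

I built the sample code below to simulate the process, but I got stuck implementing the coroutine async def fetch_correct_res(req_id), which should fetch the correct response by req_id asynchronously, without blocking.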
```python
import random
import asyncio
import shortuuid

n_tests = 1000
idxs = list(range(n_tests))
req_ids = []
for _ in range(n_tests):
    req_ids.append(shortuuid.uuid())
res_dict = {}

async def fetch_correct_res(req_id):
    # This is the part I'm stuck on: wait (without blocking the event loop)
    # until res_dict[req_id] exists, then return it
    pass

async def handler(req):
    res = await fetch_correct_res(req)
    assert req == res, "the correct res for the req should exactly be the req itself."
    print("got correct res for req: {}".format(req))

async def randomly_put_res_to_res_dict():
    for _ in range(n_tests):
        random_idx = random.choice(idxs)
        await asyncio.sleep(random_idx / 1000)
        res_dict[req_ids[random_idx]] = req_ids[random_idx]
        print("req: {} is back".format(req_ids[random_idx]))
```
So: is this approach feasible, and if so, how? If it isn't, what is the right solution for this use case with asyncio?

Many thanks.
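The only approach I can think of right now: pre-create a number of asyncio.Queue objects with pre-assigned IDs, then assign one queue to each incoming request, so the handler just awaits on that queue. When the response comes back, I put it into the pre-assigned queue, and after the request is finished I take the queue back so it can be reused for the next incoming request. Not very elegant, but it would solve the problem.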
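A minimal sketch of that queue-pool idea, assuming a queue is acquired for the request ID before the payload is sent to a worker, so a response can never arrive for an ID that has no queue yet. QueuePool, acquire, deliver, release and handler are hypothetical names. The pool is created inside a running coroutine because on Python 3.8/3.9 asyncio.Queue binds to the event loop that is current when it is constructed.

```python
import asyncio

class QueuePool:
    """Hypothetical pool of pre-created single-slot queues reused across requests."""

    def __init__(self, size):
        # Idle queues; maxsize=1 because each request gets exactly one response
        self._free = [asyncio.Queue(maxsize=1) for _ in range(size)]
        self._busy = {}  # req_id -> queue currently assigned to that request

    def acquire(self, req_id):
        # Assumption: the pool never runs dry; a real version would wait or grow
        queue = self._free.pop()
        self._busy[req_id] = queue
        return queue

    def deliver(self, req_id, result):
        # Called when a worker's response for req_id comes back
        self._busy[req_id].put_nowait(result)

    def release(self, req_id):
        # Hand the queue back so the next incoming request can use it
        self._free.append(self._busy.pop(req_id))

async def handler(pool, req_id):
    queue = pool.acquire(req_id)
    try:
        return await queue.get()  # suspends until deliver() puts the result
    finally:
        pool.release(req_id)

async def main():
    pool = QueuePool(size=4)  # created inside the running loop
    task = asyncio.create_task(handler(pool, "abc"))
    await asyncio.sleep(0)  # let the handler acquire its queue first
    pool.deliver("abc", "result for abc")
    print(await task)  # prints: result for abc

asyncio.run(main())
```

Since each queue only ever carries a single result, the pooling mostly exists to recycle objects; creating a fresh one-shot primitive per request avoids that bookkeeping.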
This might work (note: I removed the UUIDs so that the request IDs are known in advance):
```python
import asyncio

n_tests = 1000
# Plain integers as request IDs, so they are known in advance
req_ids = list(range(n_tests))
res_dict = {}

async def fetch_correct_res(req_id):
    # Poll until the result shows up; `in` (rather than .get()) makes a
    # falsy result such as 0 count as present
    while req_id not in res_dict:
        await asyncio.sleep(0.1)
    return res_dict[req_id]

async def handler(req):
    print("fetching req: ", req)
    res = await fetch_correct_res(req)
    assert req == res, "the correct res for the req should exactly be the req itself."
    print("got correct res for req: {}".format(req))

async def randomly_put_res_to_res_dict(future):
    for i in range(n_tests):
        res_dict[req_ids[i]] = req_ids[i]
        await asyncio.sleep(0.5)
        print("req: {} is back".format(req_ids[i]))
    future.set_result("done")

async def main():
    future = asyncio.get_running_loop().create_future()
    producer = asyncio.create_task(randomly_put_res_to_res_dict(future))
    await handler(10)
    producer.cancel()  # the producer is no longer needed once the handler is done

asyncio.run(main())  # Python 3.7+
```
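The polling loop above wakes up every 0.1 s whether or not anything changed, and adds up to 0.1 s of latency per response. A sketch of one alternative (not part of the original answer): guard the dict with an asyncio.Condition so each waiter sleeps until a producer calls notify_all(), then re-checks whether its own key has arrived. ResultBoard, get and put are hypothetical names; the board is created inside a running coroutine because older Python versions bind asyncio primitives to the loop that is current at construction.

```python
import asyncio

class ResultBoard:
    """Hypothetical helper: a dict whose readers can await a specific key."""

    def __init__(self):
        self._results = {}                # req_id -> result
        self._cond = asyncio.Condition()  # notified whenever a result is added

    async def get(self, req_id):
        async with self._cond:
            # wait_for() checks the predicate now and again after every notify
            await self._cond.wait_for(lambda: req_id in self._results)
            return self._results.pop(req_id)  # pop so finished entries don't pile up

    async def put(self, req_id, result):
        async with self._cond:
            self._results[req_id] = result
            self._cond.notify_all()  # wake every waiter; each re-checks its own key

async def main():
    board = ResultBoard()
    waiters = [asyncio.create_task(board.get(i)) for i in (3, 1, 2)]
    for i in (1, 2, 3):  # results arrive in a different order than requested
        await asyncio.sleep(0.01)
        await board.put(i, i * 10)
    print(await asyncio.gather(*waiters))  # prints [30, 10, 20]

asyncio.run(main())
```

notify_all() wakes every waiter on each put, so the cost per result grows with the number of pending requests; with many concurrent requests, a per-request primitive scales better.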
Is this the best solution? In my opinion, no. What you are describing is essentially checking on the status of a long-running job, so you should expose a (REST) API for submitting jobs and querying their status, e.g.:

```
http POST server:port/job
{some job json payload}
Response: 200 OK {"req_id": 1}

http GET server:port/job/1
Response: 200 OK {"req_id": 1, "status": "in process"}

http GET server:port/job/1
Response: 200 OK {"req_id": 1, "status": "done", "result": {}}
```
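The job-status API above could be backed by a small in-memory registry. This sketch is framework-agnostic and assumes the HTTP layer (for example aiohttp route handlers that return web.json_response(...)) simply calls submit() for POST /job and status() for GET /job/{id}; JobStore and its methods are hypothetical, and forwarding the payload to the workers is left out.

```python
import itertools

class JobStore:
    """Hypothetical in-memory job registry backing POST /job and GET /job/{id}."""

    def __init__(self):
        self._ids = itertools.count(1)  # sequential request IDs: 1, 2, 3, ...
        self._jobs = {}                 # req_id -> job record

    def submit(self, payload):
        # POST /job: record the job and return its id immediately.
        # Assumption: payload is forwarded to the external workers elsewhere.
        req_id = next(self._ids)
        self._jobs[req_id] = {"req_id": req_id, "status": "in process"}
        return {"req_id": req_id}

    def complete(self, req_id, result):
        # Called when a worker's response for req_id arrives
        self._jobs[req_id].update(status="done", result=result)

    def status(self, req_id):
        # GET /job/{req_id}: a copy of the record, or None (-> 404) if unknown
        job = self._jobs.get(req_id)
        return dict(job) if job is not None else None

store = JobStore()
print(store.submit({"some": "payload"}))  # {'req_id': 1}
print(store.status(1))                    # {'req_id': 1, 'status': 'in process'}
store.complete(1, {})
print(store.status(1))                    # {'req_id': 1, 'status': 'done', 'result': {}}
```

The client polls GET /job/{id} instead of holding a connection open, so a slow worker never ties up a server handler.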
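See whether the sample implementation below meets your needs.

Basically, you want to answer each request (by ID) asynchronously with its response, and the order in which responses arrive cannot be predicted. So, when handling a request, populate a dict with {request_id: {'event': <asyncio.Event>, 'result': <result>}} and await on asyncio.Event.wait(). Once the response arrives, signal the event with asyncio.Event.set(); this releases the wait, and the handler then reads the response from the dict by request ID.

I modified your code slightly to pre-populate the dict with the request ID and to await on asyncio.Event.wait() until the response signals it: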
```python
import random
import asyncio
import shortuuid

n_tests = 10
req_ids = [shortuuid.uuid() for _ in range(n_tests)]
# req_id -> {'event': asyncio.Event, 'result': <result>}
res_dict = {}

async def fetch_correct_res(req_id, event):
    # Suspend until the producer calls event.set() for this request
    await event.wait()
    return res_dict[req_id]['result']

async def handler(req):
    print("incoming request id: {}".format(req))
    event = asyncio.Event()
    # Register the request before awaiting, so the producer can find it
    res_dict[req] = {'event': event, 'result': 'pending'}
    res = await fetch_correct_res(req, event)
    assert req == res, "the correct res for the req should exactly be the req itself."
    print("got correct res for req: {}".format(req))

async def randomly_put_res_to_res_dict():
    random.shuffle(req_ids)  # responses come back in an unpredictable order
    for i in req_ids:
        await asyncio.sleep(random.randrange(2, 4))
        print("req: {} is back".format(i))
        entry = res_dict.get(i)
        if entry is not None:  # ignore responses nobody is waiting for
            entry['result'] = i
            entry['event'].set()  # wake up the waiting handler

async def main():
    await asyncio.gather(handler(req_ids[0]),
                         handler(req_ids[1]),
                         handler(req_ids[2]),
                         handler(req_ids[3]),
                         randomly_put_res_to_res_dict())

asyncio.run(main())  # Python 3.7+
```
Sample output of the above code:

```
incoming request id: NDhvBPqMiRbteFD5WqiLFE
incoming request id: fpmk8yC3iQcgHAJBKqe2zh
incoming request id: M7eX7qeVQfWCCBnP4FbRtK
incoming request id: v2hAfcCEhRPUDUjCabk45N
req: VeyvAEX7YGgRZDHqa2UGYc is back
req: M7eX7qeVQfWCCBnP4FbRtK is back
got correct res for req: M7eX7qeVQfWCCBnP4FbRtK
req: pVvYoyAzvK8VYaHfrFA9SB is back
req: soP8NDxeQKYjgeT7pa3wtG is back
req: j3rcg5Lp59pQXuvdjCAyZe is back
req: NDhvBPqMiRbteFD5WqiLFE is back
got correct res for req: NDhvBPqMiRbteFD5WqiLFE
req: v2hAfcCEhRPUDUjCabk45N is back
got correct res for req: v2hAfcCEhRPUDUjCabk45N
req: porzHqMqV8SAuttteHRwNL is back
req: trVVqZrUpsW3tfjQajJfb7 is back
req: fpmk8yC3iQcgHAJBKqe2zh is back
got correct res for req: fpmk8yC3iQcgHAJBKqe2zh
```
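The Event-plus-'result' pair stored in each dict entry can also be collapsed into a single asyncio.Future per request, which carries the result itself and is meant to be completed exactly once. This is a swapped-in technique rather than the answer's code: pending, register, deliver and wait_for_result are hypothetical names, and the 5-second default timeout is an arbitrary assumption so that a worker that never answers cannot hang a handler forever.

```python
import asyncio

pending = {}  # req_id -> asyncio.Future awaited by that request's handler

def register(req_id):
    # Create the one-shot future *before* the job goes to a worker,
    # so a fast response can never arrive before anyone is listening
    fut = asyncio.get_running_loop().create_future()
    pending[req_id] = fut
    return fut

def deliver(req_id, result):
    # Called by whatever reads the workers' responses; unknown or
    # already-finished ids (e.g. timed out) are silently ignored
    fut = pending.get(req_id)
    if fut is not None and not fut.done():
        fut.set_result(result)

async def wait_for_result(req_id, fut, timeout=5.0):
    try:
        # wait_for() cancels the future and raises asyncio.TimeoutError on timeout
        return await asyncio.wait_for(fut, timeout)
    finally:
        pending.pop(req_id, None)  # clean up on success, timeout or cancellation

async def main():
    futs = {rid: register(rid) for rid in ("a", "b", "c")}
    loop = asyncio.get_running_loop()
    for rid in ("c", "a", "b"):  # responses come back out of order
        loop.call_later(0.01, deliver, rid, rid.upper())
    results = await asyncio.gather(*(wait_for_result(r, f) for r, f in futs.items()))
    print(results)  # prints ['A', 'B', 'C']

asyncio.run(main())
```

In an aiohttp handler, the flow would be: register(req_id), send the payload to the workers, then return the awaited wait_for_result(...) value in the response; the finally clause keeps pending from leaking entries for requests that time out or whose client disconnects.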