python3.6 async/await 仍然与 fastAPI 同步工作
python3.6 async/await still works synchronously with fastAPI
我有一个 fastAPI 应用程序,post 有两个请求,其中一个较长(如果有帮助,它们是 Elasticsearch 查询,我正在使用 AsyncElasticsearch 模块,该模块已经 returns协程)。这是我的尝试:
class my_module:
search_object = AsyncElasticsearch(url, port)
async def do_things(self):
resp1 = await search_object.search() #the longer one
print(check_resp1)
resp2 = await search_object.search() #the shorter one
print(check_resp2)
process(resp2)
process(resp1)
do_synchronous_things()
return thing
app = FastAPI()
@app.post("/")
async def service(user_input):
result = await my_module.do_things()
return results
我观察到的不是等待 resp1
,而是当它到达 check_resp1
时它已经是一个完整的响应,就好像我根本没有使用异步一样。
我是 python 异步的新手,我知道我的代码无法运行,但我不知道如何修复它。据我所知,当解释器看到 await
时,它会启动函数然后继续前进,在这种情况下应该立即 post 下一个请求。我如何让它做到这一点?
是的,没错,在结果准备好之前协程不会继续。您可以同时使用 asyncio.gather 到 运行 个任务:
import asyncio
async def task(msg):
print(f"START {msg}")
await asyncio.sleep(1)
print(f"END {msg}")
return msg
async def main():
await task("1")
await task("2")
results = await asyncio.gather(task("3"), task("4"))
print(results)
if __name__ == "__main__":
asyncio.run(main())
测试:
$ python test.py
START 1
END 1
START 2
END 2
START 3
START 4
END 3
END 4
['3', '4']
或者您可以使用 asyncio.as_completed 获得最早的下一个结果:
for coro in asyncio.as_completed((task("5"), task("6"))):
earliest_result = await coro
print(earliest_result)
4 月 2 日星期五更新 09:25:33 UTC 2021:
asyncio.run 从 Python 3.7+ 开始可用,在以前的版本中,您必须手动创建和启动循环:
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
说明
你的代码运行同步的原因是在do_things
函数中,代码执行如下:
- 安排
search_object.search()
执行
- 等
search_object.search()
完成后得到结果
- 安排
search_object.search()
执行
- 等
search_object.search()
完成后得到结果
- 执行(同步)
process(resp2)
- 执行(同步)
process(resp1)
- 执行(同步)
do_synchronous_things()
您的意图是让第 1 步和第 3 步在第 2 步和第 4 步之前执行。您可以使用 unsync
库轻松实现 - 这里是 the documentation.
如何解决这个问题
from unsync import unsync
class my_module:
search_object = AsyncElasticsearch(url, port)
@unsync
async def search1():
return await search_object.search()
@unsync
async def search2(): # not sure if this is any different to search1
return await search_object.search()
async def do_things(self):
task1, task2 = self.search1(), self.search2() # schedule tasks
resp1, resp2 = task1.result(), task2.result() # wait till tasks are executed
# you might also do similar trick with process function to run process(resp2) and process(resp1) concurrently
process(resp2)
process(resp1)
do_synchronous_things() # if this does not rely on resp1 and resp2 it might also be put into separate task to make the computation quicker. To do this use @unsync(cpu_bound=True) decorator
return thing
app = FastAPI()
@app.post("/")
async def service(user_input):
result = await my_module.do_things()
return results
更多信息
如果你想了解更多asyncio和异步编程,我推荐这个tutorial。还有一个类似的案例,您提出了一些可能的解决方案来同时制作协程 运行。
PS。显然我不能运行这段代码,所以你必须自己调试它。
我有一个 fastAPI 应用程序,post 有两个请求,其中一个较长(如果有帮助,它们是 Elasticsearch 查询,我正在使用 AsyncElasticsearch 模块,该模块已经 returns协程)。这是我的尝试:
class my_module:
search_object = AsyncElasticsearch(url, port)
async def do_things(self):
resp1 = await search_object.search() #the longer one
print(check_resp1)
resp2 = await search_object.search() #the shorter one
print(check_resp2)
process(resp2)
process(resp1)
do_synchronous_things()
return thing
app = FastAPI()
@app.post("/")
async def service(user_input):
result = await my_module.do_things()
return results
我观察到的不是等待 resp1
,而是当它到达 check_resp1
时它已经是一个完整的响应,就好像我根本没有使用异步一样。
我是 python 异步的新手,我知道我的代码无法运行,但我不知道如何修复它。据我所知,当解释器看到 await
时,它会启动函数然后继续前进,在这种情况下应该立即 post 下一个请求。我如何让它做到这一点?
是的,没错,在结果准备好之前协程不会继续。您可以同时使用 asyncio.gather 到 运行 个任务:
import asyncio
async def task(msg):
print(f"START {msg}")
await asyncio.sleep(1)
print(f"END {msg}")
return msg
async def main():
await task("1")
await task("2")
results = await asyncio.gather(task("3"), task("4"))
print(results)
if __name__ == "__main__":
asyncio.run(main())
测试:
$ python test.py
START 1
END 1
START 2
END 2
START 3
START 4
END 3
END 4
['3', '4']
或者您可以使用 asyncio.as_completed 获得最早的下一个结果:
for coro in asyncio.as_completed((task("5"), task("6"))):
earliest_result = await coro
print(earliest_result)
4 月 2 日星期五更新 09:25:33 UTC 2021:
asyncio.run 从 Python 3.7+ 开始可用,在以前的版本中,您必须手动创建和启动循环:
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
说明
你的代码运行同步的原因是在do_things
函数中,代码执行如下:
- 安排
search_object.search()
执行 - 等
search_object.search()
完成后得到结果 - 安排
search_object.search()
执行 - 等
search_object.search()
完成后得到结果 - 执行(同步)
process(resp2)
- 执行(同步)
process(resp1)
- 执行(同步)
do_synchronous_things()
您的意图是让第 1 步和第 3 步在第 2 步和第 4 步之前执行。您可以使用 unsync
库轻松实现 - 这里是 the documentation.
如何解决这个问题
from unsync import unsync
class my_module:
search_object = AsyncElasticsearch(url, port)
@unsync
async def search1():
return await search_object.search()
@unsync
async def search2(): # not sure if this is any different to search1
return await search_object.search()
async def do_things(self):
task1, task2 = self.search1(), self.search2() # schedule tasks
resp1, resp2 = task1.result(), task2.result() # wait till tasks are executed
# you might also do similar trick with process function to run process(resp2) and process(resp1) concurrently
process(resp2)
process(resp1)
do_synchronous_things() # if this does not rely on resp1 and resp2 it might also be put into separate task to make the computation quicker. To do this use @unsync(cpu_bound=True) decorator
return thing
app = FastAPI()
@app.post("/")
async def service(user_input):
result = await my_module.do_things()
return results
更多信息
如果你想了解更多asyncio和异步编程,我推荐这个tutorial。还有一个类似的案例,您提出了一些可能的解决方案来同时制作协程 运行。
PS。显然我不能运行这段代码,所以你必须自己调试它。