python3.6 async/await 仍然与 fastAPI 同步工作

python3.6 async/await still works synchronously with fastAPI

我有一个 fastAPI 应用程序,post 有两个请求,其中一个较长(如果有帮助,它们是 Elasticsearch 查询,我正在使用 AsyncElasticsearch 模块,该模块已经 returns协程)。这是我的尝试:

class my_module:
    search_object = AsyncElasticsearch(url, port)

    async def do_things(self):
        resp1 = await search_object.search() #the longer one
        print(check_resp1)
        resp2 = await search_object.search() #the shorter one
        print(check_resp2)
        process(resp2)
        process(resp1)
        do_synchronous_things()
        return thing

app = FastAPI()
@app.post("/")
async def service(user_input):
    result = await my_module.do_things()
    return results

我观察到的不是等待 resp1,而是当它到达 check_resp1 时它已经是一个完整的响应,就好像我根本没有使用异步一样。

我是 python 异步的新手,我知道我的代码无法运行,但我不知道如何修复它。据我所知,当解释器看到 await 时,它会启动函数然后继续前进,在这种情况下应该立即 post 下一个请求。我如何让它做到这一点?

是的,没错,在结果准备好之前协程不会继续。您可以同时使用 asyncio.gather 到 运行 个任务:

import asyncio


async def task(msg):
    print(f"START {msg}")
    await asyncio.sleep(1)
    print(f"END {msg}")

    return msg


async def main():
    await task("1")
    await task("2")

    results = await asyncio.gather(task("3"), task("4"))

    print(results)


if __name__ == "__main__":
    asyncio.run(main())

测试:

$ python test.py
START 1
END 1
START 2
END 2
START 3
START 4
END 3
END 4
['3', '4']

或者您可以使用 asyncio.as_completed 获得最早的下一个结果:

for coro in asyncio.as_completed((task("5"), task("6"))):
    earliest_result = await coro
    print(earliest_result)

4 月 2 日星期五更新 09:25:33 UTC 2021:

asyncio.runPython 3.7+ 开始可用,在以前的版本中,您必须手动创建和启动循环:

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

说明

你的代码运行同步的原因是在do_things函数中,代码执行如下:

  1. 安排search_object.search()执行
  2. search_object.search()完成后得到结果
  3. 安排search_object.search()执行
  4. search_object.search()完成后得到结果
  5. 执行(同步)process(resp2)
  6. 执行(同步)process(resp1)
  7. 执行(同步)do_synchronous_things()

您的意图是让第 1 步和第 3 步在第 2 步和第 4 步之前执行。您可以使用 unsync 库轻松实现 - 这里是 the documentation.

如何解决这个问题

from unsync import unsync

class my_module:
    search_object = AsyncElasticsearch(url, port)

    @unsync
    async def search1():
        return await search_object.search()

    @unsync
    async def search2():  # not sure if this is any different to search1
        return await search_object.search()

    async def do_things(self):
        task1, task2 = self.search1(), self.search2()  # schedule tasks
        resp1, resp2 = task1.result(), task2.result()  # wait till tasks are executed
        # you might also do similar trick with process function to run process(resp2) and process(resp1) concurrently
        process(resp2)
        process(resp1)
        do_synchronous_things()  # if this does not rely on resp1 and resp2 it might also be put into separate task to make the computation quicker. To do this use @unsync(cpu_bound=True) decorator
        return thing

app = FastAPI()
@app.post("/")
async def service(user_input):
    result = await my_module.do_things()
    return results

更多信息

如果你想了解更多asyncio和异步编程,我推荐这个tutorial。还有一个类似的案例,您提出了一些可能的解决方案来同时制作协程 运行。

PS。显然我不能运行这段代码,所以你必须自己调试它。