异步 http 调用花费的时间是应有的两倍

async http call taking twice as long as it should

我正在学习异步和信号量,我已经使用 fastapi 创建了一个端点来进行测试。 fastapi 端点只是一个简单的服务器端,它接受一个带有休眠时间的请求,并在返回响应之前休眠那么久。我 运行 通过 uvicorn 的 fastapi 用于测试目的,但有 5 个工人。这仅用于测试,我知道生产环境中我应该使用 gunicorn 和 nginx,但出于学习目的我只使用 uvicorn

uvicorn api_example:app --port 8008 --host cdsre.co.uk --workers 5

api_example.py

from time import sleep, time
import json
from fastapi import FastAPI, Response, Request


app = FastAPI()


@app.post("/sleeper")
async def sleeper(request: Request):
    request_data = await request.json()
    sleep_time = request_data['sleep_time']
    start = time()
    sleep(sleep_time)
    end = time()
    return Response(content=json.dumps({"slept for": end - start}))

我本地计算机上的客户端代码正在尝试利用异步和信号量并行调用 3 个 post 请求。我有 6 个请求,睡眠定时器为 5 秒。所以这里的期望是处理 6 个请求大约需要 10 秒。

async_example.py

import aiohttp
import asyncio
import time


async def get_http_response(session, url):
    async with semaphore:
        print("firing request...")
        start = time.time()
        async with session.post(url, json={"sleep_time": 5}) as resp:
            response = await resp.text()
            end = time.time()
            print(f"Client time: {end - start}, server time: {response}")
            return response


async def main():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for number in range(6):
            url = f'http://cdsre.co.uk:8008/sleeper'
            tasks.append(asyncio.ensure_future(get_http_response(session, url)))
        responses = await asyncio.gather(*tasks)
        for response in responses:
            pass
            # print(response)


semaphore = asyncio.Semaphore(3)
start_time = time.time()
asyncio.get_event_loop().run_until_complete(main())
print("--- %s seconds ---" % (time.time() - start_time))

然而,post 请求经常需要两倍的时间。在这种情况下,睡眠定时器为 5 秒,一些请求需要 10 秒。

firing request...
firing request...
firing request...
Client time: 5.137275695800781, server time: {"slept for": 5.005105018615723}
firing request...
Client time: 10.158655643463135, server time: {"slept for": 5.0042970180511475}
Client time: 10.158655643463135, server time: {"slept for": 5.001959800720215}
firing request...
firing request...
Client time: 5.055504560470581, server time: {"slept for": 5.005110025405884}
Client time: 5.056135654449463, server time: {"slept for": 5.005115509033203}
Client time: 5.107320070266724, server time: {"slept for": 5.005107402801514}
--- 15.271023750305176 seconds ---

有时它慢了 3 倍,这总是我睡眠时间的一个因素,这让我觉得有某种排队发生或某种竞争条件我错过了,但我认为整个目的信号量模式是为了避免这些竞争条件,这样我在任何时候对 3 个请求的限制总是会少于服务器端可用的工作(5 个工作人员),因此应该始终有一个工作可用的服务器端来处理它。

我也不会在信号量内部开始计时,所以我不会提前启动它,所以它应该只在发送请求时启动计时器。希望我只是遗漏了一些明显的东西。如果有人想尝试的话,我把终点 url 留了下来。如果能帮我解决这个问题,我将不胜感激。本质上,我需要能够编写一个异步客户端,它可以并行发送请求到一个限制,并在测量响应时间时保持一致。

一些花费3倍的例子

firing request...
firing request...
firing request...
Client time: 15.127191305160522, server time: {"slept for": 5.001192808151245}
firing request...
Client time: 15.127155303955078, server time: {"slept for": 5.005094766616821}
Client time: 15.127155303955078, server time: {"slept for": 5.005074977874756}
firing request...
firing request...
Client time: 5.053789854049683, server time: {"slept for": 5.005076169967651}
Client time: 5.100871801376343, server time: {"slept for": 5.005076885223389}
Client time: 10.107984781265259, server time: {"slept for": 5.005110502243042}
--- 25.236175775527954 seconds ---

继续评论:

You’re using time.sleep. This will block the thread. You want to use await asyncio.sleep(sleep_time). - @dirn (this is the answer)

However, having taken your comment and applied it in my code the results are a lot more consistent . So what is the difference between time.sleep in a worker and await asyncio.sleep.......I was thinking that each worker would get a separate request, so a sleep blocking that thread wouldn't affect the other - . @Chris Doyle

虽然我没有检查 uvunicorn 默认情况下是如何设置它的 workers 的,但使用异步编码的全部意义在于在 同一个线程 . 由于您的视图被定义为 async,这是特别正确的 - 这种编码范式不希望异步函数在等待任何调用时阻塞 - 线程将停止。

如果您必须调用一些需要时间的代码而不是 sleep,并且还没有准备好异步,那么您可以通过 运行 将委托函数放入一个单独的线。 Python asyncio 通过提供 loop.run_in_executor 调用使这变得简单:它会自动创建一个带有线程池的基于线程的执行器,运行 你的阻塞函数在一个单独的线程中,同时释放异步事件循环的当前线程进一步编排其他 tasks/workers.

否则,如前所述,如果您使用 sleep 进行测试,只需等待 asyncio.sleep。您可以使用 await loop.run_in_executor(None, time.sleep, 5) 检查它在阻塞函数下的表现。