aiohttp:按域类型限制每秒请求数

aiohttp: rate limiting requests-per-second by domain type

我已经看过了here。但我仍然无法理解它。 以下是我目前的实现方式:

urls_without_rate_limit = 
    [
       'http://httpbin.org/get'
       'http://httpbin.org/get',
       'http://httpbin.org/get',
       'http://httpbin.org/get',
       'http://httpbin.org/get'
    ]

urls_with_rate_limit = 
    [
       'http://eu.httpbin.org/get'
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get'
    ]

api_rate = 2
api_limit = 6

loop = asyncio.get_event_loop()
    loop.run_until_complete(
        process(urls=urls_without_rate_limit, rate=0, limit=len(url_list)))

    loop.run_until_complete(
        process(urls=urls_with_rate_limit, rate=api_rate, limit=api_limit))
async def process(urls, rate, limit):
    limit = asyncio.Semaphore(limit)

    f = Fetch(
        rate=rate,
        limit=limit
    )

    tasks = []
    for url in urls:
        tasks.append(f.make_request(url=url))

    results = await asyncio.gather(*tasks)

如您所见,它将完成第一轮 process 然后开始第二轮速率限制。

它工作正常,但有没有一种方法可以让我在不同的速率限制下同时开始两轮?

tvm

我会详细说明我评论的内容。所以你可以尝试自己的解决方案(尽管我会在这里给出完整的代码)。

你可以有一个定义一些规则的字典(api -> 每秒速率限制):

APIS_RATE_LIMIT_PER_S = {
  "http://api.mathjs.org/v4?precision=5": 1,
  "http://api.mathjs.org/v4?precision=2": 3,
}

然后您可以根据请求使用它来决定选择哪个信号量 URL(实际上,您必须进行一些解析才能获得要控制的端点)。一旦你有了它,只需使用信号量来确保你限制同时执行你的请求的进程的数量。拼图的最后一块显然是在释放信号量之前添加延迟。

我将获得所建议内容的不同版本 here,但它基本上是相同的解决方案。我刚刚做到这一点,因此您可以修改会话对象,以便每次调用 session.get 都会自动应用速率限制控制。

def set_rate_limits(session, apis_rate_limits_per_s):
    semaphores = {api: asyncio.Semaphore(s) for api, s in apis_rate_limits_per_s.items()}

    @asynccontextmanager
    async def limit_rate(url):
        await semaphores[url].acquire() 
        start = time.time()
        try:
            yield semaphores[url]
        finally:
            duration = time.time() - start
            await asyncio.sleep(1 - duration)
            semaphores[url].release()

    def add_limit_rate(coroutine):

        async def coroutine_with_rate_limit(url, *args, **kwargs):
            async with limit_rate(url):
                return await coroutine(url, *args, **kwargs)

        return coroutine_with_rate_limit

    session.get = add_limit_rate(session.get)
    session.post = add_limit_rate(session.post)
    return session

请注意,使用 add_limit_rate 您可以将速率限制控制添加到任何具有 API 端点作为第一个参数的协程。但这里我们只修改 session.getsession.post.

最后你可以像这样使用 set_rate_limits 函数:

async def main():
    apis = APIS_RATE_LIMIT_PER_S.keys()
    params = [
        {"expr" : "2^2"},
        {"expr" : "1/0.999"},
        {"expr" : "1/1.001"},
        {"expr" : "1*1.001"},
    ]
    async with aiohttp.ClientSession() as session:
        session = set_rate_limits(session, APIS_RATE_LIMIT_PER_S)
        api_requests = [get_text_result(session, url, params=p) for url, p  in product(apis, params)]
        text_responses = await asyncio.gather(*api_requests)
        print(text_responses)


async def get_text_result(session, url, params=None):
    result = await session.get(url, params=params)
    return await result.text()

如果你 运行 这段代码你不会看到很多正在发生的事情,你可以在 set_rate_limits 到 "make sure" 的地方添加一些 print 到 "make sure" 速率限制正确执行:

import time

# [...] change this part : 
    def add_limit_rate(coroutine):

        async def coroutine_with_rate_limit(url, *args, **kwargs):
            async with limit_rate(url):
                ######### debug 
                global request_count
                request_count += 1
                this_req_id = request_count
                rate_lim = APIS_RATE_LIMIT_PER_S[url]
                print(f"request #{this_req_id} -> \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
                ########
                r = await coroutine(url, *args, **kwargs)

            ######### debug 
            print(f"request #{this_req_id} <- \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
            ######### 
            return r

如果你运行这个例子asyncio.run(main()),你应该得到类似的东西:

request #1 ->        1ms     rate 1/s
request #2 ->        2ms     rate 3/s
request #3 ->        3ms     rate 3/s
request #4 ->        3ms     rate 3/s
request #1 <-     1003ms     rate 1/s
request #2 <-     1004ms     rate 3/s
request #3 <-     1004ms     rate 3/s
request #5 ->     1004ms     rate 1/s
request #6 ->     1005ms     rate 3/s
request #4 <-     1006ms     rate 3/s
request #5 <-     2007ms     rate 1/s
request #6 <-     2007ms     rate 3/s
request #7 ->     2007ms     rate 1/s
request #7 <-     3008ms     rate 1/s
request #8 ->     3008ms     rate 1/s
request #8 <-     4010ms     rate 1/s

这里似乎遵守速率限制,特别是我们可以看一下速率限制为每秒 1 个请求的 API:

request #1 ->        1ms     rate 1/s
request #1 <-     1003ms     rate 1/s
request #5 ->     1004ms     rate 1/s
request #5 <-     2007ms     rate 1/s
request #7 ->     2007ms     rate 1/s
request #7 <-     3008ms     rate 1/s
request #8 ->     3008ms     rate 1/s
request #8 <-     4010ms     rate 1/s

另一方面,这个解决方案不是很令人满意,因为我们人为地向所有请求添加了 1s ping。这是因为这部分代码:

await asyncio.sleep(1 - duration)
semaphores[url].release()

这里的问题是我们正在等待睡眠结束,然后再将控制权交还给事件循环(安排另一个任务,另一个请求)。使用这段代码可以很容易地解决这个问题:

asyncio.create_task(release_after_delay(semaphores[url], 1 - duration))    

release_after_delay 就是:

async def release_after_delay(semaphore, delay):
    await asyncio.sleep(delay)
    semaphore.release()

asyncio.create_task function makes the coroutine "run this in the background". Which means in this code that the semaphore will be released later, but that we don't need to wait for it to give control back to the even loop (which means some other request can be scheduled and also that we can get the result in add_limit_rate). In other words, we don't care about the result of this coroutine, we just want it to run at some point in the future (which is probably why this function used to be call ensure_future)。

使用这个补丁,我们有以下 API 速率限制设置为每秒一个请求:

request #1 ->        1ms     rate 1/s
request #1 <-      214ms     rate 1/s
request #2 ->     1002ms     rate 1/s
request #2 <-     1039ms     rate 1/s
request #3 ->     2004ms     rate 1/s
request #3 <-     2050ms     rate 1/s
request #4 ->     3009ms     rate 1/s
request #4 <-     3048ms     rate 1/s

它绝对更接近我们期望此代码执行的操作。我们会尽快从 API 获得每个响应(在此示例中,ping 为 200ms/37ms/46ms/41ms)。并且也遵守速率限制。

这可能不是最漂亮的代码,但它可以作为您开始工作的起点。一旦你让它工作得很好,也许可以用它做一个干净的包,我想这是其他人可能喜欢使用的东西。