aiohttp:按域类型限制每秒请求数
aiohttp: rate limiting requests-per-second by domain type
我已经看过了here。但我仍然无法理解它。
以下是我目前的实现方式:
urls_without_rate_limit =
[
'http://httpbin.org/get'
'http://httpbin.org/get',
'http://httpbin.org/get',
'http://httpbin.org/get',
'http://httpbin.org/get'
]
urls_with_rate_limit =
[
'http://eu.httpbin.org/get'
'http://eu.httpbin.org/get',
'http://eu.httpbin.org/get',
'http://eu.httpbin.org/get',
'http://eu.httpbin.org/get'
]
api_rate = 2
api_limit = 6
loop = asyncio.get_event_loop()
loop.run_until_complete(
process(urls=urls_without_rate_limit, rate=0, limit=len(url_list)))
loop.run_until_complete(
process(urls=urls_with_rate_limit, rate=api_rate, limit=api_limit))
async def process(urls, rate, limit):
limit = asyncio.Semaphore(limit)
f = Fetch(
rate=rate,
limit=limit
)
tasks = []
for url in urls:
tasks.append(f.make_request(url=url))
results = await asyncio.gather(*tasks)
如您所见,它将完成第一轮 process
然后开始第二轮速率限制。
它工作正常,但有没有一种方法可以让我在不同的速率限制下同时开始两轮?
tvm
我会详细说明我评论的内容。所以你可以尝试自己的解决方案(尽管我会在这里给出完整的代码)。
你可以有一个定义一些规则的字典(api -> 每秒速率限制):
APIS_RATE_LIMIT_PER_S = {
"http://api.mathjs.org/v4?precision=5": 1,
"http://api.mathjs.org/v4?precision=2": 3,
}
然后您可以根据请求使用它来决定选择哪个信号量 URL(实际上,您必须进行一些解析才能获得要控制的端点)。一旦你有了它,只需使用信号量来确保你限制同时执行你的请求的进程的数量。拼图的最后一块显然是在释放信号量之前添加延迟。
我将获得所建议内容的不同版本 here,但它基本上是相同的解决方案。我刚刚做到这一点,因此您可以修改会话对象,以便每次调用 session.get
都会自动应用速率限制控制。
def set_rate_limits(session, apis_rate_limits_per_s):
semaphores = {api: asyncio.Semaphore(s) for api, s in apis_rate_limits_per_s.items()}
@asynccontextmanager
async def limit_rate(url):
await semaphores[url].acquire()
start = time.time()
try:
yield semaphores[url]
finally:
duration = time.time() - start
await asyncio.sleep(1 - duration)
semaphores[url].release()
def add_limit_rate(coroutine):
async def coroutine_with_rate_limit(url, *args, **kwargs):
async with limit_rate(url):
return await coroutine(url, *args, **kwargs)
return coroutine_with_rate_limit
session.get = add_limit_rate(session.get)
session.post = add_limit_rate(session.post)
return session
请注意,使用 add_limit_rate
您可以将速率限制控制添加到任何具有 API 端点作为第一个参数的协程。但这里我们只修改 session.get
和 session.post
.
最后你可以像这样使用 set_rate_limits
函数:
async def main():
apis = APIS_RATE_LIMIT_PER_S.keys()
params = [
{"expr" : "2^2"},
{"expr" : "1/0.999"},
{"expr" : "1/1.001"},
{"expr" : "1*1.001"},
]
async with aiohttp.ClientSession() as session:
session = set_rate_limits(session, APIS_RATE_LIMIT_PER_S)
api_requests = [get_text_result(session, url, params=p) for url, p in product(apis, params)]
text_responses = await asyncio.gather(*api_requests)
print(text_responses)
async def get_text_result(session, url, params=None):
result = await session.get(url, params=params)
return await result.text()
如果你 运行 这段代码你不会看到很多正在发生的事情,你可以在 set_rate_limits
到 "make sure" 的地方添加一些 print
到 "make sure" 速率限制正确执行:
import time
# [...] change this part :
def add_limit_rate(coroutine):
async def coroutine_with_rate_limit(url, *args, **kwargs):
async with limit_rate(url):
######### debug
global request_count
request_count += 1
this_req_id = request_count
rate_lim = APIS_RATE_LIMIT_PER_S[url]
print(f"request #{this_req_id} -> \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
########
r = await coroutine(url, *args, **kwargs)
######### debug
print(f"request #{this_req_id} <- \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
#########
return r
如果你运行这个例子asyncio.run(main())
,你应该得到类似的东西:
request #1 -> 1ms rate 1/s
request #2 -> 2ms rate 3/s
request #3 -> 3ms rate 3/s
request #4 -> 3ms rate 3/s
request #1 <- 1003ms rate 1/s
request #2 <- 1004ms rate 3/s
request #3 <- 1004ms rate 3/s
request #5 -> 1004ms rate 1/s
request #6 -> 1005ms rate 3/s
request #4 <- 1006ms rate 3/s
request #5 <- 2007ms rate 1/s
request #6 <- 2007ms rate 3/s
request #7 -> 2007ms rate 1/s
request #7 <- 3008ms rate 1/s
request #8 -> 3008ms rate 1/s
request #8 <- 4010ms rate 1/s
这里似乎遵守速率限制,特别是我们可以看一下速率限制为每秒 1 个请求的 API:
request #1 -> 1ms rate 1/s
request #1 <- 1003ms rate 1/s
request #5 -> 1004ms rate 1/s
request #5 <- 2007ms rate 1/s
request #7 -> 2007ms rate 1/s
request #7 <- 3008ms rate 1/s
request #8 -> 3008ms rate 1/s
request #8 <- 4010ms rate 1/s
另一方面,这个解决方案不是很令人满意,因为我们人为地向所有请求添加了 1s ping。这是因为这部分代码:
await asyncio.sleep(1 - duration)
semaphores[url].release()
这里的问题是我们正在等待睡眠结束,然后再将控制权交还给事件循环(安排另一个任务,另一个请求)。使用这段代码可以很容易地解决这个问题:
asyncio.create_task(release_after_delay(semaphores[url], 1 - duration))
release_after_delay
就是:
async def release_after_delay(semaphore, delay):
await asyncio.sleep(delay)
semaphore.release()
asyncio.create_task
function makes the coroutine "run this in the background". Which means in this code that the semaphore will be released later, but that we don't need to wait for it to give control back to the even loop (which means some other request can be scheduled and also that we can get the result in add_limit_rate
). In other words, we don't care about the result of this coroutine, we just want it to run at some point in the future (which is probably why this function used to be call ensure_future
)。
使用这个补丁,我们有以下 API 速率限制设置为每秒一个请求:
request #1 -> 1ms rate 1/s
request #1 <- 214ms rate 1/s
request #2 -> 1002ms rate 1/s
request #2 <- 1039ms rate 1/s
request #3 -> 2004ms rate 1/s
request #3 <- 2050ms rate 1/s
request #4 -> 3009ms rate 1/s
request #4 <- 3048ms rate 1/s
它绝对更接近我们期望此代码执行的操作。我们会尽快从 API 获得每个响应(在此示例中,ping 为 200ms/37ms/46ms/41ms)。并且也遵守速率限制。
这可能不是最漂亮的代码,但它可以作为您开始工作的起点。一旦你让它工作得很好,也许可以用它做一个干净的包,我想这是其他人可能喜欢使用的东西。
我已经看过了here。但我仍然无法理解它。 以下是我目前的实现方式:
urls_without_rate_limit =
[
'http://httpbin.org/get'
'http://httpbin.org/get',
'http://httpbin.org/get',
'http://httpbin.org/get',
'http://httpbin.org/get'
]
urls_with_rate_limit =
[
'http://eu.httpbin.org/get'
'http://eu.httpbin.org/get',
'http://eu.httpbin.org/get',
'http://eu.httpbin.org/get',
'http://eu.httpbin.org/get'
]
api_rate = 2
api_limit = 6
loop = asyncio.get_event_loop()
loop.run_until_complete(
process(urls=urls_without_rate_limit, rate=0, limit=len(url_list)))
loop.run_until_complete(
process(urls=urls_with_rate_limit, rate=api_rate, limit=api_limit))
async def process(urls, rate, limit):
limit = asyncio.Semaphore(limit)
f = Fetch(
rate=rate,
limit=limit
)
tasks = []
for url in urls:
tasks.append(f.make_request(url=url))
results = await asyncio.gather(*tasks)
如您所见,它将完成第一轮 process
然后开始第二轮速率限制。
它工作正常,但有没有一种方法可以让我在不同的速率限制下同时开始两轮?
tvm
我会详细说明我评论的内容。所以你可以尝试自己的解决方案(尽管我会在这里给出完整的代码)。
你可以有一个定义一些规则的字典(api -> 每秒速率限制):
APIS_RATE_LIMIT_PER_S = {
"http://api.mathjs.org/v4?precision=5": 1,
"http://api.mathjs.org/v4?precision=2": 3,
}
然后您可以根据请求使用它来决定选择哪个信号量 URL(实际上,您必须进行一些解析才能获得要控制的端点)。一旦你有了它,只需使用信号量来确保你限制同时执行你的请求的进程的数量。拼图的最后一块显然是在释放信号量之前添加延迟。
我将获得所建议内容的不同版本 here,但它基本上是相同的解决方案。我刚刚做到这一点,因此您可以修改会话对象,以便每次调用 session.get
都会自动应用速率限制控制。
def set_rate_limits(session, apis_rate_limits_per_s):
semaphores = {api: asyncio.Semaphore(s) for api, s in apis_rate_limits_per_s.items()}
@asynccontextmanager
async def limit_rate(url):
await semaphores[url].acquire()
start = time.time()
try:
yield semaphores[url]
finally:
duration = time.time() - start
await asyncio.sleep(1 - duration)
semaphores[url].release()
def add_limit_rate(coroutine):
async def coroutine_with_rate_limit(url, *args, **kwargs):
async with limit_rate(url):
return await coroutine(url, *args, **kwargs)
return coroutine_with_rate_limit
session.get = add_limit_rate(session.get)
session.post = add_limit_rate(session.post)
return session
请注意,使用 add_limit_rate
您可以将速率限制控制添加到任何具有 API 端点作为第一个参数的协程。但这里我们只修改 session.get
和 session.post
.
最后你可以像这样使用 set_rate_limits
函数:
async def main():
apis = APIS_RATE_LIMIT_PER_S.keys()
params = [
{"expr" : "2^2"},
{"expr" : "1/0.999"},
{"expr" : "1/1.001"},
{"expr" : "1*1.001"},
]
async with aiohttp.ClientSession() as session:
session = set_rate_limits(session, APIS_RATE_LIMIT_PER_S)
api_requests = [get_text_result(session, url, params=p) for url, p in product(apis, params)]
text_responses = await asyncio.gather(*api_requests)
print(text_responses)
async def get_text_result(session, url, params=None):
result = await session.get(url, params=params)
return await result.text()
如果你 运行 这段代码你不会看到很多正在发生的事情,你可以在 set_rate_limits
到 "make sure" 的地方添加一些 print
到 "make sure" 速率限制正确执行:
import time
# [...] change this part :
def add_limit_rate(coroutine):
async def coroutine_with_rate_limit(url, *args, **kwargs):
async with limit_rate(url):
######### debug
global request_count
request_count += 1
this_req_id = request_count
rate_lim = APIS_RATE_LIMIT_PER_S[url]
print(f"request #{this_req_id} -> \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
########
r = await coroutine(url, *args, **kwargs)
######### debug
print(f"request #{this_req_id} <- \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
#########
return r
如果你运行这个例子asyncio.run(main())
,你应该得到类似的东西:
request #1 -> 1ms rate 1/s
request #2 -> 2ms rate 3/s
request #3 -> 3ms rate 3/s
request #4 -> 3ms rate 3/s
request #1 <- 1003ms rate 1/s
request #2 <- 1004ms rate 3/s
request #3 <- 1004ms rate 3/s
request #5 -> 1004ms rate 1/s
request #6 -> 1005ms rate 3/s
request #4 <- 1006ms rate 3/s
request #5 <- 2007ms rate 1/s
request #6 <- 2007ms rate 3/s
request #7 -> 2007ms rate 1/s
request #7 <- 3008ms rate 1/s
request #8 -> 3008ms rate 1/s
request #8 <- 4010ms rate 1/s
这里似乎遵守速率限制,特别是我们可以看一下速率限制为每秒 1 个请求的 API:
request #1 -> 1ms rate 1/s
request #1 <- 1003ms rate 1/s
request #5 -> 1004ms rate 1/s
request #5 <- 2007ms rate 1/s
request #7 -> 2007ms rate 1/s
request #7 <- 3008ms rate 1/s
request #8 -> 3008ms rate 1/s
request #8 <- 4010ms rate 1/s
另一方面,这个解决方案不是很令人满意,因为我们人为地向所有请求添加了 1s ping。这是因为这部分代码:
await asyncio.sleep(1 - duration)
semaphores[url].release()
这里的问题是我们正在等待睡眠结束,然后再将控制权交还给事件循环(安排另一个任务,另一个请求)。使用这段代码可以很容易地解决这个问题:
asyncio.create_task(release_after_delay(semaphores[url], 1 - duration))
release_after_delay
就是:
async def release_after_delay(semaphore, delay):
await asyncio.sleep(delay)
semaphore.release()
asyncio.create_task
function makes the coroutine "run this in the background". Which means in this code that the semaphore will be released later, but that we don't need to wait for it to give control back to the even loop (which means some other request can be scheduled and also that we can get the result in add_limit_rate
). In other words, we don't care about the result of this coroutine, we just want it to run at some point in the future (which is probably why this function used to be call ensure_future
)。
使用这个补丁,我们有以下 API 速率限制设置为每秒一个请求:
request #1 -> 1ms rate 1/s
request #1 <- 214ms rate 1/s
request #2 -> 1002ms rate 1/s
request #2 <- 1039ms rate 1/s
request #3 -> 2004ms rate 1/s
request #3 <- 2050ms rate 1/s
request #4 -> 3009ms rate 1/s
request #4 <- 3048ms rate 1/s
它绝对更接近我们期望此代码执行的操作。我们会尽快从 API 获得每个响应(在此示例中,ping 为 200ms/37ms/46ms/41ms)。并且也遵守速率限制。
这可能不是最漂亮的代码,但它可以作为您开始工作的起点。一旦你让它工作得很好,也许可以用它做一个干净的包,我想这是其他人可能喜欢使用的东西。