Controlling the Concurrency of HTTP Requests Using Python's asyncio.Semaphore
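I'm trying to figure out a way to limit the number of concurrent HTTP requests made to a server using Python's asyncio and httpx modules. I came across this Stack Overflow answer.
It suggests using asyncio.Semaphore to stop multiple consumers from making too many requests. While that answer works perfectly, it uses an explicit loop construct instead of asyncio.run. When I replaced the explicit loop construct with asyncio.run, the behavior of the code changed: instead of performing all 9 requests, it now performs only three and then stops.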
import asyncio
from random import randint


async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))


sem = asyncio.Semaphore(3)


async def safe_download(i):
    async with sem:  # semaphore limits num of simultaneous downloads
        return await download(i)


async def main():
    tasks = [
        asyncio.ensure_future(safe_download(i))  # creating task starts coroutine
        for i
        in range(9)
    ]
    await asyncio.gather(*tasks, return_exceptions=True)  # await moment all downloads done


if __name__ == '__main__':
    asyncio.run(main())
It prints:
downloading 0 will take 3 second(s)
downloading 1 will take 1 second(s)
downloading 2 will take 3 second(s)
downloaded 1
downloaded 0
downloaded 2
I had to change await asyncio.gather(*tasks) to await asyncio.gather(*tasks, return_exceptions=True) so that the code wouldn't raise a RuntimeError. Without that change it raises the following error (with asyncio debug mode turned on):
downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
Traceback (most recent call last):
  File "/home/rednafi/workspace/personal/demo/demo.py", line 66, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/rednafi/workspace/personal/demo/demo.py", line 62, in main
    await asyncio.gather(*tasks) # await moment all downloads done
  File "/home/rednafi/workspace/personal/demo/demo.py", line 52, in safe_download
    async with sem: # semaphore limits num of simultaneous downloads
  File "/usr/lib/python3.9/asyncio/locks.py", line 14, in __aenter__
    await self.acquire()
  File "/usr/lib/python3.9/asyncio/locks.py", line 413, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-5' coro=<safe_download() running at /home/rednafi/workspace/personal/demo/demo.py:52> cb=[gather.<locals>._done_callback() at /usr/lib/python3.9/asyncio/tasks.py:764] created at /home/rednafi/workspace/personal/demo/demo.py:58> got Future <Future pending created at /usr/lib/python3.9/asyncio/base_events.py:424> attached to a different loop
Apart from that, the only change was replacing the explicit loop with asyncio.run.
So why did the behavior of the code change, and how can I restore the old, expected behavior?
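The problem is that, in Python 3.9 (the version in your traceback), a Semaphore created at the top level caches the event loop that was active when it was created, i.e. the loop that asyncio creates automatically and returns from get_event_loop() at startup. asyncio.run(), on the other hand, creates a new event loop on every call. As a result, you are trying to await the semaphore from a different event loop, which fails. The first three calls to safe_download() find the semaphore free and never have to wait, so they run normally; the remaining six have to wait on a future created on the old loop, and each of them fails with RuntimeError. With return_exceptions=True those errors are silently collected into the result of gather(), which is why the program appears to stop after three downloads. As always, hiding an exception without understanding its cause only leads to further problems.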
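To see the mechanism in isolation, here is a small sketch that does not rely on Semaphore internals (those differ between Python versions). It shows that each asyncio.run() call uses a fresh loop, that a task awaiting a future created on another loop fails with RuntimeError, and that return_exceptions=True turns that error into an ordinary value in the list returned by gather(). The function names (current_loop, gather_with_foreign_future, demo) are hypothetical, made up for this illustration.

```python
import asyncio


async def current_loop():
    # Return the loop this coroutine is running on.
    return asyncio.get_running_loop()


async def gather_with_foreign_future(foreign):
    async def ok(i):
        return i

    async def bad():
        # 'foreign' belongs to a different loop, much like the semaphore
        # waiter created on the old loop in the question (Python 3.9).
        await foreign

    # return_exceptions=True turns the RuntimeError into a result value.
    return await asyncio.gather(ok(0), bad(), ok(2), return_exceptions=True)


def demo():
    # Two asyncio.run() calls run on two different loops; each is closed afterwards.
    first = asyncio.run(current_loop())
    second = asyncio.run(current_loop())
    print('same loop:', first is second, '| first closed:', first.is_closed())

    old_loop = asyncio.new_event_loop()
    try:
        foreign = old_loop.create_future()  # bound to old_loop, which never runs
        results = asyncio.run(gather_with_foreign_future(foreign))
    finally:
        old_loop.close()

    # Inspect the results instead of ignoring them.
    for r in results:
        if isinstance(r, BaseException):
            print('failed:', type(r).__name__, r)
        else:
            print('ok:', r)
    return first, second, results


if __name__ == '__main__':
    demo()
```

If you do keep return_exceptions=True, checking each result with isinstance(r, BaseException), as the loop above does, is what makes the failures visible.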
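To fix this properly, create the semaphore inside the code that asyncio.run() executes, i.e. from within main(). For example, the simplest fix could look like this: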
# ...
sem = None

async def main():
    global sem
    sem = asyncio.Semaphore(3)
    # ...
A more elegant approach is to remove sem from the top level entirely and pass it explicitly to safe_download:
import asyncio

# download() is the same as in the question

async def safe_download(i, limit):
    async with limit:
        return await download(i)

async def main():
    # limit parallel downloads to 3 at most
    limit = asyncio.Semaphore(3)
    # you don't need to explicitly call create_task() if you call
    # `gather()` because `gather()` will do it for you
    await asyncio.gather(*[safe_download(i, limit) for i in range(9)])

if __name__ == '__main__':
    asyncio.run(main())
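One way to confirm that the semaphore really caps concurrency at three is to count how many downloads are in flight at any moment. The sketch below does that for the parameter-passing version; the stats dictionary, the stand-in download() with a fixed short sleep, and the n / max_concurrent parameters are illustrative assumptions, not part of the answer.

```python
import asyncio


async def download(code, stats):
    # Stand-in for the real request: track how many downloads are in flight.
    stats['in_flight'] += 1
    stats['peak'] = max(stats['peak'], stats['in_flight'])
    await asyncio.sleep(0.01)  # pretend I/O
    stats['in_flight'] -= 1
    return code


async def safe_download(i, limit, stats):
    async with limit:  # waits here while max_concurrent downloads are running
        return await download(i, stats)


async def main(n=9, max_concurrent=3):
    # Created inside main(), so it is used only with asyncio.run()'s loop.
    limit = asyncio.Semaphore(max_concurrent)
    stats = {'in_flight': 0, 'peak': 0}
    results = await asyncio.gather(*[safe_download(i, limit, stats) for i in range(n)])
    return results, stats['peak']


if __name__ == '__main__':
    results, peak = asyncio.run(main())
    print(results, 'peak concurrency:', peak)
```

Because the counter is only touched from coroutines on a single loop, no extra locking is needed; the reported peak matches the semaphore size.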