How to limit concurrency with Python asyncio?
Suppose we have a bunch of links to download, and each link may take a different amount of time to download. And I'm only allowed to download using at most 3 connections. Now, I want to make sure that I do this efficiently using asyncio.
Here's what I'm trying to achieve: at any point in time, try to make sure that I have at least 3 downloads running.
Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----
The numbers denote the download links, while the hyphens denote waiting for download.
Here is the code that I'm using right now:
from random import randint
import asyncio

count = 0


async def download(code, permit_download, no_concurrent, downloading_event):
    global count
    downloading_event.set()
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))
    count -= 1
    if count < no_concurrent and not permit_download.is_set():
        permit_download.set()


async def main(loop):
    global count
    permit_download = asyncio.Event()
    permit_download.set()
    downloading_event = asyncio.Event()
    no_concurrent = 3
    i = 0
    while i < 9:
        if permit_download.is_set():
            count += 1
            if count >= no_concurrent:
                permit_download.clear()
            loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
            await downloading_event.wait()  # To force context to switch to download function
            downloading_event.clear()
            i += 1
        else:
            await permit_download.wait()
    await asyncio.sleep(9)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
And the output is as expected:
downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8
But here are my questions:
1. Currently, I'm simply waiting 9 seconds to keep the main function running until the downloads are complete. Is there an efficient way of waiting for the last download to complete before exiting the main function? (I know there's asyncio.wait, but I'd need to store all the task references for it to work.)
2. What's a good library that takes care of this type of task? I know javascript has a lot of async libraries, but what about Python?
Edit: 2. What's a good library that takes care of common async patterns? (Something like async)
Before reading the rest of this answer, please note that the idiomatic way of limiting the number of parallel tasks with asyncio is using asyncio.Semaphore, as shown in Mikhail's answer below and elegantly abstracted in the gather_with_concurrency answer. This answer contains working, but slightly more complicated, ways of achieving the same. I'm leaving the answer up because in some cases this approach can have advantages over a semaphore, specifically when the work to be done is very large or unbounded, and you cannot create all the coroutines in advance. In that case, the second (queue-based) solution in this answer is what you want. But in most regular situations, such as parallel download through aiohttp, you should use a semaphore instead.
You basically need a fixed-size pool of download tasks. asyncio doesn't come with a pre-made task pool, but it is easy to create one: simply keep a set of tasks and don't allow it to grow past the limit. Although the question states your reluctance to go down that route, the code ends up much more elegant:
import asyncio, random


async def download(code):
    wait_time = random.randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))


async def main(loop):
    no_concurrent = 3
    dltasks = set()
    i = 0
    while i < 9:
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(
                dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(loop.create_task(download(i)))
        i += 1
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)
Another approach is to create a fixed number of coroutines doing the downloading, much like a fixed-size thread pool, and feed them work using an asyncio.Queue. This removes the need to manually limit the number of downloads, which will be automatically limited by the number of coroutines invoking download():
# download() defined as above

async def download_worker(q):
    while True:
        code = await q.get()
        await download(code)
        q.task_done()


async def main(loop):
    q = asyncio.Queue()
    workers = [loop.create_task(download_worker(q)) for _ in range(3)]
    i = 0
    while i < 9:
        await q.put(i)
        i += 1
    await q.join()  # wait for all tasks to be processed
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
As for your other question, the obvious choice would be aiohttp.
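If it helps, here is a minimal sketch of what a real download() might look like with aiohttp, so it can replace the sleep-based stub in the patterns above. The session-per-run setup, the extra session parameter, and the example URL are illustrative assumptions, not part of the original answer:

import asyncio
import aiohttp  # assumes: pip install aiohttp


async def download(session, url):
    # Fetch one URL over a shared session and return the response body
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.read()


async def main():
    async with aiohttp.ClientSession() as session:
        data = await download(session, 'http://www.example.com')
        print('downloaded {} bytes'.format(len(data)))

asyncio.run(main())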
If I'm not mistaken, you're searching for asyncio.Semaphore. Example of usage:
import asyncio
from random import randint


async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))


sem = asyncio.Semaphore(3)


async def safe_download(i):
    async with sem:  # semaphore limits num of simultaneous downloads
        return await download(i)


async def main():
    tasks = [
        asyncio.ensure_future(safe_download(i))  # creating task starts coroutine
        for i
        in range(9)
    ]
    await asyncio.gather(*tasks)  # await moment all downloads done


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
Output:
downloading 0 will take 3 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 3 second(s)
downloaded 1
downloaded 0
downloading 4 will take 2 second(s)
downloading 5 will take 1 second(s)
downloaded 5
downloaded 3
downloading 6 will take 3 second(s)
downloading 7 will take 1 second(s)
downloaded 4
downloading 8 will take 2 second(s)
downloaded 7
downloaded 8
downloaded 6
An example of async downloading with aiohttp can be found here. Note that aiohttp has a Semaphore equivalent built in, which you can see an example of here. It has a default limit of 100 connections.
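That built-in limit lives on the connector, so a separate semaphore is often unnecessary with aiohttp. A minimal sketch (the limit value of 3 simply mirrors the question; the URL is an example):

import asyncio
import aiohttp  # assumes: pip install aiohttp


async def main():
    # TCPConnector(limit=3) caps simultaneous connections, playing the
    # same role as Semaphore(3) in the example above
    connector = aiohttp.TCPConnector(limit=3)
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get('http://www.example.com') as resp:
            print(resp.status)

asyncio.run(main())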
The asyncio-pool library does exactly what you need.
https://pypi.org/project/asyncio-pool/
from asyncio_pool import AioPool  # assumes: pip install asyncio-pool

LIST_OF_URLS = ("http://www.google.com", "......")

pool = AioPool(size=3)
await pool.map(your_download_coroutine, LIST_OF_URLS)
Small update: it's no longer necessary to create the loop explicitly. I tweaked the code below to clean things up slightly.
# download(code) is the same

async def main():
    no_concurrent = 3
    dltasks = set()
    for i in range(9):
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(asyncio.create_task(download(i)))
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)


if __name__ == '__main__':
    asyncio.run(main())
I used Mikhail's answer and ended up with this little gem:
async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))
Which you would run instead of a normal gather:
await gather_with_concurrency(100, *tasks)
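One caveat worth noting (my addition, not part of the original answer): this only limits concurrency if the arguments are plain coroutine objects, which don't start until sem_task awaits them inside the semaphore. If you pre-wrap them with asyncio.create_task, they begin running immediately and the semaphore has nothing left to limit:

# OK: coroutines start only once sem_task() awaits them
await gather_with_concurrency(3, *(download(i) for i in range(9)))

# Not limited: create_task() schedules everything up front
# await gather_with_concurrency(3, *(asyncio.create_task(download(i)) for i in range(9)))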
Using a semaphore, you can also create a decorator to wrap the function:
import asyncio
from functools import wraps


def request_concurrency_limit_decorator(limit=3):
    # The semaphore is created here, binding it to the default event loop
    sem = asyncio.Semaphore(limit)

    def executor(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with sem:
                return await func(*args, **kwargs)

        return wrapper

    return executor
Then add the decorator to the original download function:
@request_concurrency_limit_decorator(limit=...)
async def download(...):
...
Now you can call the download function as before, but the Semaphore limits the concurrency:
await download(...)
Note that when the decorator runs, the Semaphore it creates is bound to the default event loop, so you can't call asyncio.run to create a new loop. Instead, call asyncio.get_event_loop().run... to use the default event loop.
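If you do want asyncio.run compatibility, one workaround (a sketch of my own, not part of the original answer) is to create the semaphore lazily, inside whichever loop is running on the first call:

import asyncio
from functools import wraps


def request_concurrency_limit_decorator(limit=3):
    sem = None  # created lazily, inside the running loop

    def executor(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal sem
            if sem is None:
                # first call: bind the semaphore to the currently running loop
                sem = asyncio.Semaphore(limit)
            async with sem:
                return await func(*args, **kwargs)

        return wrapper

    return executor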
If you have a generator producing your tasks, there may be more tasks than can fit in memory simultaneously.
The classic asyncio.Semaphore context-manager pattern races all the tasks into memory at once.
I don't like the asyncio.Queue pattern. You can prevent it from preloading all the tasks into memory (by setting maxsize=1), but it still requires boilerplate to define, start up, and shut down the worker coroutines (which consume from the queue), and you have to ensure that a worker won't fail if a task throws an exception. It feels unpythonic, as if implementing your own multiprocessing.pool.
Instead, here is an alternative:
sem = asyncio.Semaphore(n := 5)  # specify maximum concurrency

async def task_wrapper(args):
    try:
        await my_task(*args)
    finally:
        sem.release()

for args in my_generator:  # may yield too many to list
    await sem.acquire()
    asyncio.create_task(task_wrapper(args))

# wait for all tasks to complete
for i in range(n):
    await sem.acquire()
This pauses the generator whenever there are enough active tasks, and lets the event loop clean up finished tasks. Note that for older Python versions, replace create_task with ensure_future.
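For completeness, here is the same pattern wrapped in a coroutine so it can run under asyncio.run; my_task and my_generator are hypothetical stand-ins for your own code:

import asyncio


async def my_task(i):
    await asyncio.sleep(1)  # stand-in for real work
    print('finished', i)


def my_generator():
    for i in range(9):  # imagine this yielding far more than fits in memory
        yield (i,)


async def main(n=3):
    sem = asyncio.Semaphore(n)

    async def task_wrapper(args):
        try:
            await my_task(*args)
        finally:
            sem.release()

    for args in my_generator():
        await sem.acquire()  # blocks while n tasks are already in flight
        asyncio.create_task(task_wrapper(args))

    # reacquiring all n permits means every task has released, i.e. finished
    for _ in range(n):
        await sem.acquire()

asyncio.run(main())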