How to use asyncio with ProcessPoolExecutor
I am searching a large number of addresses on the web, and I want to use both asyncio and ProcessPoolExecutor in my task to search the addresses quickly.
async def main():
    n_jobs = 3
    addresses = [list of addresses]
    _addresses = list_splitter(data=addresses, n=n_jobs)
    with ProcessPoolExecutor(max_workers=n_jobs) as executor:
        futures_list = []
        for _address in _addresses:
            futures_list += [asyncio.get_event_loop().run_in_executor(executor, execute_parallel, _address)]
        for f in tqdm(as_completed(futures_list, loop=asyncio.get_event_loop()), total=len(_addresses)):
            results = await f

asyncio.get_event_loop().run_until_complete(main())
Expected:
I want the execute_parallel function to run in parallel.
Error:
Traceback (most recent call last):
  File "/home/awaish/danamica/scraping/skraafoto/aerial_photos_scraper.py", line 228, in <module>
    asyncio.run(main())
  File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/home/awaish/danamica/scraping/skraafoto/aerial_photos_scraper.py", line 224, in main
    results = await f
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 533, in _wait_for_one
    return f.result()  # May raise f.exception().
TypeError: can't pickle coroutine objects
I'm not sure I'm answering the right question, but the intent of your code appears to be to run execute_parallel across multiple processes using asyncio. The TypeError is a pickling problem: execute_parallel is presumably a coroutine function, so the worker process ends up holding a coroutine object as its result, and coroutine objects cannot be pickled back across the process boundary. Instead of using ProcessPoolExecutor, why not use a normal multiprocessing pool and set up a separate asyncio loop in each process? You can set up one process per core and let asyncio work its magic within each process.
import asyncio
import multiprocessing

def run_loop(addresses):
    # Each worker process builds and drives its own event loop,
    # so run_loop must be a plain (non-async) function.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = [loop.create_task(execute_parallel(address)) for address in addresses]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

def main():
    n_jobs = 3
    addresses = [list of addresses]
    _addresses = list_splitter(data=addresses, n=n_jobs)
    with multiprocessing.Pool(processes=n_jobs) as pool:
        # imap_unordered is lazy: iterate it so the work actually runs.
        for _ in pool.imap_unordered(run_loop, _addresses):
            pass
I've had great success using Pool.imap_unordered, but depending on your needs you may prefer Pool.map or some other function. You can play with the chunk size, or with the number of addresses in each list, to get the best results (i.e. if you are getting a lot of timeouts, you may want to reduce the number of addresses being processed concurrently).
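For concreteness, here is a minimal, self-contained sketch of this pattern. The execute_parallel body and the address data below are stand-ins (the real scraping code isn't shown in the question), and list_splitter is assumed to be a simple chunking helper:

import asyncio
import multiprocessing

async def execute_parallel(address):
    # Stand-in for the real scraping coroutine from the question.
    await asyncio.sleep(0.1)
    return address.upper()

def list_splitter(data, n):
    # Assumed helper: split data into n roughly equal chunks.
    k, m = divmod(len(data), n)
    return [data[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]

def run_loop(addresses):
    # Runs inside a worker process, so it owns a private event loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        tasks = [loop.create_task(execute_parallel(a)) for a in addresses]
        done, _ = loop.run_until_complete(asyncio.wait(tasks))
        return [t.result() for t in done]
    finally:
        loop.close()

if __name__ == "__main__":
    addresses = ["10 main st", "22 oak ave", "7 pine rd", "3 elm ct"]
    chunks = list_splitter(data=addresses, n=3)
    with multiprocessing.Pool(processes=3) as pool:
        # chunksize=1 because each item handed to a worker is already
        # a whole batch of addresses; raise it if batches are tiny.
        for batch in pool.imap_unordered(run_loop, chunks, chunksize=1):
            print(batch)

Having run_loop return its results (rather than just awaiting them) lets the parent process collect each batch as it finishes.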