取消执行器中的异步任务 运行
Cancelling asyncio task run in executor
我正在抓取一些网站,使用 asyncio 并行化 requests
库:
def run():
asyncio.run(scrape());
def check_link(link):
#.... code code code ...
response = requests.get(link)
#.... code code code ...
write_some_stats_into_db()
async def scrape():
#.... code code code ...
task = asyncio.get_event_loop().run_in_executor(check_link(link));
#.... code code code ...
if done:
for task in all_tasks:
task.cancel();
我只需要找到一个'correct' link,然后我就可以停止程序了。但是,因为 check_link
在执行器中是 运行,它的线程会自动守护进程,因此即使在调用 taks.cancel()
之后,我仍然必须等待所有其他 运行ning check_link
完成。
你有什么想法如何 'force-kill' 另一个 运行ning 检查线程执行器吗?
你可以按照下面的方式来做,实际上从我的角度来看,如果你不必使用 asyncio
来完成任务,只使用没有任何异步循环的线程,因为它使你的代码更复杂。
import asyncio
from random import randint
import time
from functools import partial
# imagine that this is links array
LINKS = list(range(1000))
# how many thread-worker you want to have simultaneously
WORKERS_NUM = 10
# stops the app
STOP_EVENT = asyncio.Event()
STOP_EVENT.clear()
def check_link(link: str) -> int:
"""checks link in another thread and returns result"""
time.sleep(3)
r = randint(1, 11)
print(f"{link}____{r}\n")
return r
async def check_link_wrapper(q: asyncio.Queue):
"""Async wrapper around sync function"""
loop = asyncio.get_event_loop()
while not STOP_EVENT.is_set():
link = await q.get()
if not link:
break
value = await loop.run_in_executor(None, func=partial(check_link, link))
if value == 10:
STOP_EVENT.set()
print("Hurray! We got TEN !")
async def feeder(q: asyncio.Queue):
"""Send tasks and "poison pill" to all workers"""
# send tasks to workers
for link in LINKS:
await q.put(link)
# ask workers to stop
for _ in range(WORKERS_NUM):
await q.put(None)
async def amain():
"""Main async function of the app"""
# maxsize is one since we want the app
# to stop as fast as possible if stop condition is met
q = asyncio.Queue(maxsize=1)
# we create separate task, since we do not want to await feeder
# we are interested only in workers
asyncio.create_task(feeder(q))
await asyncio.gather(
*[check_link_wrapper(q) for _ in range(WORKERS_NUM)],
)
if __name__ == '__main__':
asyncio.run(amain())
我正在抓取一些网站,使用 asyncio 并行化 requests
库:
def run():
asyncio.run(scrape());
def check_link(link):
#.... code code code ...
response = requests.get(link)
#.... code code code ...
write_some_stats_into_db()
async def scrape():
#.... code code code ...
task = asyncio.get_event_loop().run_in_executor(check_link(link));
#.... code code code ...
if done:
for task in all_tasks:
task.cancel();
我只需要找到一个'correct' link,然后我就可以停止程序了。但是,因为 check_link
在执行器中是 运行,它的线程会自动守护进程,因此即使在调用 taks.cancel()
之后,我仍然必须等待所有其他 运行ning check_link
完成。
你有什么想法如何 'force-kill' 另一个 运行ning 检查线程执行器吗?
你可以按照下面的方式来做,实际上从我的角度来看,如果你不必使用 asyncio
来完成任务,只使用没有任何异步循环的线程,因为它使你的代码更复杂。
import asyncio
from random import randint
import time
from functools import partial
# imagine that this is links array
LINKS = list(range(1000))
# how many thread-worker you want to have simultaneously
WORKERS_NUM = 10
# stops the app
STOP_EVENT = asyncio.Event()
STOP_EVENT.clear()
def check_link(link: str) -> int:
"""checks link in another thread and returns result"""
time.sleep(3)
r = randint(1, 11)
print(f"{link}____{r}\n")
return r
async def check_link_wrapper(q: asyncio.Queue):
"""Async wrapper around sync function"""
loop = asyncio.get_event_loop()
while not STOP_EVENT.is_set():
link = await q.get()
if not link:
break
value = await loop.run_in_executor(None, func=partial(check_link, link))
if value == 10:
STOP_EVENT.set()
print("Hurray! We got TEN !")
async def feeder(q: asyncio.Queue):
"""Send tasks and "poison pill" to all workers"""
# send tasks to workers
for link in LINKS:
await q.put(link)
# ask workers to stop
for _ in range(WORKERS_NUM):
await q.put(None)
async def amain():
"""Main async function of the app"""
# maxsize is one since we want the app
# to stop as fast as possible if stop condition is met
q = asyncio.Queue(maxsize=1)
# we create separate task, since we do not want to await feeder
# we are interested only in workers
asyncio.create_task(feeder(q))
await asyncio.gather(
*[check_link_wrapper(q) for _ in range(WORKERS_NUM)],
)
if __name__ == '__main__':
asyncio.run(amain())