如何在没有警告的情况下停止 asyncio 中的协程 "Task was destroyed but it is pending!"
how to stop coroutines in asyncio without warning "Task was destroyed but it is pending!"
我有一个要实现异步任务的同步循环。
到目前为止,我写了这个 POC:
async def consumer(queue):
while True:
revision = await queue.get()
await asyncio.sleep(revision/10 * random.random() * 2)
queue.task_done()
print(f'done working on {revision}')
def main():
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
task = loop.create_task(consumer(queue))
for run in range(1, 10):
print(f'produced {run}')
# the idea is to start the task while still working on syncronius loop
loop.run_until_complete(queue.put(run))
time.sleep(.1)
print('---- done producing')
# loop.run_until_complete(task) # runs forever
# loop.run_until_complete(asyncio.gather(task)) # runs forever
loop.run_until_complete(queue.join()) # gives: Task was destroyed but it is pending!
main()
代码执行良好,我得到了我想要的输出:
produced 1
produced 2
done working on 1
produced 3
produced 4
done working on 2
produced 5
produced 6
produced 7
done working on 3
produced 8
produced 9
---- done producing
done working on 4
done working on 5
done working on 6
done working on 7
done working on 8
done working on 9
Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<consumer() done, defined at .../deleteme.py:8> wait_for=<Future cancelled>>
除了最后一个警告(调试错误?Python3.9.1)但不是异常。我知道我做错了什么,但我不知道到底是什么。在多进程中写这个会解决它,但这个问题几乎整个下午都在困扰我。
您的 task
运行 consumer
,它使用 while True
- 所以它仍在工作 - 这会产生问题。
这就是为什么 run_until_complete(task)
永远运行,因为你永远运行 while True
。
你必须停止这个 task
。您可以使用 queue
发送一些值 - 即 stop
- 您可以签入 revision != 'stop'
并使用 return
退出 while True
while True:
revision = await queue.get()
if revision == 'stop':
queue.task_done()
print(f'done working on {revision}')
return # <-- exit `task`
await asyncio.sleep(revision/10 * random.random() * 2)
queue.task_done()
print(f'done working on {revision}')
并且您必须将 'stop'
作为队列中的最后一个值发送
loop.run_until_complete(queue.put('stop'))
然后所有版本都可以工作
loop.run_until_complete(task) # now works
loop.run_until_complete(asyncio.gather(task)) # now works
loop.run_until_complete(queue.join()) # now works
完整的工作代码:
import asyncio
import time
import random
async def consumer(queue):
while True:
revision = await queue.get()
if revision == 'stop':
queue.task_done()
print(f'done working on {revision}')
return
await asyncio.sleep(revision/10 * random.random() * 2)
queue.task_done()
print(f'done working on {revision}')
def main():
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
task = loop.create_task(consumer(queue))
for run in range(1, 10):
print(f'produced {run}')
# the idea is to start the task while still working on syncronius loop
loop.run_until_complete(queue.put(run))
time.sleep(.5)
print('---- done producing')
print(f'produced stop')
loop.run_until_complete(queue.put('stop'))
loop.run_until_complete(task) # now works
#loop.run_until_complete(asyncio.gather(task)) # now works
#loop.run_until_complete(queue.join()) # now works
main()
我有一个要实现异步任务的同步循环。 到目前为止,我写了这个 POC:
async def consumer(queue):
while True:
revision = await queue.get()
await asyncio.sleep(revision/10 * random.random() * 2)
queue.task_done()
print(f'done working on {revision}')
def main():
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
task = loop.create_task(consumer(queue))
for run in range(1, 10):
print(f'produced {run}')
# the idea is to start the task while still working on syncronius loop
loop.run_until_complete(queue.put(run))
time.sleep(.1)
print('---- done producing')
# loop.run_until_complete(task) # runs forever
# loop.run_until_complete(asyncio.gather(task)) # runs forever
loop.run_until_complete(queue.join()) # gives: Task was destroyed but it is pending!
main()
代码执行良好,我得到了我想要的输出:
produced 1
produced 2
done working on 1
produced 3
produced 4
done working on 2
produced 5
produced 6
produced 7
done working on 3
produced 8
produced 9
---- done producing
done working on 4
done working on 5
done working on 6
done working on 7
done working on 8
done working on 9
Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<consumer() done, defined at .../deleteme.py:8> wait_for=<Future cancelled>>
除了最后一个警告(调试错误?Python3.9.1)但不是异常。我知道我做错了什么,但我不知道到底是什么。在多进程中写这个会解决它,但这个问题几乎整个下午都在困扰我。
您的 task
运行 consumer
,它使用 while True
- 所以它仍在工作 - 这会产生问题。
这就是为什么 run_until_complete(task)
永远运行,因为你永远运行 while True
。
你必须停止这个 task
。您可以使用 queue
发送一些值 - 即 stop
- 您可以签入 revision != 'stop'
并使用 return
退出 while True
while True:
revision = await queue.get()
if revision == 'stop':
queue.task_done()
print(f'done working on {revision}')
return # <-- exit `task`
await asyncio.sleep(revision/10 * random.random() * 2)
queue.task_done()
print(f'done working on {revision}')
并且您必须将 'stop'
作为队列中的最后一个值发送
loop.run_until_complete(queue.put('stop'))
然后所有版本都可以工作
loop.run_until_complete(task) # now works
loop.run_until_complete(asyncio.gather(task)) # now works
loop.run_until_complete(queue.join()) # now works
完整的工作代码:
import asyncio
import time
import random
async def consumer(queue):
while True:
revision = await queue.get()
if revision == 'stop':
queue.task_done()
print(f'done working on {revision}')
return
await asyncio.sleep(revision/10 * random.random() * 2)
queue.task_done()
print(f'done working on {revision}')
def main():
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
task = loop.create_task(consumer(queue))
for run in range(1, 10):
print(f'produced {run}')
# the idea is to start the task while still working on syncronius loop
loop.run_until_complete(queue.put(run))
time.sleep(.5)
print('---- done producing')
print(f'produced stop')
loop.run_until_complete(queue.put('stop'))
loop.run_until_complete(task) # now works
#loop.run_until_complete(asyncio.gather(task)) # now works
#loop.run_until_complete(queue.join()) # now works
main()