如何在没有警告的情况下停止 asyncio 中的协程 "Task was destroyed but it is pending!"

how to stop coroutines in asyncio without warning "Task was destroyed but it is pending!"

我有一个要实现异步任务的同步循环。 到目前为止,我写了这个 POC:

async def consumer(queue):
    while True:
        revision = await queue.get()
        await asyncio.sleep(revision/10 * random.random() * 2)
        queue.task_done()
        print(f'done working on {revision}')

def main():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    task = loop.create_task(consumer(queue))
    for run in range(1, 10):
        print(f'produced {run}')
        # the idea is to start the task while still working on syncronius loop
        loop.run_until_complete(queue.put(run))
        time.sleep(.1)
    print('---- done producing')
    # loop.run_until_complete(task)  # runs forever
    # loop.run_until_complete(asyncio.gather(task))  # runs forever
    loop.run_until_complete(queue.join())  # gives: Task was destroyed but it is pending!

main()

代码执行良好,我得到了我想要的输出:

produced 1
produced 2
done working on 1
produced 3
produced 4
done working on 2
produced 5
produced 6
produced 7
done working on 3
produced 8
produced 9
---- done producing
done working on 4
done working on 5
done working on 6
done working on 7
done working on 8
done working on 9
Task was destroyed but it is pending!
task: <Task pending name='Task-1' coro=<consumer() done, defined at .../deleteme.py:8> wait_for=<Future cancelled>>

除了最后一个警告(调试错误?Python3.9.1)但不是异常。我知道我做错了什么,但我不知道到底是什么。在多进程中写这个会解决它,但这个问题几乎整个下午都在困扰我。

您的 task 运行 consumer,它使用 while True - 所以它仍在工作 - 这会产生问题。

这就是为什么 run_until_complete(task) 永远运行,因为你永远运行 while True

你必须停止这个 task。您可以使用 queue 发送一些值 - 即 stop - 您可以签入 revision != 'stop' 并使用 return 退出 while True

while True:
    revision = await queue.get()

    if revision == 'stop':
        queue.task_done()
        print(f'done working on {revision}')
        return  # <-- exit `task`

    await asyncio.sleep(revision/10 * random.random() * 2)
    
    queue.task_done()
    print(f'done working on {revision}')

并且您必须将 'stop' 作为队列中的最后一个值发送

loop.run_until_complete(queue.put('stop'))

然后所有版本都可以工作

    loop.run_until_complete(task)                  # now works
    loop.run_until_complete(asyncio.gather(task))  # now works
    loop.run_until_complete(queue.join())          # now works

完整的工作代码:

import asyncio
import time
import random

async def consumer(queue):
    while True:
        revision = await queue.get()

        if revision == 'stop':
            queue.task_done()
            print(f'done working on {revision}')
            return

        await asyncio.sleep(revision/10 * random.random() * 2)
        
        queue.task_done()
        print(f'done working on {revision}')


def main():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    task = loop.create_task(consumer(queue))
    for run in range(1, 10):
        print(f'produced {run}')
        # the idea is to start the task while still working on syncronius loop
        loop.run_until_complete(queue.put(run))
        time.sleep(.5)
    print('---- done producing')
    
    print(f'produced stop')
    loop.run_until_complete(queue.put('stop'))
    
    loop.run_until_complete(task)  # now works
    #loop.run_until_complete(asyncio.gather(task))  # now works
    #loop.run_until_complete(queue.join())  # now works

main()