有没有办法手动打开异步事件循环

Is there a way to manually switch on asyncio event loop

我想使用事件循环来监控任何插入数据到我的 asyncio.Queue(你可以在这里找到它的源代码 https://github.com/python/cpython/blob/3.6/Lib/asyncio/queues.py),但是我 运行 遇到了一些问题。这是以下代码:

import asyncio
import threading

async def recv(q):
    while True:
        msg = await q.get()
        print(msg)

async def checking_task():
    while True:
        await asyncio.sleep(0.1)

def loop_in_thread(loop,q):
    asyncio.set_event_loop(loop)
    asyncio.ensure_future(recv(q))
    asyncio.ensure_future(insert(q))
    # asyncio.ensure_future(checking_task()) comment this out, and it will work as intended
    loop.run_forever()

async def insert(q):
    print('invoked')
    await q.put('hello')

q = asyncio.Queue() 
loop = asyncio.get_event_loop()
t = threading.Thread(target=loop_in_thread, args=(loop, q,))
t.start()

程序已经启动,可以看到如下结果

invoked
hello
-> print(asyncio.Task.all_tasks())
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39>
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E215DCFAC8>()]>>}

但是现在如果我们使用q.put_nowait('test')手动将数据添加到q中,我们会得到以下结果:

q.put_nowait('test') # a non-async way to add data into queue
-> print(asyncio.Task.all_tasks())
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39>
 wait_for=<Future finished result=None>>}

如你所见,future 已经完成了,但我们还没有打印出新添加的字符串'test'。换句话说,即使与 q.get() 相关的 Future 已完成并且没有其他任务 运行ning,msg = await q.get() 仍在等待。这让我感到困惑,因为在官方文档(https://docs.python.org/3/library/asyncio-task.html)中,它说

result = await future or result = yield from future – suspends the coroutine until the future is done, then returns the future’s result

似乎即使 Future 已经完成,我们仍然需要在其他异步函数中使用某种 await 来使事件循环继续处理任务。

我找到了解决这个问题的方法,就是添加一个 checking_task(),并将协程添加到事件循环中;然后它将按预期工作。

但是添加 checking_task() 协程对于 CPU 来说成本非常高,因为它只是 运行 一个 while 循环。我想知道是否有一些手动方法可以让我们在不使用异步函数的情况下触发 await 事件。例如,像

这样神奇的东西
q.put_nowait('test')
loop.ok_you_can_start_running_other_pending_tasks()

帮助将不胜感激!谢谢。

所以我最终使用了

loop.call_soon_threadsafe(q.put_nowait, 'test')

它会按预期工作。弄明白之后,我搜索了一些关于 .原来这个post()也有同样的问题。 @kfx 的答案也可以,即

loop.call_soon_threadsafe(loop.create_task, q.put('test'))

注意 asyncio.Queue.put() 是协程,但 asyncio.Queue.put_nowait() 是普通函数。