如何 运行 协程并在循环 运行ning 时等待同步函数的结果?

How to run a coroutine and wait it result from a sync func when the loop is running?

我有一个像傻瓜一样的代码:

def render():
    loop = asyncio.get_event_loop()

    async def test():
        await asyncio.sleep(2)
        print("hi")
        return 200

    if loop.is_running():
        result = asyncio.ensure_future(test())
    else:
        result = loop.run_until_complete(test())

loop 不是 运行ning 时很容易,只需使用 loop.run_until_complete 并且它 return coro 结果但如果循环已经 运行ning(我在应用程序中的阻塞代码 运行ning 已经 运行ning 循环)我不能使用 loop.run_until_complete 因为它会引发异常;当我调用 asyncio.ensure_future 时,任务已安排好 运行,但我想在那里等待结果,有人知道该怎么做吗?文档不是很清楚如何做到这一点。

我尝试在 coro 中传递一个 concurrent.futures.Future 调用 set_result,然后在我的阻塞代码上调用 Future.result(),但它不起作用,它阻塞在那里并且不让运行 的任何其他内容。任何帮助将不胜感激。

要使用提议的设计实施 runner,您需要一种从其中的回调 运行 到 single-step 事件循环的方法。 Asyncio explicitly forbids 递归事件循环,所以这种方法是死胡同。

鉴于该限制,您有两个选择:

  1. 使render()本身成为协程;
  2. 在与运行 asyncio 事件循环的线程不同的线程中执行 render()(及其调用者)。

假设 #1 是不可能的,你可以像这样实现 render() 的 #2 变体:

def render():
    loop = _event_loop  # can't call get_event_loop()

    async def test():
        await asyncio.sleep(2)
        print("hi")
        return 200

    future = asyncio.run_coroutine_threadsafe(test(), loop)
    result = future.result()

请注意,您不能在 render 中使用 asyncio.get_event_loop(),因为没有(也不应该)为该线程设置事件循环。相反,生成运行器线程的代码必须调用 asyncio.get_event_loop() 并将其发送到线程,或者将其保留在全局变量或共享结构中。

同步等待异步协程

如果 asyncio 事件循环已经通过调用 loop.run_forever 被 运行 宁,它将 阻塞 执行线程直到 loop.stop 被调用[参见docs]。因此,同步等待的唯一方法是 运行 专用线程上的事件循环,在循环上调度 异步 函数并等待它 另一个线程同步

为此,我按照 by user4815162342. I have also added the parts for cleaning up the loop when all work is finished [see loop.close] 编写了自己的最小解决方案。

下面代码中的main函数运行是专用线程上的事件循环,在事件循环上调度多个任务,加上要等待其结果的任务同步。同步等待将阻塞,直到准备好所需的结果。最后,循环关闭并与其线程一起优雅地清理。

专用线程和函数stop_looprun_forever_safeawait_sync可以封装在模块或class.

有关线程安全的注意事项,请参阅异步文档中的“Concurrency and Multithreading”部分。

import asyncio
import threading
#----------------------------------------

def stop_loop(loop):
    ''' stops an event loop '''
    loop.stop()
    print (".: LOOP STOPPED:", loop.is_running())

def run_forever_safe(loop):
    ''' run a loop for ever and clean up after being stopped '''

    loop.run_forever()
    # NOTE: loop.run_forever returns after calling loop.stop

    #-- cancell all tasks and close the loop gracefully
    print(".: CLOSING LOOP...")
    # source: <https://xinhuang.github.io/posts/2017-07-31-common-mistakes-using-python3-asyncio.html>

    loop_tasks_all = asyncio.Task.all_tasks(loop=loop)

    for task in loop_tasks_all: task.cancel()
    # NOTE: `cancel` does not guarantee that the Task will be cancelled

    for task in loop_tasks_all:
        if not (task.done() or task.cancelled()):
            try:
                # wait for task cancellations
                loop.run_until_complete(task)
            except asyncio.CancelledError: pass
    #END for
    print(".: ALL TASKS CANCELLED.")

    loop.close()
    print(".: LOOP CLOSED:", loop.is_closed())

def await_sync(task):
    ''' synchronously waits for a task '''
    while not task.done(): pass
    print(".: AWAITED TASK DONE")
    return task.result()
#----------------------------------------

async def asyncTask(loop, k):
    ''' asynchronous task '''
    print("--start async task %s" % k)
    await asyncio.sleep(3, loop=loop)
    print("--end async task %s." % k)
    key = "KEY#%s" % k
    return key

def main():
    loop = asyncio.new_event_loop() # construct a new event loop

    #-- closures for running and stopping the event-loop
    run_loop_forever = lambda: run_forever_safe(loop)
    close_loop_safe = lambda: loop.call_soon_threadsafe(stop_loop, loop)

    #-- make dedicated thread for running the event loop
    thread = threading.Thread(target=run_loop_forever)

    #-- add some tasks along with my particular task
    myTask = asyncio.run_coroutine_threadsafe(asyncTask(loop, 100200300), loop=loop)
    otherTasks = [asyncio.run_coroutine_threadsafe(asyncTask(loop, i), loop=loop)
                  for i in range(1, 10)]

    #-- begin the thread to run the event-loop
    print(".: EVENT-LOOP THREAD START")
    thread.start()

    #-- _synchronously_ wait for the result of my task
    result = await_sync(myTask) # blocks until task is done
    print("* final result of my task:", result) 

    #... do lots of work ...
    print("*** ALL WORK DONE ***")
    #========================================

    # close the loop gracefully when everything is finished
    close_loop_safe()
    thread.join()
#----------------------------------------

main()

这是我的情况,我的整个程序是异步的,但是调用一些同步库,然后回调到我的异步函数。

关注网友4815162342的回答

import asyncio

async def asyncTask(k):
    ''' asynchronous task '''
    print("--start async task %s" % k)
    # await asyncio.sleep(3, loop=loop)
    await asyncio.sleep(3)
    print("--end async task %s." % k)
    key = "KEY#%s" % k
    return key


def my_callback():
    print("here i want to call my async func!")
    future = asyncio.run_coroutine_threadsafe(asyncTask(1), LOOP)
    return future.result()

def sync_third_lib(cb):
    print("here will call back to your code...")
    cb()

async def main():
    print("main start...")

    print("call sync third lib ...")
    await asyncio.to_thread(sync_third_lib, my_callback)
    # await loop.run_in_executor(None, func=sync_third_lib)
    print("another work...keep async...")
    await asyncio.sleep(2)

    print("done!")


LOOP = asyncio.get_event_loop()
LOOP.run_until_complete(main())