如何 运行 协程并在循环 运行ning 时等待同步函数的结果?
How to run a coroutine and wait it result from a sync func when the loop is running?
我有一个像傻瓜一样的代码:
def render():
loop = asyncio.get_event_loop()
async def test():
await asyncio.sleep(2)
print("hi")
return 200
if loop.is_running():
result = asyncio.ensure_future(test())
else:
result = loop.run_until_complete(test())
当 loop
不是 运行ning 时很容易,只需使用 loop.run_until_complete
并且它 return coro 结果但如果循环已经 运行ning(我在应用程序中的阻塞代码 运行ning 已经 运行ning 循环)我不能使用 loop.run_until_complete
因为它会引发异常;当我调用 asyncio.ensure_future
时,任务已安排好 运行,但我想在那里等待结果,有人知道该怎么做吗?文档不是很清楚如何做到这一点。
我尝试在 coro 中传递一个 concurrent.futures.Future
调用 set_result
,然后在我的阻塞代码上调用 Future.result()
,但它不起作用,它阻塞在那里并且不让运行 的任何其他内容。任何帮助将不胜感激。
要使用提议的设计实施 runner
,您需要一种从其中的回调 运行 到 single-step 事件循环的方法。 Asyncio explicitly forbids 递归事件循环,所以这种方法是死胡同。
鉴于该限制,您有两个选择:
- 使
render()
本身成为协程;
- 在与运行 asyncio 事件循环的线程不同的线程中执行
render()
(及其调用者)。
假设 #1 是不可能的,你可以像这样实现 render()
的 #2 变体:
def render():
loop = _event_loop # can't call get_event_loop()
async def test():
await asyncio.sleep(2)
print("hi")
return 200
future = asyncio.run_coroutine_threadsafe(test(), loop)
result = future.result()
请注意,您不能在 render
中使用 asyncio.get_event_loop()
,因为没有(也不应该)为该线程设置事件循环。相反,生成运行器线程的代码必须调用 asyncio.get_event_loop()
并将其发送到线程,或者将其保留在全局变量或共享结构中。
同步等待异步协程
如果 asyncio 事件循环已经通过调用 loop.run_forever
被 运行 宁,它将 阻塞 执行线程直到 loop.stop
被调用[参见docs]。因此,同步等待的唯一方法是 运行 专用线程上的事件循环,在循环上调度 异步 函数并等待它 从另一个线程同步。
为此,我按照 by user4815162342. I have also added the parts for cleaning up the loop when all work is finished [see loop.close
] 编写了自己的最小解决方案。
下面代码中的main
函数运行是专用线程上的事件循环,在事件循环上调度多个任务,加上要等待其结果的任务同步。同步等待将阻塞,直到准备好所需的结果。最后,循环关闭并与其线程一起优雅地清理。
专用线程和函数stop_loop
、run_forever_safe
、await_sync
可以封装在模块或class.
中
有关线程安全的注意事项,请参阅异步文档中的“Concurrency and Multithreading”部分。
import asyncio
import threading
#----------------------------------------
def stop_loop(loop):
''' stops an event loop '''
loop.stop()
print (".: LOOP STOPPED:", loop.is_running())
def run_forever_safe(loop):
''' run a loop for ever and clean up after being stopped '''
loop.run_forever()
# NOTE: loop.run_forever returns after calling loop.stop
#-- cancell all tasks and close the loop gracefully
print(".: CLOSING LOOP...")
# source: <https://xinhuang.github.io/posts/2017-07-31-common-mistakes-using-python3-asyncio.html>
loop_tasks_all = asyncio.Task.all_tasks(loop=loop)
for task in loop_tasks_all: task.cancel()
# NOTE: `cancel` does not guarantee that the Task will be cancelled
for task in loop_tasks_all:
if not (task.done() or task.cancelled()):
try:
# wait for task cancellations
loop.run_until_complete(task)
except asyncio.CancelledError: pass
#END for
print(".: ALL TASKS CANCELLED.")
loop.close()
print(".: LOOP CLOSED:", loop.is_closed())
def await_sync(task):
''' synchronously waits for a task '''
while not task.done(): pass
print(".: AWAITED TASK DONE")
return task.result()
#----------------------------------------
async def asyncTask(loop, k):
''' asynchronous task '''
print("--start async task %s" % k)
await asyncio.sleep(3, loop=loop)
print("--end async task %s." % k)
key = "KEY#%s" % k
return key
def main():
loop = asyncio.new_event_loop() # construct a new event loop
#-- closures for running and stopping the event-loop
run_loop_forever = lambda: run_forever_safe(loop)
close_loop_safe = lambda: loop.call_soon_threadsafe(stop_loop, loop)
#-- make dedicated thread for running the event loop
thread = threading.Thread(target=run_loop_forever)
#-- add some tasks along with my particular task
myTask = asyncio.run_coroutine_threadsafe(asyncTask(loop, 100200300), loop=loop)
otherTasks = [asyncio.run_coroutine_threadsafe(asyncTask(loop, i), loop=loop)
for i in range(1, 10)]
#-- begin the thread to run the event-loop
print(".: EVENT-LOOP THREAD START")
thread.start()
#-- _synchronously_ wait for the result of my task
result = await_sync(myTask) # blocks until task is done
print("* final result of my task:", result)
#... do lots of work ...
print("*** ALL WORK DONE ***")
#========================================
# close the loop gracefully when everything is finished
close_loop_safe()
thread.join()
#----------------------------------------
main()
这是我的情况,我的整个程序是异步的,但是调用一些同步库,然后回调到我的异步函数。
关注网友4815162342的回答
import asyncio
async def asyncTask(k):
''' asynchronous task '''
print("--start async task %s" % k)
# await asyncio.sleep(3, loop=loop)
await asyncio.sleep(3)
print("--end async task %s." % k)
key = "KEY#%s" % k
return key
def my_callback():
print("here i want to call my async func!")
future = asyncio.run_coroutine_threadsafe(asyncTask(1), LOOP)
return future.result()
def sync_third_lib(cb):
print("here will call back to your code...")
cb()
async def main():
print("main start...")
print("call sync third lib ...")
await asyncio.to_thread(sync_third_lib, my_callback)
# await loop.run_in_executor(None, func=sync_third_lib)
print("another work...keep async...")
await asyncio.sleep(2)
print("done!")
LOOP = asyncio.get_event_loop()
LOOP.run_until_complete(main())
我有一个像傻瓜一样的代码:
def render():
loop = asyncio.get_event_loop()
async def test():
await asyncio.sleep(2)
print("hi")
return 200
if loop.is_running():
result = asyncio.ensure_future(test())
else:
result = loop.run_until_complete(test())
当 loop
不是 运行ning 时很容易,只需使用 loop.run_until_complete
并且它 return coro 结果但如果循环已经 运行ning(我在应用程序中的阻塞代码 运行ning 已经 运行ning 循环)我不能使用 loop.run_until_complete
因为它会引发异常;当我调用 asyncio.ensure_future
时,任务已安排好 运行,但我想在那里等待结果,有人知道该怎么做吗?文档不是很清楚如何做到这一点。
我尝试在 coro 中传递一个 concurrent.futures.Future
调用 set_result
,然后在我的阻塞代码上调用 Future.result()
,但它不起作用,它阻塞在那里并且不让运行 的任何其他内容。任何帮助将不胜感激。
要使用提议的设计实施 runner
,您需要一种从其中的回调 运行 到 single-step 事件循环的方法。 Asyncio explicitly forbids 递归事件循环,所以这种方法是死胡同。
鉴于该限制,您有两个选择:
- 使
render()
本身成为协程; - 在与运行 asyncio 事件循环的线程不同的线程中执行
render()
(及其调用者)。
假设 #1 是不可能的,你可以像这样实现 render()
的 #2 变体:
def render():
loop = _event_loop # can't call get_event_loop()
async def test():
await asyncio.sleep(2)
print("hi")
return 200
future = asyncio.run_coroutine_threadsafe(test(), loop)
result = future.result()
请注意,您不能在 render
中使用 asyncio.get_event_loop()
,因为没有(也不应该)为该线程设置事件循环。相反,生成运行器线程的代码必须调用 asyncio.get_event_loop()
并将其发送到线程,或者将其保留在全局变量或共享结构中。
同步等待异步协程
如果 asyncio 事件循环已经通过调用 loop.run_forever
被 运行 宁,它将 阻塞 执行线程直到 loop.stop
被调用[参见docs]。因此,同步等待的唯一方法是 运行 专用线程上的事件循环,在循环上调度 异步 函数并等待它 从另一个线程同步。
为此,我按照 loop.close
] 编写了自己的最小解决方案。
下面代码中的main
函数运行是专用线程上的事件循环,在事件循环上调度多个任务,加上要等待其结果的任务同步。同步等待将阻塞,直到准备好所需的结果。最后,循环关闭并与其线程一起优雅地清理。
专用线程和函数stop_loop
、run_forever_safe
、await_sync
可以封装在模块或class.
有关线程安全的注意事项,请参阅异步文档中的“Concurrency and Multithreading”部分。
import asyncio
import threading
#----------------------------------------
def stop_loop(loop):
''' stops an event loop '''
loop.stop()
print (".: LOOP STOPPED:", loop.is_running())
def run_forever_safe(loop):
''' run a loop for ever and clean up after being stopped '''
loop.run_forever()
# NOTE: loop.run_forever returns after calling loop.stop
#-- cancell all tasks and close the loop gracefully
print(".: CLOSING LOOP...")
# source: <https://xinhuang.github.io/posts/2017-07-31-common-mistakes-using-python3-asyncio.html>
loop_tasks_all = asyncio.Task.all_tasks(loop=loop)
for task in loop_tasks_all: task.cancel()
# NOTE: `cancel` does not guarantee that the Task will be cancelled
for task in loop_tasks_all:
if not (task.done() or task.cancelled()):
try:
# wait for task cancellations
loop.run_until_complete(task)
except asyncio.CancelledError: pass
#END for
print(".: ALL TASKS CANCELLED.")
loop.close()
print(".: LOOP CLOSED:", loop.is_closed())
def await_sync(task):
''' synchronously waits for a task '''
while not task.done(): pass
print(".: AWAITED TASK DONE")
return task.result()
#----------------------------------------
async def asyncTask(loop, k):
''' asynchronous task '''
print("--start async task %s" % k)
await asyncio.sleep(3, loop=loop)
print("--end async task %s." % k)
key = "KEY#%s" % k
return key
def main():
loop = asyncio.new_event_loop() # construct a new event loop
#-- closures for running and stopping the event-loop
run_loop_forever = lambda: run_forever_safe(loop)
close_loop_safe = lambda: loop.call_soon_threadsafe(stop_loop, loop)
#-- make dedicated thread for running the event loop
thread = threading.Thread(target=run_loop_forever)
#-- add some tasks along with my particular task
myTask = asyncio.run_coroutine_threadsafe(asyncTask(loop, 100200300), loop=loop)
otherTasks = [asyncio.run_coroutine_threadsafe(asyncTask(loop, i), loop=loop)
for i in range(1, 10)]
#-- begin the thread to run the event-loop
print(".: EVENT-LOOP THREAD START")
thread.start()
#-- _synchronously_ wait for the result of my task
result = await_sync(myTask) # blocks until task is done
print("* final result of my task:", result)
#... do lots of work ...
print("*** ALL WORK DONE ***")
#========================================
# close the loop gracefully when everything is finished
close_loop_safe()
thread.join()
#----------------------------------------
main()
这是我的情况,我的整个程序是异步的,但是调用一些同步库,然后回调到我的异步函数。
关注网友4815162342的回答
import asyncio
async def asyncTask(k):
''' asynchronous task '''
print("--start async task %s" % k)
# await asyncio.sleep(3, loop=loop)
await asyncio.sleep(3)
print("--end async task %s." % k)
key = "KEY#%s" % k
return key
def my_callback():
print("here i want to call my async func!")
future = asyncio.run_coroutine_threadsafe(asyncTask(1), LOOP)
return future.result()
def sync_third_lib(cb):
print("here will call back to your code...")
cb()
async def main():
print("main start...")
print("call sync third lib ...")
await asyncio.to_thread(sync_third_lib, my_callback)
# await loop.run_in_executor(None, func=sync_third_lib)
print("another work...keep async...")
await asyncio.sleep(2)
print("done!")
LOOP = asyncio.get_event_loop()
LOOP.run_until_complete(main())