在来自不同线程的回调中设置 asyncio.Future 的值
Setting asyncio.Future's value in a callback from different thread
我有一个库,可以让我选择在外部作业完成时安排回调。 Future.set_result()
使用此回调是否安全?如果不是,实现这一目标的正确方法是什么? Future 的文档说它的方法不是线程安全的,所以我认为这可能有问题。
我的目标是使用 PyOpenCL 中的 OpenCL 事件作为异步代码中的可等待对象。我在考虑这样的辅助函数:
def wrap_opencl_event(event):
f = asyncio.Future()
event.set_callback(pyopencl.command_execution_status.COMPLETE, lambda x: f.set_result(None))
return f
并以这种方式使用它:
async def do_slow_stuff():
ev1 = pyopencl.enqueue_something()
ev2 = pyopencl.enqueue_something_else(wait_for=[ev1])
await wrap_opencl_event(ev2)
process_results()
在更彻底地阅读文档后,似乎应该在事件循环安排的回调中设置 future 的值:
def wrap_opencl_event(event):
loop = asyncio.get_event_loop()
f = loop.create_future()
event.set_callback(pyopencl.command_execution_status.COMPLETE,
lambda status: loop.call_soon_threadsafe(f.set_result, None))
return f
另一种选择是使用 concurrent.futures 并包装 future
import asyncio.futures, concurrent.futures
async def func_returning_future(loop):
fut = concurrent.futures.Future()
# Pass off to your other thread somehow
# and return a wrapped future for asyncio
return asyncio.futures.wrap_future(fut, loop = loop)
或者由于该函数是异步定义,您甚至可以在循环中等待它,将最后一行替换为
await asyncio.futures.wrap_future(fut, loop = loop)
我有一个库,可以让我选择在外部作业完成时安排回调。 Future.set_result()
使用此回调是否安全?如果不是,实现这一目标的正确方法是什么? Future 的文档说它的方法不是线程安全的,所以我认为这可能有问题。
我的目标是使用 PyOpenCL 中的 OpenCL 事件作为异步代码中的可等待对象。我在考虑这样的辅助函数:
def wrap_opencl_event(event):
f = asyncio.Future()
event.set_callback(pyopencl.command_execution_status.COMPLETE, lambda x: f.set_result(None))
return f
并以这种方式使用它:
async def do_slow_stuff():
ev1 = pyopencl.enqueue_something()
ev2 = pyopencl.enqueue_something_else(wait_for=[ev1])
await wrap_opencl_event(ev2)
process_results()
在更彻底地阅读文档后,似乎应该在事件循环安排的回调中设置 future 的值:
def wrap_opencl_event(event):
loop = asyncio.get_event_loop()
f = loop.create_future()
event.set_callback(pyopencl.command_execution_status.COMPLETE,
lambda status: loop.call_soon_threadsafe(f.set_result, None))
return f
另一种选择是使用 concurrent.futures 并包装 future
import asyncio.futures, concurrent.futures
async def func_returning_future(loop):
fut = concurrent.futures.Future()
# Pass off to your other thread somehow
# and return a wrapped future for asyncio
return asyncio.futures.wrap_future(fut, loop = loop)
或者由于该函数是异步定义,您甚至可以在循环中等待它,将最后一行替换为
await asyncio.futures.wrap_future(fut, loop = loop)