如何包装自定义 future 以在 Python 中与 asyncio 一起使用?
How to wrap custom future to use with asyncio in Python?
有很多库使用他们自定义的 Future. kafka and s3transfer 版本,这只是两个例子:他们所有自定义的类似未来的 类 都有 object
作为超类。
毫不奇怪,您不能直接对此类对象调用 asyncio.wrap_future()
,也不能对它们使用 await
。
包装此类期货以与 asyncio 一起使用的正确方法是什么?
如果未来 class 支持标准的未来功能,例如 done 回调和 result
方法,只需使用如下内容:
def wrap_future(f):
loop = asyncio.get_event_loop()
aio_future = loop.create_future()
def on_done(*_):
try:
result = f.result()
except Exception as e:
loop.call_soon_threadsafe(aio_future.set_exception, e)
else:
loop.call_soon_threadsafe(aio_future.set_result, result)
f.add_done_callback(on_done)
return aio_future
将该代码视为一个模板,您可以对其进行自定义以匹配您正在处理的未来的具体情况。
预期用途是从运行异步事件循环的线程调用它:
value = await wrap_future(some_foreign_future)
如果您从不同的线程调用它,请务必明确传递 loop
,因为当从未注册 asyncio 的线程调用时 asyncio.get_event_loop
会失败。
有很多库使用他们自定义的 Future. kafka and s3transfer 版本,这只是两个例子:他们所有自定义的类似未来的 类 都有 object
作为超类。
毫不奇怪,您不能直接对此类对象调用 asyncio.wrap_future()
,也不能对它们使用 await
。
包装此类期货以与 asyncio 一起使用的正确方法是什么?
如果未来 class 支持标准的未来功能,例如 done 回调和 result
方法,只需使用如下内容:
def wrap_future(f):
loop = asyncio.get_event_loop()
aio_future = loop.create_future()
def on_done(*_):
try:
result = f.result()
except Exception as e:
loop.call_soon_threadsafe(aio_future.set_exception, e)
else:
loop.call_soon_threadsafe(aio_future.set_result, result)
f.add_done_callback(on_done)
return aio_future
将该代码视为一个模板,您可以对其进行自定义以匹配您正在处理的未来的具体情况。
预期用途是从运行异步事件循环的线程调用它:
value = await wrap_future(some_foreign_future)
如果您从不同的线程调用它,请务必明确传递 loop
,因为当从未注册 asyncio 的线程调用时 asyncio.get_event_loop
会失败。