如何包装自定义 future 以在 Python 中与 asyncio 一起使用?

How to wrap custom future to use with asyncio in Python?

有很多库使用他们自定义的 Future. kafka and s3transfer 版本,这只是两个例子:他们所有自定义的类似未来的 类 都有 object 作为超类。

毫不奇怪,您不能直接对此类对象调用 asyncio.wrap_future(),也不能对它们使用 await

包装此类期货以与 asyncio 一起使用的正确方法是什么?

如果未来 class 支持标准的未来功能,例如 done 回调和 result 方法,只需使用如下内容:

def wrap_future(f):
    loop = asyncio.get_event_loop()
    aio_future = loop.create_future()
    def on_done(*_):
        try:
            result = f.result()
        except Exception as e:
            loop.call_soon_threadsafe(aio_future.set_exception, e)
        else:
            loop.call_soon_threadsafe(aio_future.set_result, result)
    f.add_done_callback(on_done)
    return aio_future

将该代码视为一个模板,您可以对其进行自定义以匹配您正在处理的未来的具体情况。

预期用途是从运行异步事件循环的线程调用它:

value = await wrap_future(some_foreign_future)

如果您从不同的线程调用它,请务必明确传递 loop,因为当从未注册 asyncio 的线程调用时 asyncio.get_event_loop 会失败。