Python 3: 在项目中重用代码的最佳方式,当需要向用户公开经典 + asyncio 接口时

Python 3: best way to reuse the code in project, when need to expose both classic + asyncio interfaces to user

该项目是Python 3 库(包),实现自定义流二进制协议。想到对流处理类:

MyEncodingWriter(dst_stream) - 将 Python 对象转换为原始字节流 MyDecodingReader(src_stream) - 将原始编码字节转换为消息或 Python 对象

我需要同时支持阻塞同步流和 asyncio StreamReader + StreamWriter,(可能是它们的子类)

我知道我不能直接使用 coroutines 作为常规同步功能。

我想模仿 asyncio reader/writer API 的 (read(), readexactly(),...) 在我的 类 MyEncodingWriter()MyDecodingReader() 中,唯一的区别是异步版本中的额外 async/await 关键字。

这对我来说很奇怪,我将拥有两份代码非常相似但我无法重复使用的副本或回购协议。

目前我只有这个想法... ...尝试在元类的帮助下从异步生成动态同步代码(通过剥离 async/wait 关键字),也许吧?

有没有更好的方法?

TYA 对于这样的双重实现的任何建议或漂亮 sampes/repos。

使阻塞版本成为异步版本的子类,并包装调用。未经测试,可能有语法错误,但大致如下:

class MyBlockingEncodingReader(MyEncodingReader):
    def readexactly(self, n):
        return asyncio.get_event_loop().run_until_complete(super().readexactly(n))