如何为异步创建自定义传输?
How to create custom transport for asyncio?
我正在阅读 asyncio
包中 Protocol
和 Transport
类 的文档。具体来说:
When subclassing a protocol class, it is recommended you override certain methods. Those methods are callbacks: they will be called by the transport on certain events (for example when some data is received); you shouldn’t call them yourself, unless you are implementing a transport.
已强调
所以,原则上,应该可以实现传输,但是...
Transports are classes provided by asyncio in order to abstract various kinds of communication channels. You generally won’t instantiate a transport yourself; instead, you will call an AbstractEventLoop method (which one?) which will create the transport and try to initiate the underlying communication channel, calling you back when it succeeds.
再次强调
仔细阅读关于 AbstractEventLoop 的部分,我没有看到任何创建自定义传输的方法。它最接近的是 AbstractEventLoop.create_connection(protocol_factory, host=...)
,这仅意味着它将创建某种套接字...
好吧,我的最终目标是使用自定义传输,它不是任何类型的套接字(可能是 StringIO
,可能是数据库连接游标,可能是另一种协议)。
那么,这只是一个善意但从未在文档中实施的错误,还是实际上有一种方法可以在不使用猴子修补 asyncio
和牺牲长子的情况下实施自定义传输?
需要一些样板代码,但不会太多:
from asyncio import streams, transports, get_event_loop
class CustomTransport(transports.Transport):
def __init__(self, loop, protocol, *args, **kwargs):
self._loop = loop
self._protocol = protocol
def get_protocol(self):
return self._protocol
def set_protocol(self, protocol):
return self._protocol
# Implement read/write methods
# [...]
async def custom_create_connection(protocol_factory, *args, **kwargs):
loop = get_event_loop()
protocol = protocol_factory()
transport = CustomTransport(loop, protocol, *args, **kwargs)
return transport, protocol
async def custom_open_connection(*args, **kwargs):
reader = streams.StreamReader()
protocol = streams.StreamReaderProtocol(reader)
factory = lambda: protocol
transport, _ = await custom_create_connection(factory, *args, **kwargs)
writer = streams.StreamWriter(transport, protocol, reader)
return reader, writer
我正在阅读 asyncio
包中 Protocol
和 Transport
类 的文档。具体来说:
When subclassing a protocol class, it is recommended you override certain methods. Those methods are callbacks: they will be called by the transport on certain events (for example when some data is received); you shouldn’t call them yourself, unless you are implementing a transport.
已强调
所以,原则上,应该可以实现传输,但是...
Transports are classes provided by asyncio in order to abstract various kinds of communication channels. You generally won’t instantiate a transport yourself; instead, you will call an AbstractEventLoop method (which one?) which will create the transport and try to initiate the underlying communication channel, calling you back when it succeeds.
再次强调
仔细阅读关于 AbstractEventLoop 的部分,我没有看到任何创建自定义传输的方法。它最接近的是 AbstractEventLoop.create_connection(protocol_factory, host=...)
,这仅意味着它将创建某种套接字...
好吧,我的最终目标是使用自定义传输,它不是任何类型的套接字(可能是 StringIO
,可能是数据库连接游标,可能是另一种协议)。
那么,这只是一个善意但从未在文档中实施的错误,还是实际上有一种方法可以在不使用猴子修补 asyncio
和牺牲长子的情况下实施自定义传输?
需要一些样板代码,但不会太多:
from asyncio import streams, transports, get_event_loop
class CustomTransport(transports.Transport):
def __init__(self, loop, protocol, *args, **kwargs):
self._loop = loop
self._protocol = protocol
def get_protocol(self):
return self._protocol
def set_protocol(self, protocol):
return self._protocol
# Implement read/write methods
# [...]
async def custom_create_connection(protocol_factory, *args, **kwargs):
loop = get_event_loop()
protocol = protocol_factory()
transport = CustomTransport(loop, protocol, *args, **kwargs)
return transport, protocol
async def custom_open_connection(*args, **kwargs):
reader = streams.StreamReader()
protocol = streams.StreamReaderProtocol(reader)
factory = lambda: protocol
transport, _ = await custom_create_connection(factory, *args, **kwargs)
writer = streams.StreamWriter(transport, protocol, reader)
return reader, writer