是否有书籍或教程显示如何正确使用 asyncio 的协议?
Is there a book or a tutorial which shows how to correctly use asyncio's Protocol?
有没有书籍或教程介绍如何正确使用asyncio的协议? Web 上的所有示例都将 IO 直接混合到协议定义中!
我想编写一个解析器来进行帧解码并将消息转换为 python 数据结构。解析此数据结构后,我想将其传递给客户端。
[ ]-->[*protocol parser*]-->[high level api]-->[ ]
[network] [client code]
[ ]<--[*protocol parser*]<--[high level api]<--[ ]
相应地,更高级别API的客户端传入一个python数据结构,高级API将该数据结构传递给我的protocl,后者将其转换到正确的 byte/text 表示并将其传递到传输层。
我假设这是首先抽象出协议 class 的目的。我不想从协议中响应连接的另一端,但这是大多数网络教程所展示的!
此外,我想了解 python 世界中提供了哪些高级接口,是回调、流接口还是其他?
I want to write a parser which does frame decoding and converts the message to a python data structure. Once this data structure is parsed, I want to pass this on to the client.
[ ]-->[*protocol parser*]-->[high level api]-->[ ]
[network] [client code]
[ ]<--[*protocol parser*]<--[high level api]<--[ ]
Correspondingly, the client of the higher level API passes in a python data structure, the high level API feeds that data structure passes it to my protocl, which converts it to the correct byte/text representation and passes it on to the transport layer.
您可以从两个不同的角度来处理协议实现:
使用低级asyncio.Protocol
。为此,我们有两个 API:loop.create_server
和 loop.create_connection
。前者用于创建可暴露于网络并接受客户端连接的服务器(例如 HTTP 服务器)。后者可用于实现客户端(例如 API 客户端、数据库驱动程序等) .
Protocol
的核心思想很简单:它可以实现一个connection_made()
方法,一旦连接被loop.create_server
或loop.create_connection
调用,制成。该协议将接收 Transport
对象的实例,它可用于将数据发送回客户端。
它还可以实现data_received()
方法,当有传入数据要处理时,事件循环将调用该方法。一般的方法是为您正在实现的协议编写一个缓冲区抽象,它可以解析数据。一旦缓冲区有足够的数据来解析和处理,您可以将结果放入 asyncio.Queue
或安排一些异步任务。例如:
class MyProtocolBuffer:
"""Primitive protocol parser"""
def __init__(self):
self.buf = b'' # for real code use bytearray
# or list of memoryviews
def feed_data(data):
self.buf += data
def has_complete_message(self):
# Implement your parsing logic here...
def read_message(self):
# ...and here.
class MyProtocol:
def __init__(self, loop, queue: asyncio.Queue):
self.buffer = MyProtocolReadBuffer()
def data_received(self, data):
self.buffer.feed_data(data)
while self.buffer.has_complete_message():
message = self.buffer.read_message()
queue.put_nowait(message)
async def main(host, port):
queue = asyncio.Queue()
loop = asyncio.get_event_loop()
transport, protocol = await loop.create_connection(
lambda: MyProtocol(loop, queue),
host, port)
try:
while True:
message = await queue.get()
# This is where you implement your "high level api"
# or even "client code".
print(f'received message {message!r}')
finally:
transport.close()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(host, port))
finally:
loop.close()
请注意,这是一个低级异步 API。 它旨在供框架和库作者使用。例如,asyncpg,一个高性能的 asyncio PostgreSQL 驱动程序使用这些 APIs.
您可以在此处阅读有关协议的更多信息:https://docs.python.org/3/library/asyncio-protocol.html#protocols. A good example of successfully using these APIs is the asyncpg library: https://github.com/magicstack/asyncpg。
使用高级异步流。 Streams 允许您使用 async/await 语法实现协议。两个主要的 API:asyncio.open_connection()
和 asyncio.start_server()
。让我们使用 open_connection()
:
重新实现上面的例子
async def main():
reader, writer = await asyncio.open_connection(host, port)
message_line = await reader.readline()
# Implement the rest of protocol parser using async/await
# and `reader.readline()`, `reader.readuntil()`, and
# `reader.readexactly()` methods.
# Once you have your message you can put in an asyncio.Queue,
# or spawn asyncio tasks to process incoming messages.
# This is where you implement your "high level api"
# or even "client code".
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(host, port))
finally:
loop.close()
我建议您始终使用流来起草某些协议的第一个实现,特别是如果您不熟悉网络和异步。您可以使用低级 APIs 构建更快的代码,使用流的错误您将更快地获得工作程序并且代码库将更易于维护。理想情况下,您应该坚持使用 async/await 和高级异步 APIs.
有没有书籍或教程介绍如何正确使用asyncio的协议? Web 上的所有示例都将 IO 直接混合到协议定义中!
我想编写一个解析器来进行帧解码并将消息转换为 python 数据结构。解析此数据结构后,我想将其传递给客户端。
[ ]-->[*protocol parser*]-->[high level api]-->[ ]
[network] [client code]
[ ]<--[*protocol parser*]<--[high level api]<--[ ]
相应地,更高级别API的客户端传入一个python数据结构,高级API将该数据结构传递给我的protocl,后者将其转换到正确的 byte/text 表示并将其传递到传输层。
我假设这是首先抽象出协议 class 的目的。我不想从协议中响应连接的另一端,但这是大多数网络教程所展示的!
此外,我想了解 python 世界中提供了哪些高级接口,是回调、流接口还是其他?
I want to write a parser which does frame decoding and converts the message to a python data structure. Once this data structure is parsed, I want to pass this on to the client.
[ ]-->[*protocol parser*]-->[high level api]-->[ ] [network] [client code] [ ]<--[*protocol parser*]<--[high level api]<--[ ]
Correspondingly, the client of the higher level API passes in a python data structure, the high level API feeds that data structure passes it to my protocl, which converts it to the correct byte/text representation and passes it on to the transport layer.
您可以从两个不同的角度来处理协议实现:
使用低级
asyncio.Protocol
。为此,我们有两个 API:loop.create_server
和loop.create_connection
。前者用于创建可暴露于网络并接受客户端连接的服务器(例如 HTTP 服务器)。后者可用于实现客户端(例如 API 客户端、数据库驱动程序等) .Protocol
的核心思想很简单:它可以实现一个connection_made()
方法,一旦连接被loop.create_server
或loop.create_connection
调用,制成。该协议将接收Transport
对象的实例,它可用于将数据发送回客户端。它还可以实现
data_received()
方法,当有传入数据要处理时,事件循环将调用该方法。一般的方法是为您正在实现的协议编写一个缓冲区抽象,它可以解析数据。一旦缓冲区有足够的数据来解析和处理,您可以将结果放入asyncio.Queue
或安排一些异步任务。例如:class MyProtocolBuffer: """Primitive protocol parser""" def __init__(self): self.buf = b'' # for real code use bytearray # or list of memoryviews def feed_data(data): self.buf += data def has_complete_message(self): # Implement your parsing logic here... def read_message(self): # ...and here. class MyProtocol: def __init__(self, loop, queue: asyncio.Queue): self.buffer = MyProtocolReadBuffer() def data_received(self, data): self.buffer.feed_data(data) while self.buffer.has_complete_message(): message = self.buffer.read_message() queue.put_nowait(message) async def main(host, port): queue = asyncio.Queue() loop = asyncio.get_event_loop() transport, protocol = await loop.create_connection( lambda: MyProtocol(loop, queue), host, port) try: while True: message = await queue.get() # This is where you implement your "high level api" # or even "client code". print(f'received message {message!r}') finally: transport.close() loop = asyncio.get_event_loop() try: loop.run_until_complete(main(host, port)) finally: loop.close()
请注意,这是一个低级异步 API。 它旨在供框架和库作者使用。例如,asyncpg,一个高性能的 asyncio PostgreSQL 驱动程序使用这些 APIs.
您可以在此处阅读有关协议的更多信息:https://docs.python.org/3/library/asyncio-protocol.html#protocols. A good example of successfully using these APIs is the asyncpg library: https://github.com/magicstack/asyncpg。
使用高级异步流。 Streams 允许您使用 async/await 语法实现协议。两个主要的 API:
重新实现上面的例子asyncio.open_connection()
和asyncio.start_server()
。让我们使用open_connection()
:async def main(): reader, writer = await asyncio.open_connection(host, port) message_line = await reader.readline() # Implement the rest of protocol parser using async/await # and `reader.readline()`, `reader.readuntil()`, and # `reader.readexactly()` methods. # Once you have your message you can put in an asyncio.Queue, # or spawn asyncio tasks to process incoming messages. # This is where you implement your "high level api" # or even "client code". loop = asyncio.get_event_loop() try: loop.run_until_complete(main(host, port)) finally: loop.close()
我建议您始终使用流来起草某些协议的第一个实现,特别是如果您不熟悉网络和异步。您可以使用低级 APIs 构建更快的代码,使用流的错误您将更快地获得工作程序并且代码库将更易于维护。理想情况下,您应该坚持使用 async/await 和高级异步 APIs.