如何将 python-trio 与 google 协议缓冲区一起使用?
How to use python-trio with google protocol buffer?
我正在尝试使用 python 中的 protobuf 读取一些数据流,并且我想使用 trio 来制作用于读取流的客户端。 protobuf 有一些方法调用,我发现当我使用 trio streams 时它们不起作用。
Python 客户端在 linux 机器上。
import DTCProtocol_pb2 as Dtc
async def parent(addr, encoding, heartbeat_interval):
print(f"parent: connecting to 127.0.0.1:{addr[1]}")
client_stream = await trio.open_tcp_stream(addr[0], addr[1])
# encoding request
print("parent: spawing encoding request ...")
enc_req = create_enc_req(encoding) # construct encoding request
await send_message(enc_req, Dtc.ENCODING_REQUEST,client_stream, 'encoding request') # send encoding request
log.debug('get_reponse: started')
response = await client_stream.receive_some(1024)
m_size = struct.unpack_from('<H', response[:2]) # the size of message
m_type = struct.unpack_from('<H', response[2:4]) # the type of the message
m_body = response[4:]
m_resp = Dtc.EncodingResponse()
m_body
将是一些字节数据,我不知道如何解码。 Dtc.EncodingResponse()
是 protobuf 方法,它会给出一个包含可读格式响应的 Dtc 对象。 (dtc 是 protobuf 文件)。但我在这里一无所获。当我在没有三重奏的情况下编写此脚本时,Dtc.EncodingResponse()
会以可读的格式给出完整的响应。
我猜问题是 "client_stream" 是一个只读取字节的三重流对象,所以我可能需要使用 ReceiveChannel
对象来代替。但如果这是真的,我不知道该怎么做。
更新:
Nathaniel J. Smith 的以下回答解决了我的问题。
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)
我觉得好傻,但是我之前没有ParseFromString数据,就这样了。非常感谢所有给予答复的人。希望这可以帮助那里的人。
就像@shmee 在评论中所说的那样,我认为你的代码被编辑破坏了一些......你应该 double-check.
When I did this script without trio, Dtc.EncodingResponse()
would give the full response in readable format
我想你可能在切换到 Trio 时掉线了? Dtc.EncodingResponse()
只是创建一个新的空 EncodingResponse
对象。如果你想将 m_body
中的数据解析到你的新对象中,你必须明确地这样做,比如:
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)
然而,还有另一个问题...它被称为 receive_some
的原因是它接收 some 字节,但可能接收不到 all 你要求的字节数。您的代码假设对 receive_some
的一次调用将获取响应中的所有字节,当您进行简单测试时这可能是正确的,但通常不能保证。如果您在第一次调用 receive_some
时没有获得足够的数据,您可能需要不断重复调用它,直到获得所有数据。
这实际上是非常标准的...套接字的工作方式相同。这就是为什么您的服务器首先在开头发送一个 m_size
字段的原因 – 这样您就可以判断您是否已获取所有数据!
不幸的是,截至 2019 年 6 月,Trio 没有提供帮手来为您执行此循环——您可以在 this issue 中跟踪这方面的进展。同时,您可以自己编写。我认为这样的事情应该可行:
async def receive_exactly(stream, count):
buf = bytearray()
while len(buf) < count:
new_data = await stream.receive_some(count - len(buf))
if not new_data:
raise RuntimeError("other side closed the connection unexpectedly")
buf += new data
return buf
async def receive_encoding_response(stream):
header = await receive_exactly(stream, 4)
(m_size, m_type) = struct.unpack('<HH', header)
m_body = await receive_exactly(stream, m_size)
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_size)
return m_resp
我正在尝试使用 python 中的 protobuf 读取一些数据流,并且我想使用 trio 来制作用于读取流的客户端。 protobuf 有一些方法调用,我发现当我使用 trio streams 时它们不起作用。
Python 客户端在 linux 机器上。
import DTCProtocol_pb2 as Dtc
async def parent(addr, encoding, heartbeat_interval):
print(f"parent: connecting to 127.0.0.1:{addr[1]}")
client_stream = await trio.open_tcp_stream(addr[0], addr[1])
# encoding request
print("parent: spawing encoding request ...")
enc_req = create_enc_req(encoding) # construct encoding request
await send_message(enc_req, Dtc.ENCODING_REQUEST,client_stream, 'encoding request') # send encoding request
log.debug('get_reponse: started')
response = await client_stream.receive_some(1024)
m_size = struct.unpack_from('<H', response[:2]) # the size of message
m_type = struct.unpack_from('<H', response[2:4]) # the type of the message
m_body = response[4:]
m_resp = Dtc.EncodingResponse()
m_body
将是一些字节数据,我不知道如何解码。 Dtc.EncodingResponse()
是 protobuf 方法,它会给出一个包含可读格式响应的 Dtc 对象。 (dtc 是 protobuf 文件)。但我在这里一无所获。当我在没有三重奏的情况下编写此脚本时,Dtc.EncodingResponse()
会以可读的格式给出完整的响应。
我猜问题是 "client_stream" 是一个只读取字节的三重流对象,所以我可能需要使用 ReceiveChannel
对象来代替。但如果这是真的,我不知道该怎么做。
更新: Nathaniel J. Smith 的以下回答解决了我的问题。
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)
我觉得好傻,但是我之前没有ParseFromString数据,就这样了。非常感谢所有给予答复的人。希望这可以帮助那里的人。
就像@shmee 在评论中所说的那样,我认为你的代码被编辑破坏了一些......你应该 double-check.
When I did this script without trio,
Dtc.EncodingResponse()
would give the full response in readable format
我想你可能在切换到 Trio 时掉线了? Dtc.EncodingResponse()
只是创建一个新的空 EncodingResponse
对象。如果你想将 m_body
中的数据解析到你的新对象中,你必须明确地这样做,比如:
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)
然而,还有另一个问题...它被称为 receive_some
的原因是它接收 some 字节,但可能接收不到 all 你要求的字节数。您的代码假设对 receive_some
的一次调用将获取响应中的所有字节,当您进行简单测试时这可能是正确的,但通常不能保证。如果您在第一次调用 receive_some
时没有获得足够的数据,您可能需要不断重复调用它,直到获得所有数据。
这实际上是非常标准的...套接字的工作方式相同。这就是为什么您的服务器首先在开头发送一个 m_size
字段的原因 – 这样您就可以判断您是否已获取所有数据!
不幸的是,截至 2019 年 6 月,Trio 没有提供帮手来为您执行此循环——您可以在 this issue 中跟踪这方面的进展。同时,您可以自己编写。我认为这样的事情应该可行:
async def receive_exactly(stream, count):
buf = bytearray()
while len(buf) < count:
new_data = await stream.receive_some(count - len(buf))
if not new_data:
raise RuntimeError("other side closed the connection unexpectedly")
buf += new data
return buf
async def receive_encoding_response(stream):
header = await receive_exactly(stream, 4)
(m_size, m_type) = struct.unpack('<HH', header)
m_body = await receive_exactly(stream, m_size)
m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_size)
return m_resp