如何将 python-trio 与 google 协议缓冲区一起使用?

How to use python-trio with google protocol buffer?

我正在尝试使用 python 中的 protobuf 读取一些数据流,并且我想使用 trio 来制作用于读取流的客户端。 protobuf 有一些方法调用,我发现当我使用 trio streams 时它们不起作用。

Python 客户端在 linux 机器上。

import DTCProtocol_pb2 as Dtc

async def parent(addr, encoding, heartbeat_interval):
    print(f"parent: connecting to 127.0.0.1:{addr[1]}")
    client_stream = await trio.open_tcp_stream(addr[0], addr[1])

    # encoding request
    print("parent: spawing encoding request ...")
    enc_req = create_enc_req(encoding) # construct encoding request
    await send_message(enc_req, Dtc.ENCODING_REQUEST,client_stream, 'encoding request') # send encoding request

    log.debug('get_reponse: started')
    response = await client_stream.receive_some(1024)
    m_size = struct.unpack_from('<H', response[:2]) # the size of message
    m_type = struct.unpack_from('<H', response[2:4]) # the type of the message
    m_body = response[4:]
    m_resp = Dtc.EncodingResponse()

m_body 将是一些字节数据,我不知道如何解码。 Dtc.EncodingResponse() 是 protobuf 方法,它会给出一个包含可读格式响应的 Dtc 对象。 (dtc 是 protobuf 文件)。但我在这里一无所获。当我在没有三重奏的情况下编写此脚本时,Dtc.EncodingResponse() 会以可读的格式给出完整的响应。

我猜问题是 "client_stream" 是一个只读取字节的三重流对象,所以我可能需要使用 ReceiveChannel 对象来代替。但如果这是真的,我不知道该怎么做。

更新: Nathaniel J. Smith 的以下回答解决了我的问题。

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

我觉得好傻,但是我之前没有ParseFromString数据,就这样了。非常感谢所有给予答复的人。希望这可以帮助那里的人。

就像@shmee 在评论中所说的那样,我认为你的代码被编辑破坏了一些......你应该 double-check.

When I did this script without trio, Dtc.EncodingResponse() would give the full response in readable format

我想你可能在切换到 Trio 时掉线了? Dtc.EncodingResponse() 只是创建一个新的空 EncodingResponse 对象。如果你想将 m_body 中的数据解析到你的新对象中,你必须明确地这样做,比如:

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

然而,还有另一个问题...它被称为 receive_some 的原因是它接收 some 字节,但可能接收不到 all 你要求的字节数。您的代码假设对 receive_some 的一次调用将获取响应中的所有字节,当您进行简单测试时这可能是正确的,但通常不能保证。如果您在第一次调用 receive_some 时没有获得足够的数据,您可能需要不断重复调用它,直到获得所有数据。

这实际上是非常标准的...套接字的工作方式相同。这就是为什么您的服务器首先在开头发送一个 m_size 字段的原因 – 这样您就可以判断您是否已获取所有数据!

不幸的是,截至 2019 年 6 月,Trio 没有提供帮手来为您执行此循环——您可以在 this issue 中跟踪这方面的进展。同时,您可以自己编写。我认为这样的事情应该可行:

async def receive_exactly(stream, count):
    buf = bytearray()
    while len(buf) < count:
        new_data = await stream.receive_some(count - len(buf))
        if not new_data:
            raise RuntimeError("other side closed the connection unexpectedly")
        buf += new data
    return buf

async def receive_encoding_response(stream):
    header = await receive_exactly(stream, 4)
    (m_size, m_type) = struct.unpack('<HH', header)
    m_body = await receive_exactly(stream, m_size)
    m_resp = Dtc.EncodingResponse()
    m_resp.ParseFromString(m_size)
    return m_resp