通过 Cloud PubSub 发布时反序列化 Protocol Buffer 消息时出错

Error Deserialising Protocol Buffer messages when publishing through Cloud PubSub

我正在尝试通过 Cloud Pub Sub 发布 protobuf 消息,我遇到的问题是将 protobuf 消息与需要通过 pubsub 发送的字节字符串进行序列化和反序列化。

下面是我的代码的相关部分,我将一个 Protobuf 对象传递给 publish_price,从那里你可以看到我尝试在序列化后发布消息。在回调方法中返回结果后,我能做的问题出现在反序列化端。

 # # Publish price
def publish_price(price_obj):
    new_price_obj = price_obj.SerializeToString()
    

    future = publisher.publish(topic_path, new_price_obj)
    future.add_done_callback(get_callback(future, new_price_obj))

def get_callback(f, data):
        def callback(f):
            try:
                print(str(f.result(), 'utf-8'))
                # print(f.result())
                
            except:  # noqa
                print(data.FromString())
                print("Please handle {} for {}.".format(f.exception(), data))

        return callback

下面是相关的错误信息

Exception in thread Thread-CommitBatchPublisher:
[server] Traceback (most recent call last):
[server]   File "/app/app.py", line 39, in callback
[server]     print(str(f.result(), 'utf-8'))
[server] TypeError: decoding str is not supported
[server]
[server] During handling of the above exception, another exception occurred:
[server]
[server] Traceback (most recent call last):
[server]   File "/usr/local/lib/python3.9/threading.py", line 954, in _bootstrap_inner
[server]     self.run()
[server]   File "/usr/local/lib/python3.9/threading.py", line 892, in run
[server]     self._target(*self._args, **self._kwargs)
[server]   File "/usr/local/lib/python3.9/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py", line 292, in _commit
[server]     future.set_result(message_id)
[server]   File "/usr/local/lib/python3.9/site-packages/google/cloud/pubsub_v1/futures.py", line 159, in set_result
[server]     self._trigger()
[server]   File "/usr/local/lib/python3.9/site-packages/google/cloud/pubsub_v1/futures.py", line 186, in _trigger
[server]     callback(self)
[server]   File "/app/app.py", line 43, in callback
[server]     print(data.ParseFromString())
[server] AttributeError: 'bytes' object has no attribute 'ParseFromString'

看来 data 是您要解析的字节串对象。 bytes 是来自 Python 标准库的 class,它不了解协议缓冲区,也没有 ParseFromString 方法。这是 protobuf classes 拥有的方法。

FromString 是在 proto class 中声明的方法。假设 ResponseProto 是你要解析的类型:

def parse_response(data: bytes):
    response = ResponseProto.FromString(data)
    return response