RabbitMQ Pika 乱码消息体

RabbitMQ Pika jumbled message body

我正在尝试在 Python 中使用 RabbitMQ 创建一个简单的文件下载和上传服务。我制作了服务器和客户端脚本,并定义了一些对象,这些对象将请求序列化和反序列化为 json,这样我就可以发送文件 uid 或其他参数以及二进制数据(使用 base64 编码)。发送小文件时一切正常,但当我开始发送更大的文件时(使用 1.4mb 文本文件测试),我开始收到 JSONDecode 错误。我正在将发送和接收的消息转储到文件中,看起来发送的消息格式正确 json。但是,收到的信息好像是乱码,例如:

发送:{"uid":"test",Rhc2Rhc2Rhc2Rhc2Rhc2Rhc2==”}

收到:Rhc2Rhc2{"uid":"test",Rhc2Rhc2Rhc2Rhc2==”}

显然乱块比那个大得多。有人 运行 以前遇到过这个问题吗?

这里有一些代码片段,我可以上传完整的代码,但不是很整洁:

上传请求:

class FileSystemUploadRequest:

def __init__(self, uid, data):
    self.uid = uid
    self.data = data

def encode(self):
    dicc = {"uid": self.uid, "data": base64.b64encode(self.data).decode()}
    return json.dumps(dicc)

@staticmethod
def decode(jsonstr):
    dicc = json.loads(jsonstr)
    uid = dicc["uid"]
    data = base64.b64decode(dicc["data"])
    return FileSystemUploadRequest(uid, data)

客户(发件人):

def put_file_blocking(self, uid, data):

    print(" [x] Llamaron a put_file_blocking")

    corr_id = str(uuid.uuid4())
    request = FileSystemUploadRequest(uid, data)

    f = open("dump", "w")
    f.write(request.encode())
    f.close()

    # Send upload request
    self.channel.basic_publish(exchange='',
                               routing_key=self.queue_upload,
                               properties=pika.BasicProperties(
                                   reply_to=self.callback_queue_name,
                                   correlation_id=corr_id
                               ),
                               body=request.encode())

服务器(接收方):

def upload_request(ch, method, props, body):

f = open("_dump", "w")
f.write(body.decode())
f.close()

# Get Upload Request
request = FileSystemUploadRequest.decode(body)
print(" [x] Received upload request for: " + request.uid)
filename = fs_dir + "/" + request.uid

# Do upload
f = open(filename, "wb")
f.write(request.data)
f.close()

# Create Upload Response
status = Status.OK
response = FileSystemUploadResponse(status)

# Send response
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id=props.correlation_id),
                 body=response.encode())
ch.basic_ack(delivery_tag=method.delivery_tag)

print(" [x] Finished upload request for: " + request.uid)

RabbitMQ 是一个消息代理,而不是一个文件存储系统。由于假设消息的大小有所限制,因此进行了多项优化,这些优化将适用于您的用例。

要在分布式系统中共享文件,您应该依赖诸如 OpenStack Swift or AWS S3 之类的对象存储。

您仍然可以使用 RabbitMQ 通知新文件的存在,但不是将其嵌入到消息中,而是提供对象存储中的位置,消费者从那里检索文件。

几个参考链接:

RabbitMQ best practices

Can RabbitMQ handle big messages?