如何处理同一个 RabbitMQ 队列中的多个 protobuf 消息?
How to handle multiple protobuff messages in same RabbitMQ queue?
我的问题是我正在使用单队列(作为我的服务的入口点)并使用 Go 消费者来处理传入的消息。
我的消费者
message := pb.GetRequest{}
err := proto.Unmarshal(msg.Body, message)
我的问题是我的消费者很难只处理 GetRequests。如果我需要处理其他类型的消息,即。添加请求
我需要为每条消息定义一个新队列或者
我要看看第一个不武(GetRequest),继续测试能不能不武到(AddRequest)
还有其他好的方法吗(前提是#1 不是一个好的选择)
如果您的使用者只能处理路由到他使用的队列的部分消息,并且无法扩展使用者以处理不同类型的消息,您将不得不阻止消息到达队列首先。这是 RabbitMQ 服务器和可能的生产者的工作。
您没有提供足够的信息让我们建议如何配置 RabbitMQ 交换、队列和绑定。也许消息携带一些头信息,使 RabbitMQ 服务器能够区分不同类型的消息。如果没有这样的信息,也许可以扩展消息生产者来添加这样的头信息。
简单地拒绝 (NACK) 您的消费者无法处理的消息是个坏主意。这只会将消息放回同一个队列。如果没有其他消费者可以处理,这条消息永远不会被消费成功(ACK)。
在 RabbitMQ 路由键上使用 switch
。
Channel.Consume
方法 returns 类型 <-chan amqp.Delivery
的 Go 通道,其中 amqp.Delivery
包含字段 RoutingKey
.
路由键是用于将发布的消息与消费者订阅相匹配的标识符。您应该确保您的发布者在路由键和消息类型之间保持一对一的关联。
发布商代码如下所示:
msg := &pb.AddRequest{} // some protobuf generated type
body, _ := proto.Marshal(msg)
err := ch.Publish(
"my-exchange", // exchange name
"foo.bar.add", // routing key
true, // option: mandatory
true, // option: immediate
amqp.Publishing{
ContentType: "application/x-protobuf",
Body: body,
},
)
在上面的示例中,您必须确保 所有且仅 类型 *pb.AddRequest
的消息使用路由密钥 foo.bar.add
发布,即您的消息类型是确定性的。
如果你能做到这一点,那么你的消费者代码就可以打开路由密钥并将 MQ 负载解组为正确类型的变量:
func formatEvent(payload amqp.Delivery) (proto.Message, error) {
var event proto.Message
// switch on the routing key
switch payload.RoutingKey {
case "foo.bar.add":
event = &pb.AddRequest{}
case "foo.bar.get":
event = &pb.GetRequest{}
default:
return nil, fmt.Errorf("unknown routingKey: %s", key)
}
// unmarshal the body into the event variable
if err := proto.Unmarshal(payload.Body, event); err != nil {
return nil, err
}
return event, nil
}
然后您可以 type-switch 在 proto.Message
实例上处理每个具体的消息类型。 (当然你也可以直接在routing key switch中处理具体的消息,这就看你想怎么组织你的代码了)
我的问题是我正在使用单队列(作为我的服务的入口点)并使用 Go 消费者来处理传入的消息。
我的消费者
message := pb.GetRequest{}
err := proto.Unmarshal(msg.Body, message)
我的问题是我的消费者很难只处理 GetRequests。如果我需要处理其他类型的消息,即。添加请求
我需要为每条消息定义一个新队列或者
我要看看第一个不武(GetRequest),继续测试能不能不武到(AddRequest)
还有其他好的方法吗(前提是#1 不是一个好的选择)
如果您的使用者只能处理路由到他使用的队列的部分消息,并且无法扩展使用者以处理不同类型的消息,您将不得不阻止消息到达队列首先。这是 RabbitMQ 服务器和可能的生产者的工作。
您没有提供足够的信息让我们建议如何配置 RabbitMQ 交换、队列和绑定。也许消息携带一些头信息,使 RabbitMQ 服务器能够区分不同类型的消息。如果没有这样的信息,也许可以扩展消息生产者来添加这样的头信息。
简单地拒绝 (NACK) 您的消费者无法处理的消息是个坏主意。这只会将消息放回同一个队列。如果没有其他消费者可以处理,这条消息永远不会被消费成功(ACK)。
在 RabbitMQ 路由键上使用 switch
。
Channel.Consume
方法 returns 类型 <-chan amqp.Delivery
的 Go 通道,其中 amqp.Delivery
包含字段 RoutingKey
.
路由键是用于将发布的消息与消费者订阅相匹配的标识符。您应该确保您的发布者在路由键和消息类型之间保持一对一的关联。
发布商代码如下所示:
msg := &pb.AddRequest{} // some protobuf generated type
body, _ := proto.Marshal(msg)
err := ch.Publish(
"my-exchange", // exchange name
"foo.bar.add", // routing key
true, // option: mandatory
true, // option: immediate
amqp.Publishing{
ContentType: "application/x-protobuf",
Body: body,
},
)
在上面的示例中,您必须确保 所有且仅 类型 *pb.AddRequest
的消息使用路由密钥 foo.bar.add
发布,即您的消息类型是确定性的。
如果你能做到这一点,那么你的消费者代码就可以打开路由密钥并将 MQ 负载解组为正确类型的变量:
func formatEvent(payload amqp.Delivery) (proto.Message, error) {
var event proto.Message
// switch on the routing key
switch payload.RoutingKey {
case "foo.bar.add":
event = &pb.AddRequest{}
case "foo.bar.get":
event = &pb.GetRequest{}
default:
return nil, fmt.Errorf("unknown routingKey: %s", key)
}
// unmarshal the body into the event variable
if err := proto.Unmarshal(payload.Body, event); err != nil {
return nil, err
}
return event, nil
}
然后您可以 type-switch 在 proto.Message
实例上处理每个具体的消息类型。 (当然你也可以直接在routing key switch中处理具体的消息,这就看你想怎么组织你的代码了)