如何处理同一个 RabbitMQ 队列中的多个 protobuf 消息?

How to handle multiple protobuff messages in same RabbitMQ queue?

我的问题是我正在使用单队列(作为我的服务的入口点)并使用 Go 消费者来处理传入的消息。

我的消费者

    message := pb.GetRequest{}
    err := proto.Unmarshal(msg.Body, message)

我的问题是我的消费者很难只处理 GetRequests。如果我需要处理其他类型的消息,即。添加请求

  1. 我需要为每条消息定义一个新队列或者

  2. 我要看看第一个不武(GetRequest),继续测试能不能不武到(AddRequest)

还有其他好的方法吗(前提是#1 不是一个好的选择)

如果您的使用者只能处理路由到他使用的队列的部分消息,并且无法扩展使用者以处理不同类型的消息,您将不得不阻止消息到达队列首先。这是 RabbitMQ 服务器和可能的生产者的工作。

您没有提供足够的信息让我们建议如何配置 RabbitMQ 交换、队列和绑定。也许消息携带一些头信息,使 RabbitMQ 服务器能够区分不同类型的消息。如果没有这样的信息,也许可以扩展消息生产者来添加这样的头信息。

简单地拒绝 (NACK) 您的消费者无法处理的消息是个坏主意。这只会将消息放回同一个队列。如果没有其他消费者可以处理,这条消息永远不会被消费成功(ACK)。

在 RabbitMQ 路由键上使用 switch

Channel.Consume 方法 returns 类型 <-chan amqp.Delivery 的 Go 通道,其中 amqp.Delivery 包含字段 RoutingKey.

路由键是用于将发布的消息与消费者订阅相匹配的标识符。您应该确保您的发布者在路由键和消息类型之间保持一对一的关联。

发布商代码如下所示:

msg := &pb.AddRequest{} // some protobuf generated type
body, _ := proto.Marshal(msg)
err := ch.Publish(
        "my-exchange", // exchange name
        "foo.bar.add", // routing key
        true,          // option: mandatory
        true,          // option: immediate
        amqp.Publishing{
            ContentType: "application/x-protobuf",
            Body:        body,
        },
    )

在上面的示例中,您必须确保 所有且仅 类型 *pb.AddRequest 的消息使用路由密钥 foo.bar.add 发布,即您的消息类型是确定性的。

如果你能做到这一点,那么你的消费者代码就可以打开路由密钥并将 MQ 负载解组为正确类型的变量:

func formatEvent(payload amqp.Delivery) (proto.Message, error) {

    var event proto.Message

    // switch on the routing key
    switch payload.RoutingKey {
    case "foo.bar.add":
        event = &pb.AddRequest{}
    case "foo.bar.get":
        event = &pb.GetRequest{}
    default:
        return nil, fmt.Errorf("unknown routingKey: %s", key)
    }

    // unmarshal the body into the event variable
    if err := proto.Unmarshal(payload.Body, event); err != nil {
        return nil, err
    }

    return event, nil
}

然后您可以 type-switchproto.Message 实例上处理每个具体的消息类型。 (当然你也可以直接在routing key switch中处理具体的消息,这就看你想怎么组织你的代码了)