使用 RabbitMQ 和 Apache Camel 为 AMQP 消息获取错误的路由密钥

Getting the Wrong Routing Key for AMQP message using RabbitMQ and Apache Camel

我很难找到我的 Camel 路线的问题。从我一直在阅读的内容来看,这似乎与我的路由密钥 header 信息搞砸有关,但我不确定如何解决这个问题。另外,如果这很重要的话,这是一个 Java OSGi 项目,但是所有 Camel 的东西目前都在 XML 中实现。感谢任何帮助。

这是我正在尝试做的事情:

<!-- The first route creates an object with some info in it and drop it on a rabbitmq 
exchange called message.added -->
<route id="directIn">
    <from uri="direct:in" />
    <bean ref="connector" method="handleIncoming" />
    <marshal id="marshal-one" ref="firstObject" />
    <to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&amp;durable=false&amp;autoDelete=true&amp;routingKey=message.added" />
</route>

<!-- This route listens to message.added, processes the data, creates a new object, and 
drops it on a different rabbitmq exchange called message.rest -->
<route id="addedOne">
    <from uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&amp;durable=false&amp;autoDelete=true&amp;routingKey=message.added" />
    <unmarshal id="unmarshal-one" ref="firstObject" />
    <bean ref="connector" method="processAndConvert" />
    <marshal id="marshal-out" ref="secondObject" />
    <to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&amp;durable=false&amp;autoDelete=true&amp;routingKey=message.rest" />
</route>

这段代码似乎运行良好。当我们添加一个新的包时,问题就出现了,它也在监听 message.added rabbitmq 消息(我相信你可以这样做?)

<!-- This route is in a different bundle, also listening to message.added, processing
     the data and creating the same object (from a common bundle) and dropping it
     on the same rabbitmq exchange as before -->
<route id="addedTwo">
    <from uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&amp;durable=false&amp;autoDelete=true&amp;routingKey=message.added" />
    <unmarshal id="unmarshal-two" ref="firstObject" />
    <bean ref="someService" method="processUpdate" />
    <marshal id="marshal-out2" ref="secondObject" />
    <to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&amp;durable=false&amp;autoDelete=true&amp;routingKey=message.rest" />
</route>

此路由失败并显示一条错误消息,指出它正在尝试将 secondObject 解组为 addedTwo 路由中的 firstObject

ERROR | RabbitMQConsumer | DefaultErrorHandler ....
caught: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
Unrecognized field "secondObject" (class com.pointer.dangling.FirstObject["secondObject"])

Message History:
RouteId        ProcessorId        Processor
[addedTwo]     [addedTwo]         [                             ]
[addedTwo]     [unmarshal-two]    [unmarshal[ref:firstObject]   ]

Exchange:
Headers: {
    CamelRedelivered=false,
    rabbitmq.DELIVERY_TAG=1,
    rabbitmq.EXCHANGE_NAME=me.ex,
    rabbitmq.ROUTING_KEY=message.added
}
Body: {
    "secondObject": {
        // a bunch of fields for secondObject
    }
}

似乎 'secondObject' 在某个时候进入 'message.added' 交换并被 'addedTwo' 路由接收,该路由试图将其编组为'firstObject'。但是我没有在代码中的任何地方明确告诉它这样做 - 有什么想法吗?

在 RabbitMQ Camel 组件中,routingKey 端点选项仅适用于 Consumers (<from />)。

生产者 必须在 <to /> 之前将其路由密钥明确设置为消息 header。您的 addedOne 路线应如下所示:

<route id="addedOne">
    <from uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&amp;durable=false&amp;autoDelete=true&amp;routingKey=message.added" />
    <unmarshal id="unmarshal-one" ref="firstObject" />
    <bean ref="connector" method="processAndConvert" />
    <marshal id="marshal-out" ref="secondObject" />
    <setHeader headerName="rabbitmq.ROUTING_KEY">
        <constant>message.rest</constant>
    </setHeader>
    <to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&amp;durable=false&amp;autoDelete=true" />
</route>

有关详细信息,请参阅 https://camel.apache.org/rabbitmq.html