使用 RabbitMQ 和 Apache Camel 为 AMQP 消息获取错误的路由密钥
Getting the Wrong Routing Key for AMQP message using RabbitMQ and Apache Camel
我很难找到我的 Camel 路线的问题。从我一直在阅读的内容来看,这似乎与我的路由密钥 header 信息搞砸有关,但我不确定如何解决这个问题。另外,如果这很重要的话,这是一个 Java OSGi 项目,但是所有 Camel 的东西目前都在 XML 中实现。感谢任何帮助。
这是我正在尝试做的事情:
<!-- The first route creates an object with some info in it and drop it on a rabbitmq
exchange called message.added -->
<route id="directIn">
<from uri="direct:in" />
<bean ref="connector" method="handleIncoming" />
<marshal id="marshal-one" ref="firstObject" />
<to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.added" />
</route>
<!-- This route listens to message.added, processes the data, creates a new object, and
drops it on a different rabbitmq exchange called message.rest -->
<route id="addedOne">
<from uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.added" />
<unmarshal id="unmarshal-one" ref="firstObject" />
<bean ref="connector" method="processAndConvert" />
<marshal id="marshal-out" ref="secondObject" />
<to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.rest" />
</route>
这段代码似乎运行良好。当我们添加一个新的包时,问题就出现了,它也在监听 message.added rabbitmq 消息(我相信你可以这样做?)
<!-- This route is in a different bundle, also listening to message.added, processing
the data and creating the same object (from a common bundle) and dropping it
on the same rabbitmq exchange as before -->
<route id="addedTwo">
<from uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.added" />
<unmarshal id="unmarshal-two" ref="firstObject" />
<bean ref="someService" method="processUpdate" />
<marshal id="marshal-out2" ref="secondObject" />
<to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.rest" />
</route>
此路由失败并显示一条错误消息,指出它正在尝试将 secondObject
解组为 addedTwo
路由中的 firstObject
。
ERROR | RabbitMQConsumer | DefaultErrorHandler ....
caught: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
Unrecognized field "secondObject" (class com.pointer.dangling.FirstObject["secondObject"])
Message History:
RouteId ProcessorId Processor
[addedTwo] [addedTwo] [ ]
[addedTwo] [unmarshal-two] [unmarshal[ref:firstObject] ]
Exchange:
Headers: {
CamelRedelivered=false,
rabbitmq.DELIVERY_TAG=1,
rabbitmq.EXCHANGE_NAME=me.ex,
rabbitmq.ROUTING_KEY=message.added
}
Body: {
"secondObject": {
// a bunch of fields for secondObject
}
}
似乎 'secondObject' 在某个时候进入 'message.added' 交换并被 'addedTwo' 路由接收,该路由试图将其编组为'firstObject'。但是我没有在代码中的任何地方明确告诉它这样做 - 有什么想法吗?
在 RabbitMQ Camel 组件中,routingKey
端点选项仅适用于 Consumers (<from />
)。
生产者 必须在 <to />
之前将其路由密钥明确设置为消息 header。您的 addedOne
路线应如下所示:
<route id="addedOne">
<from uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.added" />
<unmarshal id="unmarshal-one" ref="firstObject" />
<bean ref="connector" method="processAndConvert" />
<marshal id="marshal-out" ref="secondObject" />
<setHeader headerName="rabbitmq.ROUTING_KEY">
<constant>message.rest</constant>
</setHeader>
<to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true" />
</route>
有关详细信息,请参阅 https://camel.apache.org/rabbitmq.html。
我很难找到我的 Camel 路线的问题。从我一直在阅读的内容来看,这似乎与我的路由密钥 header 信息搞砸有关,但我不确定如何解决这个问题。另外,如果这很重要的话,这是一个 Java OSGi 项目,但是所有 Camel 的东西目前都在 XML 中实现。感谢任何帮助。
这是我正在尝试做的事情:
<!-- The first route creates an object with some info in it and drop it on a rabbitmq
exchange called message.added -->
<route id="directIn">
<from uri="direct:in" />
<bean ref="connector" method="handleIncoming" />
<marshal id="marshal-one" ref="firstObject" />
<to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.added" />
</route>
<!-- This route listens to message.added, processes the data, creates a new object, and
drops it on a different rabbitmq exchange called message.rest -->
<route id="addedOne">
<from uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.added" />
<unmarshal id="unmarshal-one" ref="firstObject" />
<bean ref="connector" method="processAndConvert" />
<marshal id="marshal-out" ref="secondObject" />
<to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.rest" />
</route>
这段代码似乎运行良好。当我们添加一个新的包时,问题就出现了,它也在监听 message.added rabbitmq 消息(我相信你可以这样做?)
<!-- This route is in a different bundle, also listening to message.added, processing
the data and creating the same object (from a common bundle) and dropping it
on the same rabbitmq exchange as before -->
<route id="addedTwo">
<from uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.added" />
<unmarshal id="unmarshal-two" ref="firstObject" />
<bean ref="someService" method="processUpdate" />
<marshal id="marshal-out2" ref="secondObject" />
<to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.rest" />
</route>
此路由失败并显示一条错误消息,指出它正在尝试将 secondObject
解组为 addedTwo
路由中的 firstObject
。
ERROR | RabbitMQConsumer | DefaultErrorHandler ....
caught: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
Unrecognized field "secondObject" (class com.pointer.dangling.FirstObject["secondObject"])
Message History:
RouteId ProcessorId Processor
[addedTwo] [addedTwo] [ ]
[addedTwo] [unmarshal-two] [unmarshal[ref:firstObject] ]
Exchange:
Headers: {
CamelRedelivered=false,
rabbitmq.DELIVERY_TAG=1,
rabbitmq.EXCHANGE_NAME=me.ex,
rabbitmq.ROUTING_KEY=message.added
}
Body: {
"secondObject": {
// a bunch of fields for secondObject
}
}
似乎 'secondObject' 在某个时候进入 'message.added' 交换并被 'addedTwo' 路由接收,该路由试图将其编组为'firstObject'。但是我没有在代码中的任何地方明确告诉它这样做 - 有什么想法吗?
在 RabbitMQ Camel 组件中,routingKey
端点选项仅适用于 Consumers (<from />
)。
生产者 必须在 <to />
之前将其路由密钥明确设置为消息 header。您的 addedOne
路线应如下所示:
<route id="addedOne">
<from uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true&routingKey=message.added" />
<unmarshal id="unmarshal-one" ref="firstObject" />
<bean ref="connector" method="processAndConvert" />
<marshal id="marshal-out" ref="secondObject" />
<setHeader headerName="rabbitmq.ROUTING_KEY">
<constant>message.rest</constant>
</setHeader>
<to uri="rabbitmq://localhost:5672/me.ex?exchangeType=topic&durable=false&autoDelete=true" />
</route>
有关详细信息,请参阅 https://camel.apache.org/rabbitmq.html。