Apache Camel - 消息中的特殊字符 body
Apache Camel - Special characters in message body
我正在使用 Camel 版本 3.1 并尝试使用 AMQP Component 和 Spring 引导将消息从一个 ActiveMQ 服务器发送到另一个。发送消息后,在目标 ActiveMQ 控制台,消息详细信息包含以下内容。
Sp�ASr�)�x-opt-jms-destQ�x-opt-jms-msg-typeQSs�^
�/ID:53e1ce3a-4drf-4f8a-9ff9-845fe0d0006e:3:1:1-1@�queue://testcamelwithamqp@@@@@@�qd�Sw�1:"test message"
我的实际消息是 1:"test message"
,但 JMS headers 以某种方式作为特殊字符放入消息 body 中。有什么帮助解决吗?
下面是spring引导代码示例
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableAutoConfiguration
@SpringBootApplication
public class CamelMQApplication {
public static void main(String[] args) {
SpringApplication.run(CamelMQApplication.class, args);
}
}
import org.apache.camel.CamelContext;
import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AmqpConfigInternal {
@Value("${INTERNAL_SERVICE_USERNAME}")
private String userName;
@Value("${INTERNAL_SERVICE_PASSWORD}")
private String pass;
@Value("${INTERNAL_REMOTE_URI}")
private String remoteUri;
@Autowired
private CamelContext camelInternal;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPass() {
return pass;
}
public void setPass(String pass) {
this.pass = pass;
}
public String getRemoteUri() {
return remoteUri;
}
public void setRemoteUri(String remoteUri) {
this.remoteUri = remoteUri;
}
private JmsConnectionFactory internalConnectionFactory() throws Exception {
JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
jmsConnectionFactory.setRemoteURI(remoteUri);
jmsConnectionFactory.setUsername(userName);
jmsConnectionFactory.setPassword(pass);
return jmsConnectionFactory;
}
@Bean
public AMQPComponent internalAmqpConnection() throws Exception {
AMQPComponent amqp = new AMQPComponent();
amqp.setConnectionFactory(internalConnectionFactory());
camelInternal.addComponent("amqpInternal", amqp);
return amqp;
}
}
import org.apache.camel.ExchangePattern;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class SampleAutowiredAmqpRouteTest extends RouteBuilder {
@Override
public void configure() throws Exception {
from("{{route1.from}}")
.convertBodyTo(String.class, "UTF-8")
.removeHeaders("*")
.log("From ActiveMQ: ${body}")
.to("{{route1.to}}")
.convertBodyTo(String.class, "UTF-8")
.removeHeaders("*");
}
}
import org.apache.camel.CamelContext;
import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AmqpConfigRouteTest {
@Value("${ACTIVEMQ_SERVICE_USERNAME}")
private String userName;
@Value("${ACTIVEMQ_SERVICE_PASSWORD}")
private String pass;
@Value("${ACTIVEMQ_REMOTE_URI}")
private String remoteUri;
@Autowired private CamelContext camelRouteTest;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPass() {
return pass;
}
public void setPass(String pass) {
this.pass = pass;
}
public String getRemoteUri() {
return remoteUri;
}
public void setRemoteUri(String remoteUri) {
this.remoteUri = remoteUri;
}
private JmsConnectionFactory amqp1ConnectionFactory() throws Exception {
JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
jmsConnectionFactory.setRemoteURI(remoteUri);
jmsConnectionFactory.setUsername(userName);
jmsConnectionFactory.setPassword(pass);
return jmsConnectionFactory;
}
@Bean
public AMQPComponent amqp1Connection() throws Exception {
AMQPComponent amqp = new AMQPComponent();
amqp.setConnectionFactory(amqp1ConnectionFactory());
camelRouteTest.addComponent("amqpRouteTest", amqp);
return amqp;
}
}
application.properties
server.port=8071
camel.springboot.name = CamelTest
camel.springboot.main-run-controller = true
INTERNAL_REMOTE_URI=amqp://actimqserver1:12345
INTERNAL_SERVICE_USERNAME=admin
INTERNAL_SERVICE_PASSWORD=admin
ACTIVEMQ_REMOTE_URI=amqp://actimqserver2:12345
ACTIVEMQ_SERVICE_USERNAME=admin
ACTIVEMQ_SERVICE_PASSWORD=admin
route1.from = amqpInternal:test4camelamqpSrcQ
route1.to = amqpRouteTest:test4camelamqpTgtQ
您正在使用 ActiveMQ 和 AMQP 协议。您是否也 configured the broker 支持此协议?
例如传输连接器:
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>
老实说我不知道当您使用普通的 ActiveMQ 代理(默认协议是 OpenWire)并向其发送 AMQP 消息时会发生什么。
在您的情况下,Camel 似乎能够正确地生成和使用消息 to/from 队列。这并不奇怪,因为生产者和消费者都配置了 AMQP。经纪人只是存储消息。
另一方面,您写的是 Springs JmsListener 在消费时弄乱了消息。可能是因为它 不期望 AMQP 消息 而您尝试接收 JMS 消息(TextMessage、ByteMessage 等)。
您可以尝试简单地接收 Message<?>
,这是对多个传输 .
的 Spring 抽象
ActiveMQ 控制台中的消息乱七八糟并不奇怪。我猜想AMQP消息有特定的格式,控制台无法正确显示它,因为它与JMS消息不同。
解决方法:我们需要在ActiveMQ配置文件中添加transport.transformer="jms"
如下图,然后重启Active MQ
<transportConnector name="amqp" uri="amqp://localhost:5672?**transport.transformer=jms**"/>
这实际上会让 ActiveMQ 正确映射传入的 JMS 消息 headers。 'transport.transformer=native'
的默认值将 AMQP 消息包装到 JMS BytesMessage 中,因此它既不会在控制台中正确显示,也不会在客户端实际使用消息时显示。
我正在使用 Camel 版本 3.1 并尝试使用 AMQP Component 和 Spring 引导将消息从一个 ActiveMQ 服务器发送到另一个。发送消息后,在目标 ActiveMQ 控制台,消息详细信息包含以下内容。
Sp�ASr�)�x-opt-jms-destQ�x-opt-jms-msg-typeQSs�^
�/ID:53e1ce3a-4drf-4f8a-9ff9-845fe0d0006e:3:1:1-1@�queue://testcamelwithamqp@@@@@@�qd�Sw�1:"test message"
我的实际消息是 1:"test message"
,但 JMS headers 以某种方式作为特殊字符放入消息 body 中。有什么帮助解决吗?
下面是spring引导代码示例
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableAutoConfiguration
@SpringBootApplication
public class CamelMQApplication {
public static void main(String[] args) {
SpringApplication.run(CamelMQApplication.class, args);
}
}
import org.apache.camel.CamelContext;
import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AmqpConfigInternal {
@Value("${INTERNAL_SERVICE_USERNAME}")
private String userName;
@Value("${INTERNAL_SERVICE_PASSWORD}")
private String pass;
@Value("${INTERNAL_REMOTE_URI}")
private String remoteUri;
@Autowired
private CamelContext camelInternal;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPass() {
return pass;
}
public void setPass(String pass) {
this.pass = pass;
}
public String getRemoteUri() {
return remoteUri;
}
public void setRemoteUri(String remoteUri) {
this.remoteUri = remoteUri;
}
private JmsConnectionFactory internalConnectionFactory() throws Exception {
JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
jmsConnectionFactory.setRemoteURI(remoteUri);
jmsConnectionFactory.setUsername(userName);
jmsConnectionFactory.setPassword(pass);
return jmsConnectionFactory;
}
@Bean
public AMQPComponent internalAmqpConnection() throws Exception {
AMQPComponent amqp = new AMQPComponent();
amqp.setConnectionFactory(internalConnectionFactory());
camelInternal.addComponent("amqpInternal", amqp);
return amqp;
}
}
import org.apache.camel.ExchangePattern;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class SampleAutowiredAmqpRouteTest extends RouteBuilder {
@Override
public void configure() throws Exception {
from("{{route1.from}}")
.convertBodyTo(String.class, "UTF-8")
.removeHeaders("*")
.log("From ActiveMQ: ${body}")
.to("{{route1.to}}")
.convertBodyTo(String.class, "UTF-8")
.removeHeaders("*");
}
}
import org.apache.camel.CamelContext;
import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AmqpConfigRouteTest {
@Value("${ACTIVEMQ_SERVICE_USERNAME}")
private String userName;
@Value("${ACTIVEMQ_SERVICE_PASSWORD}")
private String pass;
@Value("${ACTIVEMQ_REMOTE_URI}")
private String remoteUri;
@Autowired private CamelContext camelRouteTest;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPass() {
return pass;
}
public void setPass(String pass) {
this.pass = pass;
}
public String getRemoteUri() {
return remoteUri;
}
public void setRemoteUri(String remoteUri) {
this.remoteUri = remoteUri;
}
private JmsConnectionFactory amqp1ConnectionFactory() throws Exception {
JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
jmsConnectionFactory.setRemoteURI(remoteUri);
jmsConnectionFactory.setUsername(userName);
jmsConnectionFactory.setPassword(pass);
return jmsConnectionFactory;
}
@Bean
public AMQPComponent amqp1Connection() throws Exception {
AMQPComponent amqp = new AMQPComponent();
amqp.setConnectionFactory(amqp1ConnectionFactory());
camelRouteTest.addComponent("amqpRouteTest", amqp);
return amqp;
}
}
application.properties
server.port=8071
camel.springboot.name = CamelTest
camel.springboot.main-run-controller = true
INTERNAL_REMOTE_URI=amqp://actimqserver1:12345
INTERNAL_SERVICE_USERNAME=admin
INTERNAL_SERVICE_PASSWORD=admin
ACTIVEMQ_REMOTE_URI=amqp://actimqserver2:12345
ACTIVEMQ_SERVICE_USERNAME=admin
ACTIVEMQ_SERVICE_PASSWORD=admin
route1.from = amqpInternal:test4camelamqpSrcQ
route1.to = amqpRouteTest:test4camelamqpTgtQ
您正在使用 ActiveMQ 和 AMQP 协议。您是否也 configured the broker 支持此协议?
例如传输连接器:
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>
老实说我不知道当您使用普通的 ActiveMQ 代理(默认协议是 OpenWire)并向其发送 AMQP 消息时会发生什么。
在您的情况下,Camel 似乎能够正确地生成和使用消息 to/from 队列。这并不奇怪,因为生产者和消费者都配置了 AMQP。经纪人只是存储消息。
另一方面,您写的是 Springs JmsListener 在消费时弄乱了消息。可能是因为它 不期望 AMQP 消息 而您尝试接收 JMS 消息(TextMessage、ByteMessage 等)。
您可以尝试简单地接收 Message<?>
,这是对多个传输 .
ActiveMQ 控制台中的消息乱七八糟并不奇怪。我猜想AMQP消息有特定的格式,控制台无法正确显示它,因为它与JMS消息不同。
解决方法:我们需要在ActiveMQ配置文件中添加transport.transformer="jms"
如下图,然后重启Active MQ
<transportConnector name="amqp" uri="amqp://localhost:5672?**transport.transformer=jms**"/>
这实际上会让 ActiveMQ 正确映射传入的 JMS 消息 headers。 'transport.transformer=native'
的默认值将 AMQP 消息包装到 JMS BytesMessage 中,因此它既不会在控制台中正确显示,也不会在客户端实际使用消息时显示。