ActiveMQ AMQP 与 JMS 转换器利用 spring 集成
ActiveMQ AMQP with JMS transformer leveraging spring Integration
我正在尝试获得一个准系统应用程序,并且 运行 利用 ActiveMQ 的 AMQP 和 JMS 转换器。我的客户端库是 Spring 集成,但是,我无法在此配置中获取基本示例和 运行。
关于 AMQP 上 ActiveMQ 的 JMS 转换器的详细信息:http://activemq.apache.org/amqp.html
主要测试应用程序
@IntegrationComponentScan
@SpringBootApplication
public class SpringCloudStreamJmsActivemqSenderExampleApplication implements CommandLineRunner {
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61616");
connectionFactory.setUserName("admin");
connectionFactory.setPassword("admin");
return connectionFactory;
}
@Bean
public ConnectionFactory connectionFactoryAMQP() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:5672");
connectionFactory.setUserName("admin");
connectionFactory.setPassword("admin");
return connectionFactory;
}
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamJmsActivemqSenderExampleApplication.class, args);
}
@Autowired
JmsGateway gateway;
@Override
public void run(String... strings) throws Exception {
gateway.sendMessage("Hi");
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(1, TimeUnit.SECONDS).get();
}
@Bean(name = "outboundChannel")
MessageChannel myOutBoundChannel() {
return new QueueChannel();
}
@Bean(name = "inboundChannel")
MessageChannel myInboundChannel() {
return new QueueChannel();
}
@Bean(name = "errorChannel")
MessageChannel myErrorChannel() {
return new DirectChannel();
}
@Bean
IntegrationFlow jmsInboundFlow() {
return IntegrationFlows.from(Jms
.inboundGateway(connectionFactoryAMQP())
.destination("myCoolQueue")
.errorChannel(myErrorChannel()))
.handle(this::print)
.get();
}
@Bean
IntegrationFlow jmsOutboundFlow() {
return IntegrationFlows.from(myOutBoundChannel())
.handle(Jms.outboundAdapter(connectionFactory())
.destination("myCoolQueue"))
.get();
}
@Bean
IntegrationFlow customErrorFlow() {
return IntegrationFlows.from(myErrorChannel())
.handle(this::printStackTrace)
.get();
}
private void print(Message message) {
System.out.println("Message payload: " + message.getPayload());
//throw new RuntimeException("broke it");
}
private void printStackTrace(Message errorMessage) {
((ErrorMessage)errorMessage).getPayload().printStackTrace();
}
}
消息网关
@MessagingGateway
interface JmsGateway {
@Gateway(requestChannel = "outboundChannel")
void sendMessage(String message);
}
ActiveMQ.xml
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.transformer=jms"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
日志输出
2017-01-09 08:42:26.158 INFO 24332 --- [ restartedMain] treamJmsActivemqSenderExampleApplication : Started SpringCloudStreamJmsActivemqSenderExampleApplication in 2.676 seconds (JVM running for 3.041)
2017-01-09 08:42:31.143 WARN 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'myCoolQueue' - trying to recover. Cause: Disposed due to prior exception
2017-01-09 08:42:31.150 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
2017-01-09 08:42:36.155 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
2017-01-09 08:42:41.163 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=2, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
ActiveMQ 客户端仅使用 ActiveMQ 本机协议 OpenWire,因此尝试将其连接到 AMQP 端口将不起作用,连接尝试将失败。您需要使用 AMQP 客户端连接到代理上的 AMQP 端口以通过 AMQP 发送和接收消息。 Apache Qpid project has a number of AMQP v1.0 client to choose from. If you want to stick to JMS type client APIs then the Qpid JMS 客户端适合您。
您必须通过两种方式更改 Bean 定义:
JNDI:
@Bean
public ConnectionFactory connectionFactoryAMQP() {
String factoryName = "myFactoryLookup";
Properties props = new Properties();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
props.setProperty("connectionfactory." + factoryName, "amqp://localhost:5672");
props.put("property.connectionfactory." + factoryName + ".username", "admin");
props.put("property.connectionfactory." + factoryName + ".password", "admin");
InitialContext ic = new InitialContext(props);
ConnectionFactory connectionFactory = (ConnectionFactory) ic.lookup(factoryName );
return connectionFactory;
}
或
工厂:
@Bean
public ConnectionFactory connectionFactoryAMQP() {
org.apache.qpid.jms.JmsConnectionFactory connectionFactory = new JmsConnectionFactory();
connectionFactory.setRemoteURI("amqp://localhost:5672");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
return connectionFactory;
}
添加这个依赖
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.9.0</version>
</dependency>
在activemq.xml
中添加端口
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?transport.transformer=jms"/>
transport.transformer=jms 只转换 JMS 消息 from/to 当代理收到 AMQP 消息时,代理端在 AMQP 传输和 ActiveMQ 之间的 AMQP 消息通过 AMQP 传输,它从 AMQP 消息转换为 JMS 消息,当消息通过 AMQP 传输发送给消费者时,它从 JMS 转换为 AMQP 消息。
我正在尝试获得一个准系统应用程序,并且 运行 利用 ActiveMQ 的 AMQP 和 JMS 转换器。我的客户端库是 Spring 集成,但是,我无法在此配置中获取基本示例和 运行。
关于 AMQP 上 ActiveMQ 的 JMS 转换器的详细信息:http://activemq.apache.org/amqp.html
主要测试应用程序
@IntegrationComponentScan
@SpringBootApplication
public class SpringCloudStreamJmsActivemqSenderExampleApplication implements CommandLineRunner {
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61616");
connectionFactory.setUserName("admin");
connectionFactory.setPassword("admin");
return connectionFactory;
}
@Bean
public ConnectionFactory connectionFactoryAMQP() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:5672");
connectionFactory.setUserName("admin");
connectionFactory.setPassword("admin");
return connectionFactory;
}
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamJmsActivemqSenderExampleApplication.class, args);
}
@Autowired
JmsGateway gateway;
@Override
public void run(String... strings) throws Exception {
gateway.sendMessage("Hi");
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(1, TimeUnit.SECONDS).get();
}
@Bean(name = "outboundChannel")
MessageChannel myOutBoundChannel() {
return new QueueChannel();
}
@Bean(name = "inboundChannel")
MessageChannel myInboundChannel() {
return new QueueChannel();
}
@Bean(name = "errorChannel")
MessageChannel myErrorChannel() {
return new DirectChannel();
}
@Bean
IntegrationFlow jmsInboundFlow() {
return IntegrationFlows.from(Jms
.inboundGateway(connectionFactoryAMQP())
.destination("myCoolQueue")
.errorChannel(myErrorChannel()))
.handle(this::print)
.get();
}
@Bean
IntegrationFlow jmsOutboundFlow() {
return IntegrationFlows.from(myOutBoundChannel())
.handle(Jms.outboundAdapter(connectionFactory())
.destination("myCoolQueue"))
.get();
}
@Bean
IntegrationFlow customErrorFlow() {
return IntegrationFlows.from(myErrorChannel())
.handle(this::printStackTrace)
.get();
}
private void print(Message message) {
System.out.println("Message payload: " + message.getPayload());
//throw new RuntimeException("broke it");
}
private void printStackTrace(Message errorMessage) {
((ErrorMessage)errorMessage).getPayload().printStackTrace();
}
}
消息网关
@MessagingGateway
interface JmsGateway {
@Gateway(requestChannel = "outboundChannel")
void sendMessage(String message);
}
ActiveMQ.xml
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.transformer=jms"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:0?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
日志输出
2017-01-09 08:42:26.158 INFO 24332 --- [ restartedMain] treamJmsActivemqSenderExampleApplication : Started SpringCloudStreamJmsActivemqSenderExampleApplication in 2.676 seconds (JVM running for 3.041)
2017-01-09 08:42:31.143 WARN 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'myCoolQueue' - trying to recover. Cause: Disposed due to prior exception
2017-01-09 08:42:31.150 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
2017-01-09 08:42:36.155 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
2017-01-09 08:42:41.163 ERROR 24332 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer : Could not refresh JMS Connection for destination 'myCoolQueue' - retrying using FixedBackOff{interval=5000, currentAttempts=2, maxAttempts=unlimited}. Cause: Cannot send, channel has already failed: tcp://127.0.0.1:5672
ActiveMQ 客户端仅使用 ActiveMQ 本机协议 OpenWire,因此尝试将其连接到 AMQP 端口将不起作用,连接尝试将失败。您需要使用 AMQP 客户端连接到代理上的 AMQP 端口以通过 AMQP 发送和接收消息。 Apache Qpid project has a number of AMQP v1.0 client to choose from. If you want to stick to JMS type client APIs then the Qpid JMS 客户端适合您。
您必须通过两种方式更改 Bean 定义:
JNDI:
@Bean
public ConnectionFactory connectionFactoryAMQP() {
String factoryName = "myFactoryLookup";
Properties props = new Properties();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
props.setProperty("connectionfactory." + factoryName, "amqp://localhost:5672");
props.put("property.connectionfactory." + factoryName + ".username", "admin");
props.put("property.connectionfactory." + factoryName + ".password", "admin");
InitialContext ic = new InitialContext(props);
ConnectionFactory connectionFactory = (ConnectionFactory) ic.lookup(factoryName );
return connectionFactory;
}
或
工厂:
@Bean
public ConnectionFactory connectionFactoryAMQP() {
org.apache.qpid.jms.JmsConnectionFactory connectionFactory = new JmsConnectionFactory();
connectionFactory.setRemoteURI("amqp://localhost:5672");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
return connectionFactory;
}
添加这个依赖
<dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.9.0</version> </dependency>
在activemq.xml
中添加端口 <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?transport.transformer=jms"/>
transport.transformer=jms 只转换 JMS 消息 from/to 当代理收到 AMQP 消息时,代理端在 AMQP 传输和 ActiveMQ 之间的 AMQP 消息通过 AMQP 传输,它从 AMQP 消息转换为 JMS 消息,当消息通过 AMQP 传输发送给消费者时,它从 JMS 转换为 AMQP 消息。