使用 Spring 集成为 request/response 创建命名回复目标
Creating a named reply destination for request/response using Spring Integration
我有两个单独的应用程序 运行 在 ActiveMQ 代理的两边;应用程序 1 向应用程序 2 发送同步请求,returns 将响应返回给应用程序 1。目前,回复是通过临时队列进行的,我现在正在尝试创建一个命名的回复目标,以避免创建多个临时队列的开销。
申请 1
@MessagingGateway
public interface OrderGateway {
@Gateway(requestChannel = "requestChannel", replyChannel = "responseChannel")
public OrderDto fetchOrder(OrderRequest orderRequest);
}
@Bean
public IntegrationFlow outgoingRequestFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from("requestChannel")
.handle(Jms.outboundGateway(connectionFactory)
.requestDestination("request.queue")
.replyDestination("response.topic")
.correlationKey("JMSCorrelationID"))
.channel("responseChannel")
.get();
}
应用程序 2
@Bean
public IntegrationFlow incomingRequestFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Jms.inboundGateway(connectionFactory)
.destination("request.queue")
.correlationKey("JMSCorrelationID"))
.channel("requestChannel")
.handle("requestServiceActivator", "handleRequest")
.channel("responseChannel")
.get();
}
@Component
public class OrderServiceActivator {
@Autowired
OrderService orderService;
@ServiceActivator
public OrderDto fetchOrder(OrderRequest orderRequest) {
return orderService.getById(orderRequest.getId());
}
}
当我启动两个应用程序时,request.queue
被创建并有一个消费者(应用程序 2)。 response.topic
已创建,但由于某种原因它没有消费者。因此,当我向应用程序 1 发送请求时,它会到达应用程序 2,但 5 秒后应用程序 1 未收到回复并超时,并记录以下错误:
应用程序 2
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.web.context.WebApplicationContext:/application-2.responseChannel'
申请 1
org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
我想我犯了一些简单的配置错误,我们将不胜感激。
我没找到@ServiceActivator 是如何连接的...
通常是这样的:
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "responseChannel")
public .....
也许这就是你所缺少的。
根据您的配置,回复队列没有长期使用的消费者 - 为每个请求创建一个消费者(使用特定相关 ID 的消息选择器)。
如果添加 .replyContainer()
将有一个永久消费者。
但是,它在功能上应该没有什么区别。
我只是 运行 测试与你的类似,有和没有 replyContainer()
,这对我来说都很好......
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return f -> f.handleWithAdapter(a ->
a.jmsGateway(this.jmsConnectionFactory)
// .replyContainer()
.replyDestination("pipereplies")
.correlationKey("JmsCorrelationID")
.requestDestination("jmsPipelineTest"));
}
我建议您打开调试日志记录,看看是否有帮助。
@Bean
public IntegrationFlow jmsInboundGatewayFlow() {
return IntegrationFlows.from((MessagingGateways g) ->
g.jms(this.jmsConnectionFactory)
.correlationKey("JmsCorrelationID")
.destination("jmsPipelineTest"))
.<String, String>transform(String::toUpperCase)
.get();
}
我有两个单独的应用程序 运行 在 ActiveMQ 代理的两边;应用程序 1 向应用程序 2 发送同步请求,returns 将响应返回给应用程序 1。目前,回复是通过临时队列进行的,我现在正在尝试创建一个命名的回复目标,以避免创建多个临时队列的开销。
申请 1
@MessagingGateway
public interface OrderGateway {
@Gateway(requestChannel = "requestChannel", replyChannel = "responseChannel")
public OrderDto fetchOrder(OrderRequest orderRequest);
}
@Bean
public IntegrationFlow outgoingRequestFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from("requestChannel")
.handle(Jms.outboundGateway(connectionFactory)
.requestDestination("request.queue")
.replyDestination("response.topic")
.correlationKey("JMSCorrelationID"))
.channel("responseChannel")
.get();
}
应用程序 2
@Bean
public IntegrationFlow incomingRequestFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Jms.inboundGateway(connectionFactory)
.destination("request.queue")
.correlationKey("JMSCorrelationID"))
.channel("requestChannel")
.handle("requestServiceActivator", "handleRequest")
.channel("responseChannel")
.get();
}
@Component
public class OrderServiceActivator {
@Autowired
OrderService orderService;
@ServiceActivator
public OrderDto fetchOrder(OrderRequest orderRequest) {
return orderService.getById(orderRequest.getId());
}
}
当我启动两个应用程序时,request.queue
被创建并有一个消费者(应用程序 2)。 response.topic
已创建,但由于某种原因它没有消费者。因此,当我向应用程序 1 发送请求时,它会到达应用程序 2,但 5 秒后应用程序 1 未收到回复并超时,并记录以下错误:
应用程序 2
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.web.context.WebApplicationContext:/application-2.responseChannel'
申请 1
org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
我想我犯了一些简单的配置错误,我们将不胜感激。
我没找到@ServiceActivator 是如何连接的...
通常是这样的:
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "responseChannel")
public .....
也许这就是你所缺少的。
根据您的配置,回复队列没有长期使用的消费者 - 为每个请求创建一个消费者(使用特定相关 ID 的消息选择器)。
如果添加 .replyContainer()
将有一个永久消费者。
但是,它在功能上应该没有什么区别。
我只是 运行 测试与你的类似,有和没有 replyContainer()
,这对我来说都很好......
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return f -> f.handleWithAdapter(a ->
a.jmsGateway(this.jmsConnectionFactory)
// .replyContainer()
.replyDestination("pipereplies")
.correlationKey("JmsCorrelationID")
.requestDestination("jmsPipelineTest"));
}
我建议您打开调试日志记录,看看是否有帮助。
@Bean
public IntegrationFlow jmsInboundGatewayFlow() {
return IntegrationFlows.from((MessagingGateways g) ->
g.jms(this.jmsConnectionFactory)
.correlationKey("JmsCorrelationID")
.destination("jmsPipelineTest"))
.<String, String>transform(String::toUpperCase)
.get();
}