使用 Spring Integration DSL 时,如果响应来自 RabbitMQ 回复队列,如何实现 HTTP 请求/回复(request/reply)?
How to implement HTTP request/reply when the response comes from a rabbitMQ reply queue using Spring Integration DSL?
我正在尝试在 Spring Integration DSL 中使用独立的 RabbitMQ 队列来实现 HTTP 请求/回复(request/reply)。它与另一个已有的问题类似(原链接缺失),不同之处在于我希望把响应返回给最初的 HTTP 调用者。我可以看到测试用的 HTTP POST 消息成功投递到请求队列,并在转换(转为大写)后进入响应队列。该消息也已从响应队列中被消费,但始终没有返回给调用者(http://localhost:8080/tunnel)。最终调用超时并返回 500 错误。我是这方面的新手,可能遗漏了一些很基本的东西。有人可以给些建议吗?代码如下:
public class TunnelApplication
{
/**
 * Application entry point: starts Spring with {@code TunnelApplication} as the
 * primary source and forwards the command-line arguments unchanged.
 *
 * @param args command-line arguments handed to {@code SpringApplication.run}
 */
public static void main(String[] args) {
    SpringApplication.run(TunnelApplication.class, args);
}
/** Queue name resolved from the {@code outboundQueue} property. */
@Value("${outboundQueue}")
private String outboundQueue;

/** Queue name resolved from the {@code inboundQueue} property. */
@Value("${inboundQueue}")
private String inboundQueue;

/** RabbitMQ connection factory supplied through the constructor; assigned once, never replaced. */
private final ConnectionFactory rabbitConnectionFactory;

/**
 * Creates the configuration around the connection factory that its beans are built from.
 *
 * @param factory connection factory injected by Spring
 */
@Autowired
public TunnelApplication(ConnectionFactory factory) {
    this.rabbitConnectionFactory = factory;
}
/**
 * Declares the queue whose name comes from {@code outboundQueue}.
 *
 * @return a durable, non-exclusive, auto-delete queue definition
 */
@Bean
public Queue targetQueue() {
    final boolean durable = true;
    final boolean exclusive = false;
    final boolean autoDelete = true;
    return new Queue(outboundQueue, durable, exclusive, autoDelete);
}
/**
 * Declares the queue whose name comes from {@code inboundQueue}.
 *
 * @return a durable, non-exclusive, auto-delete queue definition
 */
@Bean
public Queue requestQueue() {
    final boolean durable = true;
    final boolean exclusive = false;
    final boolean autoDelete = true;
    return new Queue(inboundQueue, durable, exclusive, autoDelete);
}
/**
 * Provides the Jackson-based message converter bean.
 *
 * @return a new {@code Jackson2JsonMessageConverter} with default settings
 */
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    return converter;
}
/**
 * Builds the RabbitMQ template used by the AMQP gateways in this configuration.
 *
 * <p>The return type is the concrete {@code RabbitTemplate} rather than the
 * {@code AmqpTemplate} interface because the flow beans declare a
 * {@code RabbitTemplate} parameter; exposing the most specific type lets the
 * container match that injection point from the bean definition alone.
 * Callers that only need an {@code AmqpTemplate} are unaffected.
 *
 * @return template using the JSON converter, {@code outboundQueue} as its default
 *         receive queue and a reply timeout of 60000
 */
@Bean
public RabbitTemplate amqpTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
    template.setMessageConverter(jsonMessageConverter());
    template.setDefaultReceiveQueue(outboundQueue);
    // Deliberately left disabled in this attempt: template.setReplyAddress(outboundQueue);
    template.setReplyTimeout(60000);
    return template;
}
/**
 * HTTP-facing flow: requests received on {@code /tunnel} are handed to an AMQP
 * outbound gateway that publishes with {@code inboundQueue} as the routing key;
 * the result then passes through a log step and a bridge.
 *
 * @param amqpTemplate template the AMQP outbound gateway sends through
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow sendReceiveFlow(RabbitTemplate amqpTemplate) {
return IntegrationFlows
.from(Http.inboundGateway("/tunnel"))
.handle(Amqp.outboundGateway(amqpTemplate)
.routingKey(inboundQueue)
// NOTE(review): a return channel presumably receives messages the broker returns as
// undeliverable (publisher returns), not replies - confirm against the Spring AMQP docs.
.returnChannel(amqpOutboundChannel()))
.log()
.bridge(null)
.get();
}
/**
 * Worker flow: consumes from {@code requestQueue()} through an AMQP inbound gateway,
 * upper-cases the String payload, logs it, and sends it through an AMQP outbound
 * gateway with {@code outboundQueue} as the routing key, followed by a log step and a bridge.
 *
 * @param amqpTemplate template the AMQP outbound gateway sends through
 * @param connectionFactory connection factory the inbound gateway listens with
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow rabbitToWeb(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, requestQueue()))
.transform(String.class, String::toUpperCase)
.log()
// NOTE(review): an outbound *gateway* presumably waits for a reply of its own here;
// confirm whether a one-way send was intended instead.
.handle(Amqp.outboundGateway(amqpTemplate).routingKey(outboundQueue))
.log()
.bridge(null)
.get();
}
/**
 * Consumes from {@code targetQueue()} through an AMQP inbound gateway, calls the HTTP
 * endpoint {@code http://localhost:8080/tunnel} expecting a String response, then logs,
 * bridges and forwards the result to {@code amqpOutboundChannel()}.
 *
 * @param amqpTemplate not referenced in this method body
 * @param connectionFactory connection factory the inbound gateway listens with
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow replyBackToHttp(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, targetQueue()))
// NOTE(review): this path matches the "/tunnel" inbound gateway in sendReceiveFlow, so this
// presumably issues a brand-new request instead of answering the pending one - confirm.
.handle(Http.outboundGateway("http://localhost:8080/tunnel")
.expectedResponseType(String.class))
.log()
.bridge(null)
.channel(amqpOutboundChannel())
.get();
}
/**
 * Provides the channel referenced by the flows as {@code amqpOutboundChannel()}.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public MessageChannel amqpOutboundChannel() {
    MessageChannel channel = new DirectChannel();
    return channel;
}
我的同事还尝试了下面的代码,但同样没有得到响应:
/**
 * Alternative attempt: exposes {@code /sendreceive} over HTTP and forwards each request
 * to RabbitMQ through an AMQP outbound gateway, using an AMQP-backed channel both as the
 * gateway's return channel and as the next channel in the flow.
 */
@Configuration
@EnableIntegration
public class FlowConfig {

    /** Routing key used when publishing; resolved from the {@code routingKey} property. */
    @Value("${routingKey}")
    private String routingKey;

    /** Value passed to {@code Amqp.channel(...)}; resolved from the {@code rabbitSinkChannel} property. */
    @Value("${rabbitSinkChannel}")
    private String rabbitSinkChannel;

    /**
     * Creates the AMQP-backed message channel.
     *
     * @param connectionFactory connection factory the channel is built on
     * @return the channel produced by {@code Amqp.channel(...)}
     */
    @Bean
    public MessageChannel rabbitSinkChannel(ConnectionFactory connectionFactory) {
        return Amqp.channel(rabbitSinkChannel, connectionFactory).get();
    }

    /**
     * Provides a {@code RabbitTemplate} with default settings.
     *
     * @param connectionFactory connection factory the template uses
     * @return the new template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        return template;
    }

    /**
     * Flow from the {@code /sendreceive} HTTP inbound gateway to the AMQP outbound gateway.
     *
     * @param rabbitTemplate template the outbound gateway sends through
     * @param connectionFactory connection factory used to obtain the sink channel
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow httpFlow(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) {
        MessageChannel sinkChannel = rabbitSinkChannel(connectionFactory);
        return IntegrationFlows
                .from(Http.inboundGateway("/sendreceive"))
                .handle(Amqp.outboundGateway(rabbitTemplate)
                        .routingKey(routingKey)
                        .returnChannel(sinkChannel))
                .channel(sinkChannel) // or .handle? if so, what?
                .get();
    }
}
您可能误解了 Amqp.outboundGateway 上 returnChannel 的作用,并试图把自己的逻辑建立在它之上。请先熟悉发布者确认(Publisher Confirms)和消息退回(Returns)功能:https://docs.spring.io/spring-amqp/docs/current/reference/html/#cf-pub-conf-ret
另外,replyBackToHttp 流的目的也不清楚;目前它混杂地引用了其他 bean,反而令人困惑。
您可能需要研究一下 Spring AMQP 中的请求-回复(request-reply)配置;了解之后,您多半就不会再尝试使用另一个队列来接收回复了。不过这仍然是可行的:参见 RabbitTemplate 的 replyAddress 属性:https://docs.spring.io/spring-amqp/docs/current/reference/html/#request-reply
以下更新有效(我还删除了 replyBackToHttp() 方法):
/**
 * Builds the RabbitMQ template with a fixed reply address so that replies arrive on
 * the queue named by {@code outboundQueue}.
 *
 * <p>The return type is the concrete {@code RabbitTemplate} rather than the
 * {@code AmqpTemplate} interface: exposing the most specific type lets injection
 * points that ask for a {@code RabbitTemplate} be matched from the bean definition.
 * Callers that only need an {@code AmqpTemplate} are unaffected.
 *
 * @return template using the JSON converter, {@code outboundQueue} as reply address,
 *         a reply timeout of 60000 and the direct reply-to container switched off
 */
@Bean
public RabbitTemplate amqpTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
    rabbitTemplate.setMessageConverter(jsonMessageConverter());
    // Replaces the earlier setDefaultReceiveQueue(outboundQueue) call: the queue is now the reply address.
    rabbitTemplate.setReplyAddress(outboundQueue);
    rabbitTemplate.setReplyTimeout(60000);
    rabbitTemplate.setUseDirectReplyToContainer(false);
    return rabbitTemplate;
}
/**
 * Listener container that consumes {@code replyQueue()} and registers the template
 * from {@code amqpTemplate()} as its message listener.
 *
 * @return container configured with {@code rabbitConnectionFactory}
 */
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer replyContainer = new SimpleMessageListenerContainer();
    replyContainer.setConnectionFactory(rabbitConnectionFactory);
    // NOTE(review): replyQueue() is not defined in the snippets shown; presumably it must
    // declare the same queue as the template's reply address (outboundQueue) - confirm.
    replyContainer.setQueues(replyQueue());
    // Cast kept so this compiles whether amqpTemplate() is declared as AmqpTemplate or RabbitTemplate.
    MessageListener replyListener = (MessageListener) amqpTemplate();
    replyContainer.setMessageListener(replyListener);
    return replyContainer;
}
对于那些感到沮丧并且只想继续前进的人来说,这是几乎完整的解决方案。
package com.scdf.poc.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;
/**
 * Spring Integration configuration that exposes {@code /sendreceive} over HTTP, publishes
 * each request to RabbitMQ with the {@code rabbitSource} routing key, and listens on the
 * {@code rabbitSink} queue with the template registered as the message listener.
 */
@Configuration
@EnableIntegration
public class FlowConfig {

    /** Routing key requests are published with; resolved from the {@code rabbitSource} property. */
    @Value("${rabbitSource}")
    private String rabbitSource;

    /**
     * Reply address and listened queue name; resolved from the {@code rabbitSink} property.
     * Note: has to be created manually in RabbitMQ, the SCDF flows won't auto-create it.
     */
    @Value("${rabbitSink}")
    private String rabbitSink;

    /**
     * Provides the template whose reply address is the {@code rabbitSink} queue.
     *
     * @param connectionFactory connection factory the template uses
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setReplyAddress(rabbitSink);
        return template;
    }

    /**
     * Listener container on the {@code rabbitSink} queue that delivers messages to the template.
     *
     * @param rabbitTemplate template registered as the message listener
     * @param connectionFactory connection factory the container uses
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer replyContainer = new SimpleMessageListenerContainer();
        replyContainer.setConnectionFactory(connectionFactory);
        replyContainer.setQueueNames(rabbitSink);
        replyContainer.setMessageListener(rabbitTemplate);
        return replyContainer;
    }

    /**
     * Flow from the {@code /sendreceive} HTTP inbound gateway (String request payload) to the
     * AMQP outbound gateway.
     *
     * @param rabbitTemplate template the outbound gateway sends through
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow httpFlow(RabbitTemplate rabbitTemplate) {
        return IntegrationFlows
                .from(Http.inboundGateway("/sendreceive").requestPayloadType(String.class))
                .handle(Amqp.outboundGateway(rabbitTemplate).routingKey(rabbitSource))
                .get();
    }
}
application.properties - 请注意,SCDF 使用流名称作为队列名称的前缀和后缀
rabbitSource=pocStream.rabbitSource.pocStream
rabbitSink=pocStream.rabbitSink.pocStream
pocStream 的 SCDF 流定义 - 这只是回显请求
rabbitSource: rabbit --queues=rabbitSource | bridge | rabbitSink: rabbit --routing-key=pocStream.rabbitSink.pocStream
我正在尝试在 Spring 集成 DSL 中使用单独的 RabbitMQ 队列来实现 HTTP request/reply。它类似于
public class TunnelApplication
{
/**
 * Application entry point: starts Spring with {@code TunnelApplication} as the
 * primary source and forwards the command-line arguments unchanged.
 *
 * @param args command-line arguments handed to {@code SpringApplication.run}
 */
public static void main(String[] args) {
    SpringApplication.run(TunnelApplication.class, args);
}
/** Queue name resolved from the {@code outboundQueue} property. */
@Value("${outboundQueue}")
private String outboundQueue;

/** Queue name resolved from the {@code inboundQueue} property. */
@Value("${inboundQueue}")
private String inboundQueue;

/** RabbitMQ connection factory supplied through the constructor; assigned once, never replaced. */
private final ConnectionFactory rabbitConnectionFactory;

/**
 * Creates the configuration around the connection factory that its beans are built from.
 *
 * @param factory connection factory injected by Spring
 */
@Autowired
public TunnelApplication(ConnectionFactory factory) {
    this.rabbitConnectionFactory = factory;
}
/**
 * Declares the queue whose name comes from {@code outboundQueue}.
 *
 * @return a durable, non-exclusive, auto-delete queue definition
 */
@Bean
public Queue targetQueue() {
    final boolean durable = true;
    final boolean exclusive = false;
    final boolean autoDelete = true;
    return new Queue(outboundQueue, durable, exclusive, autoDelete);
}
/**
 * Declares the queue whose name comes from {@code inboundQueue}.
 *
 * @return a durable, non-exclusive, auto-delete queue definition
 */
@Bean
public Queue requestQueue() {
    final boolean durable = true;
    final boolean exclusive = false;
    final boolean autoDelete = true;
    return new Queue(inboundQueue, durable, exclusive, autoDelete);
}
/**
 * Provides the Jackson-based message converter bean.
 *
 * @return a new {@code Jackson2JsonMessageConverter} with default settings
 */
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    return converter;
}
/**
 * Builds the RabbitMQ template used by the AMQP gateways in this configuration.
 *
 * <p>The return type is the concrete {@code RabbitTemplate} rather than the
 * {@code AmqpTemplate} interface because the flow beans declare a
 * {@code RabbitTemplate} parameter; exposing the most specific type lets the
 * container match that injection point from the bean definition alone.
 * Callers that only need an {@code AmqpTemplate} are unaffected.
 *
 * @return template using the JSON converter, {@code outboundQueue} as its default
 *         receive queue and a reply timeout of 60000
 */
@Bean
public RabbitTemplate amqpTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
    template.setMessageConverter(jsonMessageConverter());
    template.setDefaultReceiveQueue(outboundQueue);
    // Deliberately left disabled in this attempt: template.setReplyAddress(outboundQueue);
    template.setReplyTimeout(60000);
    return template;
}
/**
 * HTTP-facing flow: requests received on {@code /tunnel} are handed to an AMQP
 * outbound gateway that publishes with {@code inboundQueue} as the routing key;
 * the result then passes through a log step and a bridge.
 *
 * @param amqpTemplate template the AMQP outbound gateway sends through
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow sendReceiveFlow(RabbitTemplate amqpTemplate) {
return IntegrationFlows
.from(Http.inboundGateway("/tunnel"))
.handle(Amqp.outboundGateway(amqpTemplate)
.routingKey(inboundQueue)
// NOTE(review): a return channel presumably receives messages the broker returns as
// undeliverable (publisher returns), not replies - confirm against the Spring AMQP docs.
.returnChannel(amqpOutboundChannel()))
.log()
.bridge(null)
.get();
}
/**
 * Worker flow: consumes from {@code requestQueue()} through an AMQP inbound gateway,
 * upper-cases the String payload, logs it, and sends it through an AMQP outbound
 * gateway with {@code outboundQueue} as the routing key, followed by a log step and a bridge.
 *
 * @param amqpTemplate template the AMQP outbound gateway sends through
 * @param connectionFactory connection factory the inbound gateway listens with
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow rabbitToWeb(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, requestQueue()))
.transform(String.class, String::toUpperCase)
.log()
// NOTE(review): an outbound *gateway* presumably waits for a reply of its own here;
// confirm whether a one-way send was intended instead.
.handle(Amqp.outboundGateway(amqpTemplate).routingKey(outboundQueue))
.log()
.bridge(null)
.get();
}
/**
 * Consumes from {@code targetQueue()} through an AMQP inbound gateway, calls the HTTP
 * endpoint {@code http://localhost:8080/tunnel} expecting a String response, then logs,
 * bridges and forwards the result to {@code amqpOutboundChannel()}.
 *
 * @param amqpTemplate not referenced in this method body
 * @param connectionFactory connection factory the inbound gateway listens with
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow replyBackToHttp(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, targetQueue()))
// NOTE(review): this path matches the "/tunnel" inbound gateway in sendReceiveFlow, so this
// presumably issues a brand-new request instead of answering the pending one - confirm.
.handle(Http.outboundGateway("http://localhost:8080/tunnel")
.expectedResponseType(String.class))
.log()
.bridge(null)
.channel(amqpOutboundChannel())
.get();
}
/**
 * Provides the channel referenced by the flows as {@code amqpOutboundChannel()}.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public MessageChannel amqpOutboundChannel() {
    MessageChannel channel = new DirectChannel();
    return channel;
}
我的同事还尝试了下面的代码,但同样没有得到响应:
/**
 * Alternative attempt: exposes {@code /sendreceive} over HTTP and forwards each request
 * to RabbitMQ through an AMQP outbound gateway, using an AMQP-backed channel both as the
 * gateway's return channel and as the next channel in the flow.
 */
@Configuration
@EnableIntegration
public class FlowConfig {

    /** Routing key used when publishing; resolved from the {@code routingKey} property. */
    @Value("${routingKey}")
    private String routingKey;

    /** Value passed to {@code Amqp.channel(...)}; resolved from the {@code rabbitSinkChannel} property. */
    @Value("${rabbitSinkChannel}")
    private String rabbitSinkChannel;

    /**
     * Creates the AMQP-backed message channel.
     *
     * @param connectionFactory connection factory the channel is built on
     * @return the channel produced by {@code Amqp.channel(...)}
     */
    @Bean
    public MessageChannel rabbitSinkChannel(ConnectionFactory connectionFactory) {
        return Amqp.channel(rabbitSinkChannel, connectionFactory).get();
    }

    /**
     * Provides a {@code RabbitTemplate} with default settings.
     *
     * @param connectionFactory connection factory the template uses
     * @return the new template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        return template;
    }

    /**
     * Flow from the {@code /sendreceive} HTTP inbound gateway to the AMQP outbound gateway.
     *
     * @param rabbitTemplate template the outbound gateway sends through
     * @param connectionFactory connection factory used to obtain the sink channel
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow httpFlow(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) {
        MessageChannel sinkChannel = rabbitSinkChannel(connectionFactory);
        return IntegrationFlows
                .from(Http.inboundGateway("/sendreceive"))
                .handle(Amqp.outboundGateway(rabbitTemplate)
                        .routingKey(routingKey)
                        .returnChannel(sinkChannel))
                .channel(sinkChannel) // or .handle? if so, what?
                .get();
    }
}
您可能误解了 Amqp.outboundGateway 上 returnChannel 的作用,并试图把自己的逻辑建立在它之上。请先熟悉发布者确认(Publisher Confirms)和消息退回(Returns)功能:https://docs.spring.io/spring-amqp/docs/current/reference/html/#cf-pub-conf-ret
另外,replyBackToHttp 流的目的也不清楚;目前它混杂地引用了其他 bean,反而令人困惑。
您可能需要研究一下 Spring AMQP 中的请求-回复(request-reply)配置;了解之后,您多半就不会再尝试使用另一个队列来接收回复了。不过这仍然是可行的:参见 RabbitTemplate 的 replyAddress 属性:https://docs.spring.io/spring-amqp/docs/current/reference/html/#request-reply
以下更新有效(我还删除了 replyBackToHttp() 方法):
/**
 * Builds the RabbitMQ template with a fixed reply address so that replies arrive on
 * the queue named by {@code outboundQueue}.
 *
 * <p>The return type is the concrete {@code RabbitTemplate} rather than the
 * {@code AmqpTemplate} interface: exposing the most specific type lets injection
 * points that ask for a {@code RabbitTemplate} be matched from the bean definition.
 * Callers that only need an {@code AmqpTemplate} are unaffected.
 *
 * @return template using the JSON converter, {@code outboundQueue} as reply address,
 *         a reply timeout of 60000 and the direct reply-to container switched off
 */
@Bean
public RabbitTemplate amqpTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
    rabbitTemplate.setMessageConverter(jsonMessageConverter());
    // Replaces the earlier setDefaultReceiveQueue(outboundQueue) call: the queue is now the reply address.
    rabbitTemplate.setReplyAddress(outboundQueue);
    rabbitTemplate.setReplyTimeout(60000);
    rabbitTemplate.setUseDirectReplyToContainer(false);
    return rabbitTemplate;
}
/**
 * Listener container that consumes {@code replyQueue()} and registers the template
 * from {@code amqpTemplate()} as its message listener.
 *
 * @return container configured with {@code rabbitConnectionFactory}
 */
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer replyContainer = new SimpleMessageListenerContainer();
    replyContainer.setConnectionFactory(rabbitConnectionFactory);
    // NOTE(review): replyQueue() is not defined in the snippets shown; presumably it must
    // declare the same queue as the template's reply address (outboundQueue) - confirm.
    replyContainer.setQueues(replyQueue());
    // Cast kept so this compiles whether amqpTemplate() is declared as AmqpTemplate or RabbitTemplate.
    MessageListener replyListener = (MessageListener) amqpTemplate();
    replyContainer.setMessageListener(replyListener);
    return replyContainer;
}
对于那些感到沮丧并且只想继续前进的人来说,这是几乎完整的解决方案。
package com.scdf.poc.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;
/**
 * Spring Integration configuration that exposes {@code /sendreceive} over HTTP, publishes
 * each request to RabbitMQ with the {@code rabbitSource} routing key, and listens on the
 * {@code rabbitSink} queue with the template registered as the message listener.
 */
@Configuration
@EnableIntegration
public class FlowConfig {

    /** Routing key requests are published with; resolved from the {@code rabbitSource} property. */
    @Value("${rabbitSource}")
    private String rabbitSource;

    /**
     * Reply address and listened queue name; resolved from the {@code rabbitSink} property.
     * Note: has to be created manually in RabbitMQ, the SCDF flows won't auto-create it.
     */
    @Value("${rabbitSink}")
    private String rabbitSink;

    /**
     * Provides the template whose reply address is the {@code rabbitSink} queue.
     *
     * @param connectionFactory connection factory the template uses
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setReplyAddress(rabbitSink);
        return template;
    }

    /**
     * Listener container on the {@code rabbitSink} queue that delivers messages to the template.
     *
     * @param rabbitTemplate template registered as the message listener
     * @param connectionFactory connection factory the container uses
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer replyContainer = new SimpleMessageListenerContainer();
        replyContainer.setConnectionFactory(connectionFactory);
        replyContainer.setQueueNames(rabbitSink);
        replyContainer.setMessageListener(rabbitTemplate);
        return replyContainer;
    }

    /**
     * Flow from the {@code /sendreceive} HTTP inbound gateway (String request payload) to the
     * AMQP outbound gateway.
     *
     * @param rabbitTemplate template the outbound gateway sends through
     * @return the assembled integration flow
     */
    @Bean
    public IntegrationFlow httpFlow(RabbitTemplate rabbitTemplate) {
        return IntegrationFlows
                .from(Http.inboundGateway("/sendreceive").requestPayloadType(String.class))
                .handle(Amqp.outboundGateway(rabbitTemplate).routingKey(rabbitSource))
                .get();
    }
}
application.properties - 请注意,SCDF 使用流名称作为队列名称的前缀和后缀
rabbitSource=pocStream.rabbitSource.pocStream
rabbitSink=pocStream.rabbitSink.pocStream
pocStream 的 SCDF 流定义 - 这只是回显请求
rabbitSource: rabbit --queues=rabbitSource | bridge | rabbitSink: rabbit --routing-key=pocStream.rabbitSink.pocStream