Spring IntegrationFlow 与 RestController
Spring IntegrationFlow with RestController
我想创建一个应用程序来执行以下步骤:
- 通过 RestController 接收请求
- 将收到的消息发送到队列 (AMQP - MessageChannel) (correlationId?)
- 在另一个队列中等待回复 (AMQP - MessageChannel) (correlationId?)
- 在与步骤 1 中的请求相同的线程上返回响应。
我考虑过为此使用 IntegrationFlow,但我无法调整这些步骤。
此外,您知道实现此流程的最佳方式是什么吗?
我尝试用下面的代码实现:
package poc.integration.http;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.amqp.channel.PollableAmqpChannel;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;
import org.springframework.integration.http.inbound.HttpRequestHandlingMessagingGateway;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
 * Spring Integration configuration that maps an HTTP GET endpoint at {@code /foo} onto two
 * AMQP-backed pollable channels named {@code httpRequest} and {@code httpResponse}.
 *
 * <p>Wiring declared here: the inbound HTTP gateway sends to {@code httpRequest}; the
 * {@link #processarMensagem(Object)} service activator consumes {@code httpRequest} and
 * publishes to {@code httpResponse}; the outbound HTTP handler consumes {@code httpResponse}.
 */
@Configuration
@EnableIntegration
public class IntegrationFlowConfig {

    final Logger logger = LoggerFactory.getLogger(IntegrationFlowConfig.class);

    /**
     * Creates the inbound HTTP gateway.
     *
     * <p>The gateway is constructed with the {@code true} flag, uses the mapping from
     * {@link #mapping()}, converts the request payload to {@link String} and sends it to the
     * channel named {@code httpRequest}.
     *
     * @return the configured inbound gateway
     */
    @Bean
    public HttpRequestHandlingMessagingGateway inbound() {
        final HttpRequestHandlingMessagingGateway httpGateway =
                new HttpRequestHandlingMessagingGateway(true);
        httpGateway.setRequestChannelName("httpRequest");
        httpGateway.setRequestPayloadTypeClass(String.class);
        httpGateway.setRequestMapping(mapping());
        return httpGateway;
    }

    /**
     * Describes which requests the inbound gateway accepts: {@code GET} on {@code /foo}.
     *
     * @return the request mapping used by {@link #inbound()}
     */
    @Bean
    public RequestMapping mapping() {
        final RequestMapping fooMapping = new RequestMapping();
        fooMapping.setMethods(HttpMethod.GET);
        fooMapping.setPathPatterns("/foo");
        return fooMapping;
    }

    /**
     * Creates the outbound HTTP handler subscribed to the {@code httpResponse} channel.
     *
     * <p>It issues a {@code GET} to the fixed URL below and expects a {@link String} body.
     * NOTE(review): no output channel is configured for this handler in this class.
     *
     * @return the configured outbound HTTP handler
     */
    @Bean
    @ServiceActivator(inputChannel = "httpResponse")
    public HttpRequestExecutingMessageHandler outbound() {
        final HttpRequestExecutingMessageHandler httpCaller =
                new HttpRequestExecutingMessageHandler("http://10.141.201.206:80/foo");
        httpCaller.setExpectedResponseType(String.class);
        httpCaller.setHttpMethod(HttpMethod.GET);
        return httpCaller;
    }

    /**
     * Handles a message from {@code httpRequest} and forwards the result to
     * {@code httpResponse}.
     *
     * @param mensagem the incoming message payload
     * @return the payload's string form with {@code " - done"} appended
     */
    @ServiceActivator(inputChannel = "httpRequest", outputChannel = "httpResponse")
    public Object processarMensagem(Object mensagem) {
        final Object processed = mensagem + " - done";
        return processed;
    }

    /**
     * Registers the default poller.
     *
     * @param taskScheduler scheduler used as the poller's task executor
     * @return poller metadata with a receive timeout of {@code -1}
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata pollerAmqp(ThreadPoolTaskScheduler taskScheduler) {
        final PollerMetadata defaultPoller = new PollerMetadata();
        defaultPoller.setReceiveTimeout(-1);
        defaultPoller.setTaskExecutor(taskScheduler);
        return defaultPoller;
    }

    /**
     * Declares the AMQP-backed pollable channel named {@code httpRequest}.
     *
     * @param amqpTemplate template the channel uses to talk to the broker
     * @return the request channel
     */
    @Bean
    public MessageChannel httpRequest(AmqpTemplate amqpTemplate) {
        return amqpPollableChannel("httpRequest", amqpTemplate);
    }

    /**
     * Declares the AMQP-backed pollable channel named {@code httpResponse}.
     *
     * @param amqpTemplate template the channel uses to talk to the broker
     * @return the response channel
     */
    @Bean
    public MessageChannel httpResponse(AmqpTemplate amqpTemplate) {
        return amqpPollableChannel("httpResponse", amqpTemplate);
    }

    /** Builds a payload-extracting {@link PollableAmqpChannel} with the default header mappers. */
    private static PollableAmqpChannel amqpPollableChannel(String channelName,
            AmqpTemplate amqpTemplate) {
        final PollableAmqpChannel amqpChannel = new PollableAmqpChannel(channelName, amqpTemplate,
                DefaultAmqpHeaderMapper.outboundMapper(), DefaultAmqpHeaderMapper.inboundMapper());
        amqpChannel.setExtractPayload(true);
        return amqpChannel;
    }
}
但我收到消息:
超时内未收到回复
您只需要一个 IntegrationFlow,并以 Http.inboundControllerAdapter() 作为起点。它完全取代了前面提到的 RestController,同时让您省去在 @RestController 与 IntegrationFlow 之间来回桥接的额外工作。
流程的下一步应该是 Amqp.outboundGateway(),用于通过 AMQP 发送请求并接收回复。它会为您处理消息关联(correlation)。
在文档中查看更多信息:
我想创建一个应用程序来执行以下步骤:
- 通过 RestController 接收请求
- 将收到的消息发送到队列 (AMQP - MessageChannel) (correlationId?)
- 在另一个队列中等待回复 (AMQP - MessageChannel) (correlationId?)
- 在与步骤 1 中的请求相同的线程上返回响应。
我考虑过为此使用 IntegrationFlow,但我无法调整这些步骤。
此外,您知道实现此流程的最佳方式是什么吗?
我尝试用下面的代码实现:
package poc.integration.http;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.amqp.channel.PollableAmqpChannel;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;
import org.springframework.integration.http.inbound.HttpRequestHandlingMessagingGateway;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
 * Spring Integration configuration that maps an HTTP GET endpoint at {@code /foo} onto two
 * AMQP-backed pollable channels named {@code httpRequest} and {@code httpResponse}.
 *
 * <p>Wiring declared here: the inbound HTTP gateway sends to {@code httpRequest}; the
 * {@link #processarMensagem(Object)} service activator consumes {@code httpRequest} and
 * publishes to {@code httpResponse}; the outbound HTTP handler consumes {@code httpResponse}.
 */
@Configuration
@EnableIntegration
public class IntegrationFlowConfig {

    final Logger logger = LoggerFactory.getLogger(IntegrationFlowConfig.class);

    /**
     * Creates the inbound HTTP gateway.
     *
     * <p>The gateway is constructed with the {@code true} flag, uses the mapping from
     * {@link #mapping()}, converts the request payload to {@link String} and sends it to the
     * channel named {@code httpRequest}.
     *
     * @return the configured inbound gateway
     */
    @Bean
    public HttpRequestHandlingMessagingGateway inbound() {
        final HttpRequestHandlingMessagingGateway httpGateway =
                new HttpRequestHandlingMessagingGateway(true);
        httpGateway.setRequestChannelName("httpRequest");
        httpGateway.setRequestPayloadTypeClass(String.class);
        httpGateway.setRequestMapping(mapping());
        return httpGateway;
    }

    /**
     * Describes which requests the inbound gateway accepts: {@code GET} on {@code /foo}.
     *
     * @return the request mapping used by {@link #inbound()}
     */
    @Bean
    public RequestMapping mapping() {
        final RequestMapping fooMapping = new RequestMapping();
        fooMapping.setMethods(HttpMethod.GET);
        fooMapping.setPathPatterns("/foo");
        return fooMapping;
    }

    /**
     * Creates the outbound HTTP handler subscribed to the {@code httpResponse} channel.
     *
     * <p>It issues a {@code GET} to the fixed URL below and expects a {@link String} body.
     * NOTE(review): no output channel is configured for this handler in this class.
     *
     * @return the configured outbound HTTP handler
     */
    @Bean
    @ServiceActivator(inputChannel = "httpResponse")
    public HttpRequestExecutingMessageHandler outbound() {
        final HttpRequestExecutingMessageHandler httpCaller =
                new HttpRequestExecutingMessageHandler("http://10.141.201.206:80/foo");
        httpCaller.setExpectedResponseType(String.class);
        httpCaller.setHttpMethod(HttpMethod.GET);
        return httpCaller;
    }

    /**
     * Handles a message from {@code httpRequest} and forwards the result to
     * {@code httpResponse}.
     *
     * @param mensagem the incoming message payload
     * @return the payload's string form with {@code " - done"} appended
     */
    @ServiceActivator(inputChannel = "httpRequest", outputChannel = "httpResponse")
    public Object processarMensagem(Object mensagem) {
        final Object processed = mensagem + " - done";
        return processed;
    }

    /**
     * Registers the default poller.
     *
     * @param taskScheduler scheduler used as the poller's task executor
     * @return poller metadata with a receive timeout of {@code -1}
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata pollerAmqp(ThreadPoolTaskScheduler taskScheduler) {
        final PollerMetadata defaultPoller = new PollerMetadata();
        defaultPoller.setReceiveTimeout(-1);
        defaultPoller.setTaskExecutor(taskScheduler);
        return defaultPoller;
    }

    /**
     * Declares the AMQP-backed pollable channel named {@code httpRequest}.
     *
     * @param amqpTemplate template the channel uses to talk to the broker
     * @return the request channel
     */
    @Bean
    public MessageChannel httpRequest(AmqpTemplate amqpTemplate) {
        return amqpPollableChannel("httpRequest", amqpTemplate);
    }

    /**
     * Declares the AMQP-backed pollable channel named {@code httpResponse}.
     *
     * @param amqpTemplate template the channel uses to talk to the broker
     * @return the response channel
     */
    @Bean
    public MessageChannel httpResponse(AmqpTemplate amqpTemplate) {
        return amqpPollableChannel("httpResponse", amqpTemplate);
    }

    /** Builds a payload-extracting {@link PollableAmqpChannel} with the default header mappers. */
    private static PollableAmqpChannel amqpPollableChannel(String channelName,
            AmqpTemplate amqpTemplate) {
        final PollableAmqpChannel amqpChannel = new PollableAmqpChannel(channelName, amqpTemplate,
                DefaultAmqpHeaderMapper.outboundMapper(), DefaultAmqpHeaderMapper.inboundMapper());
        amqpChannel.setExtractPayload(true);
        return amqpChannel;
    }
}
但我收到消息: 超时内未收到回复
您只需要一个 IntegrationFlow,并以 Http.inboundControllerAdapter() 作为起点。它完全取代了前面提到的 RestController,同时让您省去在 @RestController 与 IntegrationFlow 之间来回桥接的额外工作。
流程的下一步应该是 Amqp.outboundGateway(),用于通过 AMQP 发送请求并接收回复。它会为您处理消息关联(correlation)。
在文档中查看更多信息: