如何在同一应用程序中更好地关联 Spring 集成 TCP 入站和出站适配器?
How to better correlate Spring Integration TCP Inbound and Outbound Adapters within the same application?
我目前有一个 Spring 集成应用程序,它利用许多 TCP 入站和出站适配器组合来处理消息。所有这些适配器组合都使用相同的单个 MessageEndpoint 进行请求处理,并使用相同的 MessagingGateway
进行响应发送。
MessageEndpoint 的最终输出通道是一个 DirectChannel
,它也是 MessageGateway 的 DefaultRequestChannel。此 DirectChannel
使用默认值 RoundRobinLoadBalancingStrategy
进行循环搜索以寻找正确的出站适配器以发送给定的响应。当然,这种循环搜索并不总能在第一次搜索时找到合适的出站适配器,如果没有找到相应的日志。这不仅会产生大量不需要的日志记录,还会引发一些性能问题,因为我预计在任何给定时间都存在数百个 inbound/outbound 适配器组合。
我想知道是否有一种方法可以使入站和出站适配器更紧密地关联起来,从而不需要循环处理并且每个响应都可以直接发送到相应的出站适配器?理想情况下,我希望以一种可以保持使用单个 MessageEndpoint
和单个 MessageGateway
的方式来实现。
注意: 请将解决方案限制为使用 Inbound/Outbound 适配器组合的解决方案。我的实现无法使用 TcpInbound/TcpOutboundGateways,因为我需要向单个请求发送多个响应,据我所知,这只能通过使用 inbound/outbound 适配器来完成。
为了清楚起见,下面是描述的当前实施的精简版。我试图清除所有不相关的代码只是为了让事情更容易阅读...
// Inbound/Outbound Adapter creation (part of a service that is used to dynamically create varying number of inbound/outbound adapter combinations)
public void configureAdapterCombination(int port) {
TcpNioServerConnectionFactory connectionFactory = new TcpNioServerConnectionFactory(port);
// Connection Factory registered with Application Context bean factory (removed for readability)...
TcpReceivingChannelAdapter inboundAdapter = new TcpReceivingChannelAdapter();
inboundAdapter.setConnectionFactory(connectionFactory);
inboundAdapter.setOutputChannel(context.getBean("sendFirstResponse", DirectChannel.class));
// Inbound Adapter registered with Application Context bean factory (removed for readability)...
TcpSendingMessageHandler outboundAdapter = new TcpSendingMessageHandler();
outboundAdapter.setConnectionFactory(connectionFactory);
// Outbound Adapter registered with Application Context bean factory (removed for readability)...
context.getBean("outboundResponse", DirectChannel.class).subscribe(outboundAdapter);
}
// Message Endpoint for processing requests
@MessageEndpoint
public class RequestProcessor {
@Autowired
private OutboundResponseGateway outboundResponseGateway;
// Direct Channel which is using Round Robin lookup
@Bean
public DirectChannel outboundResponse() {
return new DirectChannel();
}
// Removed additional, unrelated, endpoints for readability...
@ServiceActivator(inputChannel="sendFirstResponse", outputChannel="sendSecondResponse")
public Message<String> sendFirstResponse(Message<String> message) {
// Unrelated message processing/response generation excluded...
outboundResponseGateway.sendOutboundResponse("First Response", message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class));
return message;
}
// Service Activator that puts second response on the request channel of the Message Gateway
@ServiceActivator(inputChannel = "sendSecondResponse", outputChannel="outboundResponse")
public Message<String> processQuery(Message<String> message) {
// Unrelated message processing/response generation excluded...
return MessageBuilder.withPayload("Second Response").copyHeaders(message.getHeaders()).build();
}
}
// Messaging Gateway for sending responses
@MessagingGateway(defaultRequestChannel="outboundResponse")
public interface OutboundResponseGateway {
public void sendOutboundResponse(@Payload String payload, @Header(IpHeaders.CONNECTION_ID) String connectionId);
}
解决方案:
@Artem 在下面 comments/answers 中的建议似乎可以解决问题。只是想快速说明一下我是如何在创建时向每个出站适配器添加 replyChannel
的。
我所做的是创建两个由应用程序维护的地图。每当创建新的 Inbound/Outbound 适配器组合时都会填充第一个映射,它是 ConnectionFactory
名称到 replyChannel
名称的映射。第二个映射是 ConnectionId
到 replyChannel
名称的映射,它通过 EventListener
填充在任何新的 TcpConnectionOpenEvent
上。
注意每一个TcpConnectionOpenEvent
都会有一个ConnectionFactoryName
和ConnectionId
属性根据where/how定义的连接建立。
从那里,每当收到新请求时,我都会使用这些地图和 Message
上的 'ip_connectionId' header 添加 replyChannel
header到消息。第一个响应是通过从应用程序的上下文中手动获取相应的 replyChannel
(基于 replyChannel
header 的值)并在该通道上发送响应来发送的。第二个响应是通过 Spring 集成使用 replyChannel
header 在消息上发送的,正如 Artem 在他的响应中所描述的那样。
此解决方案是作为概念的快速证明而实施的,并且仅适用于我当前的实施。包括这一点,希望能快速启动其他观众自己的 implementations/solutions.
好吧,我现在明白你关于 round-robin
的观点了。您针对相同的通道创建许多相似的 TCP 通道适配器。在这种情况下,确实很难将一个流与另一个流区分开来,因为您对这些频道及其订阅者有一点控制权。
其中一个解决方案将与 Spring 集成 Java DSL 及其动态流程相得益彰:https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-runtime-flows
因此,您将只专注于流,而不必担心运行时注册。但是由于您不在那里并且您只处理普通的 Java 和注释配置,因此您要实现目标要困难得多。但还是...
您可能知道 replyChannel
header 之类的东西。当我们没有配置 outputChannel
时,它会被考虑在内。这样,您就可以为每个流拥有一个独立的通道,并且所有流的配置都将完全相同。
所以,
- 我会为每个
configureAdapterCombination()
电话创建一个新频道。
- 将这个传播到
replyChannel.subscribe(outboundAdapter);
的那个方法
- 在您的特定流程的开头使用此渠道将其填充到
replyChannel
header.
这样你的 processQuery()
service-activator 应该没有 outputChannel
。它将从 replyChannel
header 中选择,以获得适当的出站通道适配器关联。
对于这种情况,您不需要 @MessagingGateway
,因为我们不再有固定的 defaultRequestChannel
。在 sendFirstResponse()
服务方法中,您只需使用 replyChannel
header 并手动发送新创建的消息。从技术上讲,这与您尝试使用提到的 @MessagingGateway
.
所做的完全相同
对于 Java DSL 变体,我会在 PublishSubscribeChannel
上使用 filter
来丢弃那些不属于当前流的消息。无论如何,这是一个不同的故事。
尝试找出在配置特定 configureAdapterCombination()
.
时如何让每个流都有一个回复通道
我目前有一个 Spring 集成应用程序,它利用许多 TCP 入站和出站适配器组合来处理消息。所有这些适配器组合都使用相同的单个 MessageEndpoint 进行请求处理,并使用相同的 MessagingGateway
进行响应发送。
MessageEndpoint 的最终输出通道是一个 DirectChannel
,它也是 MessageGateway 的 DefaultRequestChannel。此 DirectChannel
使用默认值 RoundRobinLoadBalancingStrategy
进行循环搜索以寻找正确的出站适配器以发送给定的响应。当然,这种循环搜索并不总能在第一次搜索时找到合适的出站适配器,如果没有找到相应的日志。这不仅会产生大量不需要的日志记录,还会引发一些性能问题,因为我预计在任何给定时间都存在数百个 inbound/outbound 适配器组合。
我想知道是否有一种方法可以使入站和出站适配器更紧密地关联起来,从而不需要循环处理并且每个响应都可以直接发送到相应的出站适配器?理想情况下,我希望以一种可以保持使用单个 MessageEndpoint
和单个 MessageGateway
的方式来实现。
注意: 请将解决方案限制为使用 Inbound/Outbound 适配器组合的解决方案。我的实现无法使用 TcpInbound/TcpOutboundGateways,因为我需要向单个请求发送多个响应,据我所知,这只能通过使用 inbound/outbound 适配器来完成。
为了清楚起见,下面是描述的当前实施的精简版。我试图清除所有不相关的代码只是为了让事情更容易阅读...
// Inbound/Outbound Adapter creation (part of a service that is used to dynamically create varying number of inbound/outbound adapter combinations)
public void configureAdapterCombination(int port) {
TcpNioServerConnectionFactory connectionFactory = new TcpNioServerConnectionFactory(port);
// Connection Factory registered with Application Context bean factory (removed for readability)...
TcpReceivingChannelAdapter inboundAdapter = new TcpReceivingChannelAdapter();
inboundAdapter.setConnectionFactory(connectionFactory);
inboundAdapter.setOutputChannel(context.getBean("sendFirstResponse", DirectChannel.class));
// Inbound Adapter registered with Application Context bean factory (removed for readability)...
TcpSendingMessageHandler outboundAdapter = new TcpSendingMessageHandler();
outboundAdapter.setConnectionFactory(connectionFactory);
// Outbound Adapter registered with Application Context bean factory (removed for readability)...
context.getBean("outboundResponse", DirectChannel.class).subscribe(outboundAdapter);
}
// Message Endpoint for processing requests
@MessageEndpoint
public class RequestProcessor {
@Autowired
private OutboundResponseGateway outboundResponseGateway;
// Direct Channel which is using Round Robin lookup
@Bean
public DirectChannel outboundResponse() {
return new DirectChannel();
}
// Removed additional, unrelated, endpoints for readability...
@ServiceActivator(inputChannel="sendFirstResponse", outputChannel="sendSecondResponse")
public Message<String> sendFirstResponse(Message<String> message) {
// Unrelated message processing/response generation excluded...
outboundResponseGateway.sendOutboundResponse("First Response", message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class));
return message;
}
// Service Activator that puts second response on the request channel of the Message Gateway
@ServiceActivator(inputChannel = "sendSecondResponse", outputChannel="outboundResponse")
public Message<String> processQuery(Message<String> message) {
// Unrelated message processing/response generation excluded...
return MessageBuilder.withPayload("Second Response").copyHeaders(message.getHeaders()).build();
}
}
// Messaging Gateway for sending responses
@MessagingGateway(defaultRequestChannel="outboundResponse")
public interface OutboundResponseGateway {
public void sendOutboundResponse(@Payload String payload, @Header(IpHeaders.CONNECTION_ID) String connectionId);
}
解决方案:
@Artem 在下面 comments/answers 中的建议似乎可以解决问题。只是想快速说明一下我是如何在创建时向每个出站适配器添加 replyChannel
的。
我所做的是创建两个由应用程序维护的地图。每当创建新的 Inbound/Outbound 适配器组合时都会填充第一个映射,它是 ConnectionFactory
名称到 replyChannel
名称的映射。第二个映射是 ConnectionId
到 replyChannel
名称的映射,它通过 EventListener
填充在任何新的 TcpConnectionOpenEvent
上。
注意每一个TcpConnectionOpenEvent
都会有一个ConnectionFactoryName
和ConnectionId
属性根据where/how定义的连接建立。
从那里,每当收到新请求时,我都会使用这些地图和 Message
上的 'ip_connectionId' header 添加 replyChannel
header到消息。第一个响应是通过从应用程序的上下文中手动获取相应的 replyChannel
(基于 replyChannel
header 的值)并在该通道上发送响应来发送的。第二个响应是通过 Spring 集成使用 replyChannel
header 在消息上发送的,正如 Artem 在他的响应中所描述的那样。
此解决方案是作为概念的快速证明而实施的,并且仅适用于我当前的实施。包括这一点,希望能快速启动其他观众自己的 implementations/solutions.
好吧,我现在明白你关于 round-robin
的观点了。您针对相同的通道创建许多相似的 TCP 通道适配器。在这种情况下,确实很难将一个流与另一个流区分开来,因为您对这些频道及其订阅者有一点控制权。
其中一个解决方案将与 Spring 集成 Java DSL 及其动态流程相得益彰:https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-runtime-flows
因此,您将只专注于流,而不必担心运行时注册。但是由于您不在那里并且您只处理普通的 Java 和注释配置,因此您要实现目标要困难得多。但还是...
您可能知道 replyChannel
header 之类的东西。当我们没有配置 outputChannel
时,它会被考虑在内。这样,您就可以为每个流拥有一个独立的通道,并且所有流的配置都将完全相同。
所以,
- 我会为每个
configureAdapterCombination()
电话创建一个新频道。 - 将这个传播到
replyChannel.subscribe(outboundAdapter);
的那个方法
- 在您的特定流程的开头使用此渠道将其填充到
replyChannel
header.
这样你的 processQuery()
service-activator 应该没有 outputChannel
。它将从 replyChannel
header 中选择,以获得适当的出站通道适配器关联。
对于这种情况,您不需要 @MessagingGateway
,因为我们不再有固定的 defaultRequestChannel
。在 sendFirstResponse()
服务方法中,您只需使用 replyChannel
header 并手动发送新创建的消息。从技术上讲,这与您尝试使用提到的 @MessagingGateway
.
对于 Java DSL 变体,我会在 PublishSubscribeChannel
上使用 filter
来丢弃那些不属于当前流的消息。无论如何,这是一个不同的故事。
尝试找出在配置特定 configureAdapterCombination()
.