Spring Integration:如果服务器关闭了连接该怎么办?如何处理该情况并在接收任意事件时重新打开连接
Spring Integration: what if the server closes the connection? How to handle it and reopen the connection while receiving arbitrary events
我正在使用动态 tcp 路由的基本代码并在 Gary Russell 的帮助下扩展它,
Sub-flow code: here I can receive arbitrary messages by extending TcpOutboundGateway. If a message arrives unsolicited, i.e. without a preceding request from the client, it is handled by the direct message channel of the ExtendedTcpOutboundGateway.
/**
 * Builds and registers a dedicated TCP sub-flow for the {@code host}/{@code port} headers of
 * the given message and caches its input channel in {@code subFlows}.
 *
 * <p>When the message carries an {@code irregularMessageChannelName} header, an
 * {@code ExtendedTcpOutboundGateway} bound to that channel bean is used and the registry key
 * gets the {@code .extended} suffix; otherwise a plain {@code TcpOutboundGateway} is used.
 *
 * @param message message whose headers select the target server
 * @return the input channel of the newly registered sub-flow
 * @throws IllegalStateException if the {@code host} or {@code port} header is missing
 */
private MessageChannel createNewSubflow(Message<?> message) {
    String host = (String) message.getHeaders().get("host");
    Integer port = (Integer) message.getHeaders().get("port");
    boolean hasThisConnectionIrregularChannel =
            message.getHeaders().containsKey("irregularMessageChannelName");
    Assert.state(host != null && port != null, "host and/or port header missing");

    // Key format must stay in sync with the lookup performed by determineTargetChannels.
    String flowRegisterKey = hasThisConnectionIrregularChannel
            ? host + port + ".extended"
            : host + port;

    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
    cf.setSoTimeout(0);
    cf.setSoKeepAlive(true);
    // One instance serves as both serializer and deserializer (CRLF-delimited frames).
    ByteArrayCrLfSerializer byteArrayCrLfSerializer = new ByteArrayCrLfSerializer();
    byteArrayCrLfSerializer.setMaxMessageSize(1048576);
    cf.setSerializer(byteArrayCrLfSerializer);
    cf.setDeserializer(byteArrayCrLfSerializer);

    TcpOutboundGateway tcpOutboundGateway;
    if (hasThisConnectionIrregularChannel) {
        log.info("TcpRouter # createNewSubflow extended TcpOutboundGateway will be created");
        String unsolicitedMessageChannelName =
                (String) message.getHeaders().get("irregularMessageChannelName");
        DirectChannel directChannel =
                getBeanFactory().getBean(unsolicitedMessageChannelName, DirectChannel.class);
        tcpOutboundGateway = new ExtendedTcpOutboundGateway(directChannel);
    } else {
        // Fixed: this branch builds the plain gateway, so it must not be logged as "extended".
        log.info("TcpRouter # createNewSubflow TcpOutboundGateway will be created");
        tcpOutboundGateway = new TcpOutboundGateway();
    }
    tcpOutboundGateway.setConnectionFactory(cf);
    tcpOutboundGateway.setAdviceChain(Arrays.<Advice>asList(tcpRetryAdvice()));

    IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);
    // The connection factory is registered under an explicit, predictable bean name so that
    // the connectionFactoryName of a TcpConnectionClosedEvent can be mapped back to this
    // sub-flow when the server closes the connection.
    IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
            this.flowContext.registration(flow)
                    .addBean(flowRegisterKey + ".cf", cf)
                    .id(flowRegisterKey + ".flow")
                    .register();
    MessageChannel inputChannel = flowRegistration.getInputChannel();
    this.subFlows.put(flowRegisterKey, inputChannel);
    return inputChannel;
}
Here I configure the retry advice so that the connection is set up again.
/**
 * Creates the retry advice attached to the TCP outbound gateways.
 *
 * <p>The retry template uses a {@code SimpleRetryPolicy} with max attempts 2 and an
 * {@code ExponentialBackOffPolicy} (initial interval 100, multiplier 2, max interval 1000).
 * The recovery callback is an {@code ErrorMessageSendingRecoverer} bound to
 * {@link #failMessageChannel()}.
 *
 * @return the configured retry advice
 */
@Bean
public RequestHandlerRetryAdvice tcpRetryAdvice() {
    ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
    backOff.setInitialInterval(100);
    backOff.setMultiplier(2);
    backOff.setMaxInterval(1000);

    SimpleRetryPolicy attempts = new SimpleRetryPolicy();
    attempts.setMaxAttempts(2);

    RetryTemplate template = new RetryTemplate();
    template.setBackOffPolicy(backOff);
    template.setRetryPolicy(attempts);

    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    // The recoverer gives failures a channel to be handled on.
    advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(failMessageChannel()));
    advice.setRetryTemplate(template);
    return advice;
}
/**
 * Declares the channel handed to the retry advice's recovery callback.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public MessageChannel failMessageChannel() {
    DirectChannel failureChannel = new DirectChannel();
    return failureChannel;
}
/**
 * Logs, at error level, whatever arrives on {@code failMessageChannel}.
 *
 * @param in the payload received from the channel, appended to the log line
 */
@ServiceActivator(inputChannel = "failMessageChannel")
public void messageAggregation(String in) {
    final String report = "TcpRouter # connection retry failed with message : " + in;
    log.error(report);
}
但现在我需要处理另一种情况;
如果在接收来自服务器连接的任意消息期间关闭怎么办?可以通过某种方式从服务器端关闭连接,我需要处理连接关闭事件并再次设置连接以继续接收任意事件。
我应该如何通过 spring 集成捕获该事件并重新建立连接?
EDIT 1
我用谷歌搜索了一些相同的问题,并从这个 link EventListener topic 中找到了 EventListener,但据我所知 link 只有一个连接,所以在我的情况下我是客户端并使用动态 tcp 路由,这就是为什么我需要实现逻辑来处理不同的连接。
EDIT 2
这里是确定目标频道
/**
 * Resolves the sub-flow input channel for a message, creating the sub-flow on first use.
 *
 * <p>The lookup key is the {@code host} header followed by the {@code port} header, with a
 * {@code .extended} suffix when the {@code irregularMessageChannelName} header is present.
 *
 * @param message the message being routed
 * @return a single-element collection holding the cached or newly created channel
 */
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
    String routeKey = message.getHeaders().get("host", String.class)
            + message.getHeaders().get("port");
    if (message.getHeaders().containsKey("irregularMessageChannelName")) {
        routeKey = routeKey + ".extended";
    }

    MessageChannel cached = this.subFlows.get(routeKey);
    // Nothing registered yet for this key: build the sub-flow now.
    MessageChannel target = (cached != null) ? cached : createNewSubflow(message);
    return Collections.singletonList(target);
}
这里是ToTCP接口
/**
 * Messaging gateway whose requests go to the {@code toTcp.input} channel.
 *
 * <p>Each argument annotated with {@code @Header} is passed as the message header of the
 * same name; {@code data} is the remaining, un-annotated argument.
 */
@Component
@MessagingGateway(defaultRequestChannel = "toTcp.input")
public interface ToTCP {

    /**
     * Sends {@code data} with an additional {@code irregularMessageChannelName} header.
     *
     * @param data        the data to send
     * @param host        value of the {@code host} header
     * @param port        value of the {@code port} header
     * @param channelName value of the {@code irregularMessageChannelName} header
     * @return the reply as bytes
     */
    byte[] send(
            String data,
            @Header("host") String host,
            @Header("port") int port,
            @Header("irregularMessageChannelName") String channelName);

    /**
     * Sends {@code data} with only the {@code host} and {@code port} headers.
     *
     * @param data the data to send
     * @param host value of the {@code host} header
     * @param port value of the {@code port} header
     * @return the reply as bytes
     */
    byte[] send(
            String data,
            @Header("host") String host,
            @Header("port") int port);
}
.addBean(cf)
如果您使用 .addBean("someBeanName", cf),您可以检查 TcpConnectionClosedEvent 的 connectionFactoryName(这将是您在 addBean() 中给 cf 的 ID)。
然后您可以将新消息路由到具有该工厂的流程以重新建立连接。
我正在使用动态 tcp 路由的基本代码并在 Gary Russell 的帮助下扩展它,
Sub-flow code: here I can receive arbitrary messages by extending TcpOutboundGateway. If a message arrives unsolicited, i.e. without a preceding request from the client, it is handled by the direct message channel of the ExtendedTcpOutboundGateway.
/**
 * Builds and registers a dedicated TCP sub-flow for the {@code host}/{@code port} headers of
 * the given message and caches its input channel in {@code subFlows}.
 *
 * <p>When the message carries an {@code irregularMessageChannelName} header, an
 * {@code ExtendedTcpOutboundGateway} bound to that channel bean is used and the registry key
 * gets the {@code .extended} suffix; otherwise a plain {@code TcpOutboundGateway} is used.
 *
 * @param message message whose headers select the target server
 * @return the input channel of the newly registered sub-flow
 * @throws IllegalStateException if the {@code host} or {@code port} header is missing
 */
private MessageChannel createNewSubflow(Message<?> message) {
    String host = (String) message.getHeaders().get("host");
    Integer port = (Integer) message.getHeaders().get("port");
    boolean hasThisConnectionIrregularChannel =
            message.getHeaders().containsKey("irregularMessageChannelName");
    Assert.state(host != null && port != null, "host and/or port header missing");

    // Key format must stay in sync with the lookup performed by determineTargetChannels.
    String flowRegisterKey = hasThisConnectionIrregularChannel
            ? host + port + ".extended"
            : host + port;

    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
    cf.setSoTimeout(0);
    cf.setSoKeepAlive(true);
    // One instance serves as both serializer and deserializer (CRLF-delimited frames).
    ByteArrayCrLfSerializer byteArrayCrLfSerializer = new ByteArrayCrLfSerializer();
    byteArrayCrLfSerializer.setMaxMessageSize(1048576);
    cf.setSerializer(byteArrayCrLfSerializer);
    cf.setDeserializer(byteArrayCrLfSerializer);

    TcpOutboundGateway tcpOutboundGateway;
    if (hasThisConnectionIrregularChannel) {
        log.info("TcpRouter # createNewSubflow extended TcpOutboundGateway will be created");
        String unsolicitedMessageChannelName =
                (String) message.getHeaders().get("irregularMessageChannelName");
        DirectChannel directChannel =
                getBeanFactory().getBean(unsolicitedMessageChannelName, DirectChannel.class);
        tcpOutboundGateway = new ExtendedTcpOutboundGateway(directChannel);
    } else {
        // Fixed: this branch builds the plain gateway, so it must not be logged as "extended".
        log.info("TcpRouter # createNewSubflow TcpOutboundGateway will be created");
        tcpOutboundGateway = new TcpOutboundGateway();
    }
    tcpOutboundGateway.setConnectionFactory(cf);
    tcpOutboundGateway.setAdviceChain(Arrays.<Advice>asList(tcpRetryAdvice()));

    IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);
    // The connection factory is registered under an explicit, predictable bean name so that
    // the connectionFactoryName of a TcpConnectionClosedEvent can be mapped back to this
    // sub-flow when the server closes the connection.
    IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
            this.flowContext.registration(flow)
                    .addBean(flowRegisterKey + ".cf", cf)
                    .id(flowRegisterKey + ".flow")
                    .register();
    MessageChannel inputChannel = flowRegistration.getInputChannel();
    this.subFlows.put(flowRegisterKey, inputChannel);
    return inputChannel;
}
Here I configure the retry advice so that the connection is set up again.
/**
 * Creates the retry advice attached to the TCP outbound gateways.
 *
 * <p>The retry template uses a {@code SimpleRetryPolicy} with max attempts 2 and an
 * {@code ExponentialBackOffPolicy} (initial interval 100, multiplier 2, max interval 1000).
 * The recovery callback is an {@code ErrorMessageSendingRecoverer} bound to
 * {@link #failMessageChannel()}.
 *
 * @return the configured retry advice
 */
@Bean
public RequestHandlerRetryAdvice tcpRetryAdvice() {
    ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
    backOff.setInitialInterval(100);
    backOff.setMultiplier(2);
    backOff.setMaxInterval(1000);

    SimpleRetryPolicy attempts = new SimpleRetryPolicy();
    attempts.setMaxAttempts(2);

    RetryTemplate template = new RetryTemplate();
    template.setBackOffPolicy(backOff);
    template.setRetryPolicy(attempts);

    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    // The recoverer gives failures a channel to be handled on.
    advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(failMessageChannel()));
    advice.setRetryTemplate(template);
    return advice;
}
/**
 * Declares the channel handed to the retry advice's recovery callback.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public MessageChannel failMessageChannel() {
    DirectChannel failureChannel = new DirectChannel();
    return failureChannel;
}
/**
 * Logs, at error level, whatever arrives on {@code failMessageChannel}.
 *
 * @param in the payload received from the channel, appended to the log line
 */
@ServiceActivator(inputChannel = "failMessageChannel")
public void messageAggregation(String in) {
    final String report = "TcpRouter # connection retry failed with message : " + in;
    log.error(report);
}
但现在我需要处理另一种情况;
如果在接收来自服务器连接的任意消息期间关闭怎么办?可以通过某种方式从服务器端关闭连接,我需要处理连接关闭事件并再次设置连接以继续接收任意事件。
我应该如何通过 spring 集成捕获该事件并重新建立连接?
EDIT 1
我用谷歌搜索了一些相同的问题,并从这个 link EventListener topic 中找到了 EventListener,但据我所知 link 只有一个连接,所以在我的情况下我是客户端并使用动态 tcp 路由,这就是为什么我需要实现逻辑来处理不同的连接。
EDIT 2
这里是确定目标频道
/**
 * Resolves the sub-flow input channel for a message, creating the sub-flow on first use.
 *
 * <p>The lookup key is the {@code host} header followed by the {@code port} header, with a
 * {@code .extended} suffix when the {@code irregularMessageChannelName} header is present.
 *
 * @param message the message being routed
 * @return a single-element collection holding the cached or newly created channel
 */
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
    String routeKey = message.getHeaders().get("host", String.class)
            + message.getHeaders().get("port");
    if (message.getHeaders().containsKey("irregularMessageChannelName")) {
        routeKey = routeKey + ".extended";
    }

    MessageChannel cached = this.subFlows.get(routeKey);
    // Nothing registered yet for this key: build the sub-flow now.
    MessageChannel target = (cached != null) ? cached : createNewSubflow(message);
    return Collections.singletonList(target);
}
这里是ToTCP接口
/**
 * Messaging gateway whose requests go to the {@code toTcp.input} channel.
 *
 * <p>Each argument annotated with {@code @Header} is passed as the message header of the
 * same name; {@code data} is the remaining, un-annotated argument.
 */
@Component
@MessagingGateway(defaultRequestChannel = "toTcp.input")
public interface ToTCP {

    /**
     * Sends {@code data} with an additional {@code irregularMessageChannelName} header.
     *
     * @param data        the data to send
     * @param host        value of the {@code host} header
     * @param port        value of the {@code port} header
     * @param channelName value of the {@code irregularMessageChannelName} header
     * @return the reply as bytes
     */
    byte[] send(
            String data,
            @Header("host") String host,
            @Header("port") int port,
            @Header("irregularMessageChannelName") String channelName);

    /**
     * Sends {@code data} with only the {@code host} and {@code port} headers.
     *
     * @param data the data to send
     * @param host value of the {@code host} header
     * @param port value of the {@code port} header
     * @return the reply as bytes
     */
    byte[] send(
            String data,
            @Header("host") String host,
            @Header("port") int port);
}
.addBean(cf)
如果您使用 .addBean("someBeanName", cf),您可以检查 TcpConnectionClosedEvent 的 connectionFactoryName(这将是您在 addBean() 中给 cf 的 ID)。
然后您可以将新消息路由到具有该工厂的流程以重新建立连接。