Spring集成Tcp连接单用VS setSoKeepAlive
Spring Integration Tcp connection single-use VS setSoKeepAlive
我正在使用
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
对于 TcpOutboundGateway,TcpOutboundGateway 通常按 req/reply 命令工作,但在我的例子中,我扩展了 TcpOutboundGateway 以使用 MessageChannel 接收任意消息。这就是为什么我认为我应该使用
cf.setLeaveOpen(true)
保持连接打开。
虽然我开始使用那个选项,但在我再次调用 tcp 服务器很长时间后,我收到了
异常如
org.springframework.integration.MessageTimeoutException: Timed out waiting for response
但我不明白为什么会出现此错误,因为我设置了“true”以在我的连接工厂中保持连接打开。
然后
我做了一些 google 它应该使用 CachingClientConnectionFactory,我知道默认情况下它是 single-use=true 并且不应该将其更改为 false,但我假设连接将打开并关闭我的每个请求响应事务,那么在没有来自客户端的任何请求的情况下从服务器接收任意数据是否有障碍?
或
我应该如何保持客户端和服务器之间的开放连接?我应该使用
cf.setSoKeepAlive(true) ?
保持连接打开?
是
cf.setSoKeepAlive(true) and cf.setLeaveOpen(true)
彼此相同?
EDIT
另外,当我使用 cf.setSoKeepAlive(true) 时,1 小时后我也遇到了同样的异常。
完整代码:
private MessageChannel createNewSubflow(Message<?> message) {
String host = (String) message.getHeaders().get("host");
Integer port = (Integer) message.getHeaders().get("port");
boolean hasThisConnectionIrregularChannel = message.getHeaders().containsKey("irregularMessageChannelName");
Assert.state(host != null && port != null, "host and/or port header missing");
String flowRegisterKey;
if (hasThisConnectionIrregularChannel) {
flowRegisterKey = host + port + ".extended";
} else {
flowRegisterKey = host + port;
}
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
CachingClientConnectionFactory ccf = new CachingClientConnectionFactory(cf, 20);
ccf.setSoKeepAlive(true);
ByteArrayCrLfSerializer byteArrayCrLfSerializer = new ByteArrayCrLfSerializer();
byteArrayCrLfSerializer.setMaxMessageSize(1048576);
ccf.setSerializer(byteArrayCrLfSerializer);
ccf.setDeserializer(byteArrayCrLfSerializer);
TcpOutboundGateway tcpOutboundGateway;
if (hasThisConnectionIrregularChannel) {
String unsolicitedMessageChannelName = (String) message.getHeaders().get("irregularMessageChannelName");
DirectChannel directChannel = getBeanFactory().getBean(unsolicitedMessageChannelName, DirectChannel.class);
tcpOutboundGateway = new ExtendedTcpOutboundGateway(directChannel);
} else {
tcpOutboundGateway = new TcpOutboundGateway();
}
tcpOutboundGateway.setRemoteTimeout(20000);
tcpOutboundGateway.setConnectionFactory(ccf);
IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(ccf)
.id(flowRegisterKey + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(flowRegisterKey, inputChannel);
return inputChannel;
}
为什么要使用 CachingClientConnectionFactory
?当您保持连接打开时不需要它;它旨在用于维护多个打开的连接。
Timed out waiting for response
表示发送请求时套接字打开得很好(从客户端的角度来看);我们只是没有得到回复。这可能意味着某些网络组件(路由器)由于不活动而静默关闭了套接字。 Keep-alives 应该对此有所帮助,但这取决于您的操作系统以及 TCP 堆栈配置为发送 keep-alives 的频率。
我正在使用
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
对于 TcpOutboundGateway,TcpOutboundGateway 通常按 req/reply 命令工作,但在我的例子中,我扩展了 TcpOutboundGateway 以使用 MessageChannel 接收任意消息。这就是为什么我认为我应该使用
cf.setLeaveOpen(true)
保持连接打开。
虽然我开始使用那个选项,但在我再次调用 tcp 服务器很长时间后,我收到了 异常如
org.springframework.integration.MessageTimeoutException: Timed out waiting for response
但我不明白为什么会出现此错误,因为我设置了“true”以在我的连接工厂中保持连接打开。
然后
我做了一些 google 它应该使用 CachingClientConnectionFactory,我知道默认情况下它是 single-use=true 并且不应该将其更改为 false,但我假设连接将打开并关闭我的每个请求响应事务,那么在没有来自客户端的任何请求的情况下从服务器接收任意数据是否有障碍?
或
我应该如何保持客户端和服务器之间的开放连接?我应该使用
cf.setSoKeepAlive(true) ?
保持连接打开? 是
cf.setSoKeepAlive(true) and cf.setLeaveOpen(true)
彼此相同?
EDIT
另外,当我使用 cf.setSoKeepAlive(true) 时,1 小时后我也遇到了同样的异常。
完整代码:
private MessageChannel createNewSubflow(Message<?> message) {
String host = (String) message.getHeaders().get("host");
Integer port = (Integer) message.getHeaders().get("port");
boolean hasThisConnectionIrregularChannel = message.getHeaders().containsKey("irregularMessageChannelName");
Assert.state(host != null && port != null, "host and/or port header missing");
String flowRegisterKey;
if (hasThisConnectionIrregularChannel) {
flowRegisterKey = host + port + ".extended";
} else {
flowRegisterKey = host + port;
}
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
CachingClientConnectionFactory ccf = new CachingClientConnectionFactory(cf, 20);
ccf.setSoKeepAlive(true);
ByteArrayCrLfSerializer byteArrayCrLfSerializer = new ByteArrayCrLfSerializer();
byteArrayCrLfSerializer.setMaxMessageSize(1048576);
ccf.setSerializer(byteArrayCrLfSerializer);
ccf.setDeserializer(byteArrayCrLfSerializer);
TcpOutboundGateway tcpOutboundGateway;
if (hasThisConnectionIrregularChannel) {
String unsolicitedMessageChannelName = (String) message.getHeaders().get("irregularMessageChannelName");
DirectChannel directChannel = getBeanFactory().getBean(unsolicitedMessageChannelName, DirectChannel.class);
tcpOutboundGateway = new ExtendedTcpOutboundGateway(directChannel);
} else {
tcpOutboundGateway = new TcpOutboundGateway();
}
tcpOutboundGateway.setRemoteTimeout(20000);
tcpOutboundGateway.setConnectionFactory(ccf);
IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(ccf)
.id(flowRegisterKey + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(flowRegisterKey, inputChannel);
return inputChannel;
}
为什么要使用 CachingClientConnectionFactory
?当您保持连接打开时不需要它;它旨在用于维护多个打开的连接。
Timed out waiting for response
表示发送请求时套接字打开得很好(从客户端的角度来看);我们只是没有得到回复。这可能意味着某些网络组件(路由器)由于不活动而静默关闭了套接字。 Keep-alives 应该对此有所帮助,但这取决于您的操作系统以及 TCP 堆栈配置为发送 keep-alives 的频率。