Spring Integration:TCP 重新连接时发送握手
Spring integration on reconnect TCP send handshake
我正在尝试使用 TcpSendingMessageHandler 上的 ClientMode 来自动重新连接。此连接需要保持打开状态,因此实现重新连接功能很重要。
/**
 * Builds and registers a dedicated TCP client flow for the host/port named in the
 * message headers, caches its input channel in {@code subFlows}, and returns it.
 *
 * <p>The connection factory and the interceptor factory are both handed to the flow
 * registration via {@code addBean(...)} so that they are initialised as beans. Without
 * registering the interceptor factory, the interceptor logs
 * "No publisher available to publish TcpConnectionOpenEvent" when the socket is
 * re-established.
 *
 * @param message message carrying a {@code "host"} (String) and a {@code "port"}
 *                (Integer) header that identify the remote endpoint
 * @return the input channel of the newly registered flow
 * @throws IllegalStateException if the host and/or port header is missing
 */
private MessageChannel createNewSubflow(Message<?> message) {
    String host = (String) message.getHeaders().get("host");
    Integer port = (Integer) message.getHeaders().get("port");
    Assert.state(host != null && port != null, "host and/or port header missing");
    // Used both as the key in subFlows and as the prefix of the flow id.
    String hostPort = host + port;

    // Connection factory: keep the socket open and decode with the custom deserializer.
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
    cf.setLeaveOpen(true);
    cf.setDeserializer(new CustomSerializerDeserializer());

    // Interceptor chain with a single factory.
    HelloWorldInterceptorFactory factory = new HelloWorldInterceptorFactory();
    TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain();
    fc.setInterceptors(new TcpConnectionInterceptorFactory[] { factory });
    cf.setInterceptorFactoryChain(fc);

    // Inbound side: shares the connection factory with the sending handler below.
    // NOTE(review): this adapter is configured but not registered with the flow in
    // this method - confirm whether it also needs addBean(...) to be initialised.
    TcpReceivingChannelAdapter messageHandler = new TcpReceivingChannelAdapter();
    messageHandler.setErrorChannel(errorChannel);
    messageHandler.setConnectionFactory(cf);
    messageHandler.setOutputChannel(outputChannel);

    // Outbound side: client mode with a retry interval of 5000.
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    handler.setConnectionFactory(cf);
    handler.setClientMode(true);
    handler.setRetryInterval(5000);

    IntegrationFlow flow = f -> f.handle(handler);
    IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
            this.flowContext
                    .registration(flow)
                    .addBean(cf)
                    // Fix: register the interceptor factory too, so it is initialised as a bean.
                    .addBean(factory)
                    .id(hostPort + ".flow")
                    .register();

    MessageChannel inputChannel = flowRegistration.getInputChannel();
    this.subFlows.put(hostPort, inputChannel);
    return inputChannel;
}
不幸的是,我需要在连接套接字打开后发送一次握手。
我尝试按照另一篇帖子中的描述添加一个消息拦截器,但是当套接字连接恢复时,并没有进入该拦截器。
我在恢复套接字时收到此警告:
HelloWorldInterceptor : No publisher available to publish TcpConnectionOpenEvent [source=HelloWorldInterceptor
拦截器发出警告,因为工厂未正确初始化。
您还需要通过 .addBean() 把拦截器工厂也注册为 bean。
编辑
如评论中所述,拦截器的替代方法是使用事件侦听器。
我刚刚用这个测试过,没发现问题:
/**
 * Spring Boot application that declares a single outbound TCP flow to
 * {@code localhost:1234}.
 */
@SpringBootApplication
public class So65899947Application {

    /** Starts the Spring Boot application. */
    public static void main(String[] args) {
        SpringApplication.run(So65899947Application.class, args);
    }

    /**
     * Defines a flow whose only step is a TCP outbound adapter on a net client
     * connection to {@code localhost:1234}, with client mode enabled and a retry
     * interval of 5000.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow flow() {
        return definition -> {
            definition.handle(
                    Tcp.outboundAdapter(Tcp.netClient("localhost", 1234))
                            .clientMode(true)
                            .retryInterval(5000));
        };
    }
}
/**
 * Sends a greeting into the flow's input channel each time a
 * {@link TcpConnectionOpenEvent} is received.
 */
@Component
class InitialSender {

    private final IntegrationFlow flow;

    InitialSender(IntegrationFlow flow) {
        this.flow = flow;
    }

    /**
     * Builds a "Hello there" message whose connection-id header is taken from the
     * event and sends it to the flow's input channel.
     *
     * @param event the connection-open event supplying the connection id
     */
    @EventListener
    public void opened(TcpConnectionOpenEvent event) {
        String connectionId = event.getConnectionId();
        java.util.Map<String, Object> headers =
                Collections.singletonMap(IpHeaders.CONNECTION_ID, connectionId);
        GenericMessage<String> handshake = new GenericMessage<>("Hello there", headers);
        this.flow.getInputChannel().send(handshake);
    }
}
$ nc -l 1234
Hello there
^C
$ nc -l 1234
Hello there
^C
$ nc -l 1234
Hello there
^C
$ nc -l 1234
Hello there
^C
$ nc -l 1234
Hello there
^C
EDIT2
当我将其更改为使用网关时,结果相同...
/**
 * Spring Boot application (gateway variant) that declares a single outbound TCP
 * flow to {@code localhost:1234}; {@code @IntegrationComponentScan} is present so
 * that messaging gateway interfaces are picked up.
 */
@SpringBootApplication
@IntegrationComponentScan
public class So65899947Application {

    /** Starts the Spring Boot application. */
    public static void main(String[] args) {
        SpringApplication.run(So65899947Application.class, args);
    }

    /**
     * Defines a flow whose only step is a TCP outbound adapter on a net client
     * connection to {@code localhost:1234}, with client mode enabled and a retry
     * interval of 5000.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow flow() {
        return definition -> {
            definition.handle(
                    Tcp.outboundAdapter(Tcp.netClient("localhost", 1234))
                            .clientMode(true)
                            .retryInterval(5000));
        };
    }
}
/**
 * Messaging gateway whose default request channel is {@code "flow.input"}.
 */
@MessagingGateway(defaultRequestChannel = "flow.input")
interface Gate {
/**
 * Sends a handshake message through the gateway.
 *
 * @param message the message payload
 * @param conn    value placed in the {@code IpHeaders.CONNECTION_ID} header
 */
void handShake(@Payload String message, @Header(IpHeaders.CONNECTION_ID) String conn);
}
/**
 * Sends a greeting through the {@link Gate} gateway each time a
 * {@link TcpConnectionOpenEvent} is received.
 */
@Component
class InitialSender {

    private final Gate gate;

    InitialSender(Gate gate) {
        this.gate = gate;
    }

    /**
     * Passes "Hello there" and the event's connection id to the gateway.
     *
     * @param event the connection-open event supplying the connection id
     */
    @EventListener
    public void opened(TcpConnectionOpenEvent event) {
        String connectionId = event.getConnectionId();
        this.gate.handShake("Hello there", connectionId);
    }
}
我正在尝试使用 TcpSendingMessageHandler 上的 ClientMode 来自动重新连接。此连接需要保持打开状态,因此实现重新连接功能很重要。
/**
 * Builds and registers a dedicated TCP client flow for the host/port named in the
 * message headers, caches its input channel in {@code subFlows}, and returns it.
 *
 * <p>The connection factory and the interceptor factory are both handed to the flow
 * registration via {@code addBean(...)} so that they are initialised as beans. Without
 * registering the interceptor factory, the interceptor logs
 * "No publisher available to publish TcpConnectionOpenEvent" when the socket is
 * re-established.
 *
 * @param message message carrying a {@code "host"} (String) and a {@code "port"}
 *                (Integer) header that identify the remote endpoint
 * @return the input channel of the newly registered flow
 * @throws IllegalStateException if the host and/or port header is missing
 */
private MessageChannel createNewSubflow(Message<?> message) {
    String host = (String) message.getHeaders().get("host");
    Integer port = (Integer) message.getHeaders().get("port");
    Assert.state(host != null && port != null, "host and/or port header missing");
    // Used both as the key in subFlows and as the prefix of the flow id.
    String hostPort = host + port;

    // Connection factory: keep the socket open and decode with the custom deserializer.
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
    cf.setLeaveOpen(true);
    cf.setDeserializer(new CustomSerializerDeserializer());

    // Interceptor chain with a single factory.
    HelloWorldInterceptorFactory factory = new HelloWorldInterceptorFactory();
    TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain();
    fc.setInterceptors(new TcpConnectionInterceptorFactory[] { factory });
    cf.setInterceptorFactoryChain(fc);

    // Inbound side: shares the connection factory with the sending handler below.
    // NOTE(review): this adapter is configured but not registered with the flow in
    // this method - confirm whether it also needs addBean(...) to be initialised.
    TcpReceivingChannelAdapter messageHandler = new TcpReceivingChannelAdapter();
    messageHandler.setErrorChannel(errorChannel);
    messageHandler.setConnectionFactory(cf);
    messageHandler.setOutputChannel(outputChannel);

    // Outbound side: client mode with a retry interval of 5000.
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    handler.setConnectionFactory(cf);
    handler.setClientMode(true);
    handler.setRetryInterval(5000);

    IntegrationFlow flow = f -> f.handle(handler);
    IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
            this.flowContext
                    .registration(flow)
                    .addBean(cf)
                    // Fix: register the interceptor factory too, so it is initialised as a bean.
                    .addBean(factory)
                    .id(hostPort + ".flow")
                    .register();

    MessageChannel inputChannel = flowRegistration.getInputChannel();
    this.subFlows.put(hostPort, inputChannel);
    return inputChannel;
}
不幸的是,我需要在连接套接字打开后发送一次握手。
我尝试按照另一篇帖子中的描述添加一个消息拦截器,但是当套接字连接恢复时,并没有进入该拦截器。
我在恢复套接字时收到此警告:
HelloWorldInterceptor : No publisher available to publish TcpConnectionOpenEvent [source=HelloWorldInterceptor
拦截器发出警告,因为工厂未正确初始化。
您还需要通过 .addBean() 把拦截器工厂也注册为 bean。
编辑
如评论中所述,拦截器的替代方法是使用事件侦听器。
我刚刚用这个测试过,没发现问题:
/**
 * Spring Boot application that declares a single outbound TCP flow to
 * {@code localhost:1234}.
 */
@SpringBootApplication
public class So65899947Application {

    /** Starts the Spring Boot application. */
    public static void main(String[] args) {
        SpringApplication.run(So65899947Application.class, args);
    }

    /**
     * Defines a flow whose only step is a TCP outbound adapter on a net client
     * connection to {@code localhost:1234}, with client mode enabled and a retry
     * interval of 5000.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow flow() {
        return definition -> {
            definition.handle(
                    Tcp.outboundAdapter(Tcp.netClient("localhost", 1234))
                            .clientMode(true)
                            .retryInterval(5000));
        };
    }
}
/**
 * Sends a greeting into the flow's input channel each time a
 * {@link TcpConnectionOpenEvent} is received.
 */
@Component
class InitialSender {

    private final IntegrationFlow flow;

    InitialSender(IntegrationFlow flow) {
        this.flow = flow;
    }

    /**
     * Builds a "Hello there" message whose connection-id header is taken from the
     * event and sends it to the flow's input channel.
     *
     * @param event the connection-open event supplying the connection id
     */
    @EventListener
    public void opened(TcpConnectionOpenEvent event) {
        String connectionId = event.getConnectionId();
        java.util.Map<String, Object> headers =
                Collections.singletonMap(IpHeaders.CONNECTION_ID, connectionId);
        GenericMessage<String> handshake = new GenericMessage<>("Hello there", headers);
        this.flow.getInputChannel().send(handshake);
    }
}
$ nc -l 1234
Hello there
^C
$ nc -l 1234
Hello there
^C
$ nc -l 1234
Hello there
^C
$ nc -l 1234
Hello there
^C
$ nc -l 1234
Hello there
^C
EDIT2
当我将其更改为使用网关时,结果相同...
/**
 * Spring Boot application (gateway variant) that declares a single outbound TCP
 * flow to {@code localhost:1234}; {@code @IntegrationComponentScan} is present so
 * that messaging gateway interfaces are picked up.
 */
@SpringBootApplication
@IntegrationComponentScan
public class So65899947Application {

    /** Starts the Spring Boot application. */
    public static void main(String[] args) {
        SpringApplication.run(So65899947Application.class, args);
    }

    /**
     * Defines a flow whose only step is a TCP outbound adapter on a net client
     * connection to {@code localhost:1234}, with client mode enabled and a retry
     * interval of 5000.
     *
     * @return the flow definition
     */
    @Bean
    public IntegrationFlow flow() {
        return definition -> {
            definition.handle(
                    Tcp.outboundAdapter(Tcp.netClient("localhost", 1234))
                            .clientMode(true)
                            .retryInterval(5000));
        };
    }
}
/**
 * Messaging gateway whose default request channel is {@code "flow.input"}.
 */
@MessagingGateway(defaultRequestChannel = "flow.input")
interface Gate {
/**
 * Sends a handshake message through the gateway.
 *
 * @param message the message payload
 * @param conn    value placed in the {@code IpHeaders.CONNECTION_ID} header
 */
void handShake(@Payload String message, @Header(IpHeaders.CONNECTION_ID) String conn);
}
/**
 * Sends a greeting through the {@link Gate} gateway each time a
 * {@link TcpConnectionOpenEvent} is received.
 */
@Component
class InitialSender {

    private final Gate gate;

    InitialSender(Gate gate) {
        this.gate = gate;
    }

    /**
     * Passes "Hello there" and the event's connection id to the gateway.
     *
     * @param event the connection-open event supplying the connection id
     */
    @EventListener
    public void opened(TcpConnectionOpenEvent event) {
        String connectionId = event.getConnectionId();
        this.gate.handShake("Hello there", connectionId);
    }
}