Spring 集成重新连接 TCP 发送握手

Spring integration on reconnect TCP send handshake

我正在尝试使用 TcpSendingMessageHandler 上的 ClientMode 来自动重新连接。此连接需要保持打开状态,因此实现重新连接功能很重要。

private MessageChannel createNewSubflow(Message<?> message) {
        String host = (String) message.getHeaders().get("host");
        Integer port = (Integer) message.getHeaders().get("port");
        Assert.state(host != null && port != null, "host and/or port header missing");
        String hostPort = host + port;

        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
        TcpReceivingChannelAdapter messageHandler = new TcpReceivingChannelAdapter();
        cf.setLeaveOpen(true);
        TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain();

        HelloWorldInterceptorFactory factory = new HelloWorldInterceptorFactory();
        fc.setInterceptors(new TcpConnectionInterceptorFactory[] { factory} );

        cf.setInterceptorFactoryChain(fc);
        cf.setDeserializer(new CustomSerializerDeserializer());
        messageHandler.setErrorChannel(errorChannel);
        messageHandler.setConnectionFactory(cf);
        messageHandler.setOutputChannel(outputChannel);
        TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
        handler.setConnectionFactory(cf);
        handler.setClientMode(true);
        handler.setRetryInterval(5000);
        IntegrationFlow flow = f -> f.handle(handler);
        IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
                this.flowContext
                        .registration(flow)
                        .addBean(cf)
                        .id(hostPort + ".flow")
                        .register();
        MessageChannel inputChannel = flowRegistration.getInputChannel();
        this.subFlows.put(hostPort, inputChannel);
        return inputChannel;
    }

不幸的是,我需要在连接套接字打开后发送一次握手。

我试图添加一个post中描述的消息拦截器()但是当连接套接字恢复时它没有进入拦截器。

我在恢复套接字时收到此警告:

HelloWorldInterceptor        : No publisher available to publish TcpConnectionOpenEvent [source=HelloWorldInterceptor

拦截器发出警告,因为工厂未正确初始化。

您还需要为拦截器工厂添加 .addBean()

编辑

如评论中所述,拦截器的替代方法是使用事件侦听器。

我刚刚用这个测试过,没发现问题:

@SpringBootApplication
public class So65899947Application {

    public static void main(String[] args) {
        SpringApplication.run(So65899947Application.class, args);
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f.handle(Tcp.outboundAdapter(Tcp.netClient("localhost", 1234))
                .clientMode(true)
                .retryInterval(5000));
    }

}

@Component
class InitialSender {

    private final IntegrationFlow flow;

    InitialSender(IntegrationFlow flow) {
        this.flow = flow;
    }

    @EventListener
    public void opened(TcpConnectionOpenEvent event) {
        this.flow.getInputChannel().send(new GenericMessage<>("Hello there", 
                Collections.singletonMap(IpHeaders.CONNECTION_ID, event.getConnectionId())));
    }

}
$ nc -l 1234
Hello there
^C

$ nc -l 1234
Hello there
^C

$ nc -l 1234
Hello there
^C

$ nc -l 1234
Hello there
^C

$ nc -l 1234
Hello there
^C

EDIT2

当我将其更改为使用网关时,结果相同...

@SpringBootApplication
@IntegrationComponentScan
public class So65899947Application {

    public static void main(String[] args) {
        SpringApplication.run(So65899947Application.class, args);
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f.handle(Tcp.outboundAdapter(Tcp.netClient("localhost", 1234))
                .clientMode(true)
                .retryInterval(5000));
    }

}

@MessagingGateway(defaultRequestChannel = "flow.input")
interface Gate {

    void handShake(@Payload String message, @Header(IpHeaders.CONNECTION_ID) String conn);

}

@Component
class InitialSender {

    private final Gate gate;

    InitialSender(Gate gate) {
        this.gate = gate;
    }

    @EventListener
    public void opened(TcpConnectionOpenEvent event) {
        this.gate.handShake("Hello there", event.getConnectionId());
    }

}