如何为"仅建立连接的客户端"和"发送消息的服务器"配置 Spring Integration 适配器

How to configure Spring Integration adapters for a client that only connects and a server that sends messages

我正在尝试使用 Spring 集成实现以下场景:

我需要一个客户端通过 TCP/IP 连接到服务器,并在 30 秒内等待接收消息。我需要一台服务器向已连接的客户端发送 0 到 n 条消息。我需要一种在不丢失消息的情况下启动和停止通道传输的方法。我还需要能够在停止和重新启动之间更改服务器侦听的端口。

这是我目前的配置:

/**
 * Spring Integration wiring for a TCP server endpoint.
 *
 * <p>A single server connection factory is shared by an inbound channel adapter
 * (socket -> {@code myChannelIn} -> transformer -> {@code myServiceChannel} -> {@link #service})
 * and an outbound handler ({@code myChannelOut} -> socket).
 *
 * <p>Bean method names are kept as-is because they determine the bean names.
 */
@Configuration
public class TcpConfiguration {
    private static final Logger LOG = LoggerFactory.getLogger(TcpConfiguration.class);

    /** Port passed to the server connection factory (property {@code port}). */
    @Value("${port}")
    private Integer port;

    /** Socket timeout applied to the connection factory (property {@code so-timeout}). */
    @Value("${so-timeout}")
    private Integer soTimeout;

    /** Whether SO_KEEPALIVE is enabled on the connection factory (property {@code keep-alive}). */
    @Value("${keep-alive}")
    private Boolean keepAlive;

    /** Send timeout applied to the inbound adapter (property {@code send-timeout}). */
    @Value("${send-timeout}")
    private Integer sendTimeout;

    /**
     * Creates the server connection factory used by both the inbound adapter and
     * the outbound handler.
     *
     * @return the configured server connection factory
     */
    @Bean
    public AbstractServerConnectionFactory getMyConnFactory() {
        LOG.debug("getMyConnFactory");
        TcpNetServerConnectionFactory factory = new TcpNetServerConnectionFactory(port);
        LOG.debug("getMyConnFactory port={}", port);
        factory.setSoTimeout(soTimeout);
        LOG.debug("getMyConnFactory soTimeout={}", soTimeout);
        // Apply the configured value; previously this was hard-coded to true,
        // so the keep-alive property was logged but never honoured.
        factory.setSoKeepAlive(keepAlive);
        LOG.debug("getMyConnFactory keepAlive={}", keepAlive);
        return factory;
    }

    /**
     * Creates the inbound adapter that forwards data received on the shared
     * connection factory to {@link #myChannelIn()}.
     *
     * @return the receiving channel adapter
     */
    @Bean
    public AbstractEndpoint getMyChannelAdapter() {
        LOG.debug("getMyChannelAdapter");
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(getMyConnFactory());
        adapter.setOutputChannel(myChannelIn());
        adapter.setSendTimeout(sendTimeout);
        LOG.debug("getMyChannelAdapter adapter={}", adapter.getClass().getName());
        return adapter;
    }

    /**
     * Channel that receives raw inbound payloads from the TCP adapter.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel myChannelIn() {
        LOG.debug("myChannelIn");
        return new DirectChannel();
    }

    /**
     * Transformer between {@code myChannelIn} and {@code myServiceChannel} that
     * converts the inbound payload to a {@link String}.
     *
     * @return the object-to-string transformer
     */
    @Bean
    @Transformer(inputChannel = "myChannelIn", outputChannel = "myServiceChannel")
    public ObjectToStringTransformer myTransformer() {
        LOG.debug("myTransformer");
        return new ObjectToStringTransformer();
    }

    /**
     * Terminal consumer of {@code myServiceChannel}; currently only logs the payload.
     *
     * @param in the received message payload as text
     */
    @ServiceActivator(inputChannel = "myServiceChannel")
    public void service(String in) {
        LOG.debug("service received={}", in);
    }

    /**
     * Channel whose messages are written to the socket by {@link #mySender()}.
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel myChannelOut() {
        LOG.debug("myChannelOut");
        return new DirectChannel();
    }

    /**
     * Flow connecting {@link #myChannelOut()} to the TCP sending handler.
     *
     * @return the outbound integration flow
     */
    @Bean
    public IntegrationFlow myOutbound() {
        LOG.debug("myOutbound");
        return IntegrationFlows.from(myChannelOut())
                .handle(mySender())
                .get();
    }

    /**
     * Creates the handler that sends messages over the shared connection factory.
     *
     * @return the TCP sending message handler
     */
    @Bean
    public MessageHandler mySender() {
        LOG.debug("mySender");
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(getMyConnFactory());
        return tcpSendingMessageHandler;
    }
}

请指教!

要更改服务器端口,我会关闭应用程序上下文并在远程配置服务器中配置新端口后重新启动它。 我可以在不破坏当前消息传输的情况下关闭应用程序上下文吗? 我不知道如何处理仅连接的客户端。

使用动态流注册(dynamic flow registration);客户端只需获取连接即可将其打开,无需发送任何数据。

/**
 * Client-side demo application: starts and stops the dynamic TCP receiver on
 * two different ports, waiting for console input between each step.
 */
@SpringBootApplication
public class So62867670Application {

    public static void main(String[] args) {
        SpringApplication.run(So62867670Application.class, args);
    }

    /**
     * Registers a runner that drives the start/stop demo once the context is up.
     *
     * @param receiver the dynamic receiver to start and stop
     * @return the runner executing the demo sequence
     */
    @Bean
    public ApplicationRunner runner(DynamicTcpReceiver receiver) {
        return appArgs -> runDemo(receiver);
    }

    /**
     * Demo sequence: listen on 1234, wait for input, stop, wait for input,
     * listen on 1235, wait for input, stop.
     */
    private static void runDemo(DynamicTcpReceiver receiver) throws Exception {
        receiver.connectAndListen(1234);
        System.in.read(); // block until the operator presses a key
        receiver.stop();

        System.in.read();

        receiver.connectAndListen(1235);
        System.in.read();
        receiver.stop();
    }

}

/**
 * Client that dynamically registers an inbound TCP flow for a given port and
 * opens the connection without sending anything, so the server can push
 * messages to it.
 *
 * <p>At most one flow is registered at a time. This class does no
 * synchronization of its own.
 */
@Component
class DynamicTcpReceiver {

    @Autowired
    private IntegrationFlowContext context;

    /** Registration of the currently active flow, or {@code null} when stopped. */
    private IntegrationFlowRegistration registration;

    /**
     * Registers an inbound flow for {@code localhost:port} and opens the
     * connection. Received payloads are converted to strings and printed to
     * standard output.
     *
     * <p>If a flow is already registered it is destroyed first; otherwise its
     * registration would be overwritten and could never be stopped.
     *
     * @param port the server port to connect to
     * @throws InterruptedException if interrupted while obtaining the connection
     */
    public void connectAndListen(int port) throws InterruptedException {
        stop(); // avoid leaking a previously registered flow
        TcpClientConnectionFactorySpec client = Tcp.netClient("localhost", port)
                .deserializer(TcpCodecs.lf());
        IntegrationFlow flow = IntegrationFlows.from(Tcp.inboundAdapter(client))
                .transform(Transformers.objectToString())
                .handle(System.out::println)
                .get();
        this.registration = context.registration(flow).register();
        client.get().getConnection(); // just open the single shared connection
    }

    /**
     * Destroys the currently registered flow, if any. Safe to call repeatedly.
     */
    public void stop() {
        if (this.registration != null) {
            this.registration.destroy();
            this.registration = null;
        }
    }
}

编辑

这是服务器端...

/**
 * Server-side demo application: starts and stops the dynamic TCP server on two
 * different ports, waiting for console input between each step. Scheduling is
 * enabled for the whole application.
 */
@SpringBootApplication
@EnableScheduling
public class So62867670ServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(So62867670ServerApplication.class, args);
    }

    /**
     * Registers a runner that drives the listen/stop demo once the context is up.
     *
     * @param receiver the dynamic server to start and stop
     * @return the runner executing the demo sequence
     */
    @Bean
    public ApplicationRunner runner(DynamicTcpServer receiver) {
        return appArgs -> runDemo(receiver);
    }

    /**
     * Demo sequence: listen on the first port, wait for input, stop it, wait
     * for input, then repeat the listen/wait/stop cycle on the second port.
     */
    private static void runDemo(DynamicTcpServer server) throws Exception {
        int firstPort = 1234;
        int secondPort = 1235;

        server.tcpListen(firstPort);
        System.in.read(); // block until the operator presses a key
        server.stop(firstPort);

        System.in.read();

        server.tcpListen(secondPort);
        System.in.read();
        server.stop(secondPort);
    }

}

/**
 * Server that dynamically registers outbound TCP flows per port, tracks the
 * connected clients through connection events, and periodically sends a
 * message to each of them.
 */
@Component
class DynamicTcpServer {

    private static final Logger LOG = LoggerFactory.getLogger(DynamicTcpServer.class);

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ApplicationContext appContext;

    /**
     * Active flow registrations keyed by listening port. Concurrent because it
     * is modified by {@link #tcpListen}/{@link #stop} and read by the scheduled
     * {@link #sender()}, which may run on different threads.
     */
    private final Map<Integer, IntegrationFlowRegistration> registrations = new ConcurrentHashMap<>();

    /** Open connections: connection id -> (server port, number of messages sent). */
    private final Map<String, Entry<Integer, AtomicInteger>> clients = new ConcurrentHashMap<>();

    /**
     * Starts listening on the given port by registering an outbound flow backed
     * by a server connection factory with bean id {@code "server-" + port}.
     *
     * @param port the port to listen on
     */
    public void tcpListen(int port) {
        TcpServerConnectionFactorySpec server = Tcp.netServer(port)
                .id("server-" + port)
                .serializer(TcpCodecs.lf());
        server.get().registerListener(msg -> false); // dummy listener so the accept thread doesn't exit
        IntegrationFlow flow = f -> f.handle(Tcp.outboundAdapter(server));
        this.registrations.put(port, flowContext.registration(flow).register());
    }

    /**
     * Stops the flow registered for the given port; does nothing if there is none.
     *
     * @param port the port whose flow should be destroyed
     */
    public void stop(int port) {
        IntegrationFlowRegistration registration = this.registrations.remove(port);
        if (registration != null) {
            registration.destroy();
        }
    }

    /**
     * Handles a connection-open event by recording the new client with a zeroed
     * send counter. The method keeps its historical name {@code closed} even
     * though it handles open events.
     *
     * @param event the connection-open event
     */
    @EventListener
    public void closed(TcpConnectionOpenEvent event) {
        LOG.info(event.toString());
        String connectionId = event.getConnectionId();
        // NOTE(review): assumes the third ':'-separated field of the connection id
        // is the local server port - confirm against the connection id format.
        String[] split = connectionId.split(":");
        int port = Integer.parseInt(split[2]);
        this.clients.put(connectionId, new AbstractMap.SimpleEntry<>(port, new AtomicInteger()));
    }

    /**
     * Handles a connection-close event by forgetting the client.
     *
     * @param event the connection-close event
     */
    @EventListener
    public void closed(TcpConnectionCloseEvent event) {
        LOG.info(event.toString());
        this.clients.remove(event.getConnectionId());
    }

    /**
     * Logs that a server connection factory has started listening.
     *
     * @param event the server-listening event
     */
    @EventListener
    public void listening(TcpConnectionServerListeningEvent event) {
        LOG.info(event.toString());
    }

    /**
     * Every 5 seconds (fixed delay), sends the payload {@code "foo"} to each
     * tracked client whose port still has a registered flow. Once more than 9
     * messages have been sent to a client, its connection is closed.
     */
    @Scheduled(fixedDelay = 5000)
    public void sender() {
        this.clients.forEach((connectionId, portAndCount) -> {
            IntegrationFlowRegistration registration = this.registrations.get(portAndCount.getKey());
            if (registration != null) {
                LOG.info("Sending to {}", connectionId);
                // The connection id header routes the message to this specific client.
                registration.getMessagingTemplate().send(MessageBuilder.withPayload("foo")
                        .setHeader(IpHeaders.CONNECTION_ID, connectionId).build());
                if (portAndCount.getValue().incrementAndGet() > 9) {
                    this.appContext.getBean("server-" + portAndCount.getKey(), TcpNetServerConnectionFactory.class)
                        .closeConnection(connectionId);
                }
            }
        });
    }

}