使用 spring 集成监听 TCP 提要

Listen on TCP feed with spring-integration

我想连接到两个不同的 TLS 加密 TCP 源并读取和发送 json 数据。当然会以某种方式处理传入的信息。它不一定需要在 spring-integration 中处理,但可以放入队列或其他任何东西中。

我不想对主机和端口进行硬编码,因为我在运行时从我正在调用的 REST API 中获取它们。

我发送的内容不一定会产生回复,我也不一定期望从我发送的数据中得到回复。

我在使用 spring 集成实现这一点时遇到了一些困难。 我设法得到了某种但不太有效的东西:

    public static IntegrationFlow RegisterFeedFlow(final IntegrationFlowContext flowContext,
            final String id, final String host, final int port) {
        IntegrationFlow feedFlow = f -> f
                .handle(Tcp
                        .outboundGateway(Tcp.netClient(host, port).serializer(TcpCodecs.crlf())
                                .deserializer(TcpCodecs.lengthHeader1()))
                        .remoteTimeout(m -> 5000))
                .transform(Transformers.objectToString()).handle(System.out::println);


        flowContext.registration(feedFlow).id(id).register();

        return feedFlow;
    }

这里缺少的是:

  1. TLS.
  2. 因为我正在使用 Tcp.outboundGateway() 它总是期望我发送的数据得到回复。我不要这个。

我假设我可以通过在 .serializer() 和 .deserializer() 中放入一些内容来自动编码和解码 json。这个假设是否正确?

我该如何正确实施?

@SpringBootApplication
public class Gitter66Application {

    public static void main(String[] args) {
        SpringApplication.run(Gitter66Application.class, args);
    }

    @Bean
    public TcpConnectionFactoryFactoryBean privateFeedClient() {
        TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("client");
        fb.setHost("localhost");
        fb.setPort(1234);
        fb.setSerializer(TcpCodecs.crlf());
        fb.setDeserializer(TcpCodecs.lengthHeader1());
        fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword"));
        return fb;
    }

    @Bean
    public TcpConnectionFactoryFactoryBean privateFeedServer() {
        TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("server");
        fb.setPort(1234);
        fb.setSerializer(TcpCodecs.lengthHeader1());
        fb.setDeserializer(TcpCodecs.crlf());
        fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword"));
        return fb;
    }

    @Bean
    public IntegrationFlow flowOut(AbstractClientConnectionFactory cf) {
        return f -> f.handle(Tcp.outboundAdapter(cf));
    }

    @Bean
    public IntegrationFlow flowIn(AbstractServerConnectionFactory cf) {
        return IntegrationFlows.from(Tcp.inboundAdapter(cf))
                .transform(Transformers.objectToString())
                .handle(System.out::println)
                .get();
    }

    @Bean
    public ApplicationRunner runner(@Qualifier("flowOut.input") MessageChannel channel) {
        return args -> {
            channel.send(new GenericMessage<>("foo"));
        };
    }

}

结果:

GenericMessage [payload=foo, headers={ip_tcp_remotePort=62471, ...

编辑

当然,您可以按照问题中的方式动态注册流程。动态注册工厂:

    private TcpConnectionFactoryFactoryBean privateFeedClient(String host, int port) {
        TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("client");
        fb.setHost(host);
        fb.setPort(port);
        fb.setSerializer(TcpCodecs.crlf());
        fb.setDeserializer(TcpCodecs.lengthHeader1());
        fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword"));
        return fb;
    }

    private TcpConnectionFactoryFactoryBean privateFeedServer(int port) {
        TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("server");
        fb.setPort(1234);
        fb.setSerializer(TcpCodecs.lengthHeader1());
        fb.setDeserializer(TcpCodecs.crlf());
        fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword"));
        return fb;
    }

    @Bean
    public ApplicationRunner runner(GenericApplicationContext context) {

        return args -> {
            TcpConnectionFactoryFactoryBean server = privateFeedServer(1234);
            context.registerBean("server", TcpConnectionFactoryFactoryBean.class,
                    () -> server);
            TcpConnectionFactoryFactoryBean client = privateFeedClient("localhost", 1234);
            context.registerBean("client", TcpConnectionFactoryFactoryBean.class,
                    () -> client);
            // register flows
        };
    }