使用 spring-integration 监听 TCP 数据源(feed)
Listen on TCP feed with spring-integration
我想连接到两个不同的 TLS 加密 TCP 源并读取和发送 json 数据。当然会以某种方式处理传入的信息。它不一定需要在 spring-integration 中处理,但可以放入队列或其他任何东西中。
我不想对主机和端口进行硬编码,因为我在运行时从我正在调用的 REST API 中获取它们。
我发送的内容不一定会产生回复,我也不一定期望从我发送的数据中得到回复。
我在使用 spring 集成实现这一点时遇到了一些困难。
我勉强写出了一个大致可用、但还不能完全正常工作的版本:
/**
 * Defines a flow that pushes messages through a TCP outbound gateway connected to
 * {@code host:port}, converts whatever the gateway returns to a string and prints it to
 * standard output, then registers that flow with {@code flowContext} under {@code id}.
 *
 * <p>Outgoing payloads are written with {@code TcpCodecs.crlf()} and incoming data is read
 * with {@code TcpCodecs.lengthHeader1()}. The gateway's remote timeout is fixed at 5000
 * (presumably milliseconds - confirm against the gateway documentation).
 *
 * @param flowContext context used to register the flow
 * @param id          registration id for the flow
 * @param host        remote host to connect to
 * @param port        remote port to connect to
 * @return the flow definition that was registered
 */
public static IntegrationFlow RegisterFeedFlow(final IntegrationFlowContext flowContext,
        final String id, final String host, final int port) {
    final IntegrationFlow feedFlow = flow -> flow
            .handle(Tcp.outboundGateway(
                            Tcp.netClient(host, port)
                                    .serializer(TcpCodecs.crlf())
                                    .deserializer(TcpCodecs.lengthHeader1()))
                    .remoteTimeout(message -> 5000))
            .transform(Transformers.objectToString())
            .handle(System.out::println);

    flowContext.registration(feedFlow)
            .id(id)
            .register();
    return feedFlow;
}
这里缺少的是:
- TLS.
- 因为我使用的是 Tcp.outboundGateway(),它总是期望我发送的数据会得到回复,而这不是我想要的。
我假设我可以通过在 .serializer() 和 .deserializer() 中放入一些内容来自动编码和解码 json。这个假设是否正确?
我该如何正确实施?
/**
 * Sample application wiring a TLS-enabled TCP client and server together with
 * one-way (adapter-based) integration flows.
 *
 * <p>The client writes with the CRLF codec and reads with the 1-byte length-header codec;
 * the server uses the mirror-image pair, so the two ends agree on framing in each direction.
 */
@SpringBootApplication
public class Gitter66Application {

    /** Port shared by the sample client and server factories. */
    private static final int PORT = 1234;

    public static void main(String[] args) {
        SpringApplication.run(Gitter66Application.class, args);
    }

    /** Builds the SSL context support used by both connection factories. */
    private static DefaultTcpSSLContextSupport newSslContextSupport() {
        return new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword");
    }

    /**
     * Factory bean for the "client" connection factory targeting {@code localhost} on
     * {@link #PORT}.
     */
    @Bean
    public TcpConnectionFactoryFactoryBean privateFeedClient() {
        final TcpConnectionFactoryFactoryBean clientFactory =
                new TcpConnectionFactoryFactoryBean("client");
        clientFactory.setHost("localhost");
        clientFactory.setPort(PORT);
        clientFactory.setSerializer(TcpCodecs.crlf());
        clientFactory.setDeserializer(TcpCodecs.lengthHeader1());
        clientFactory.setSslContextSupport(newSslContextSupport());
        return clientFactory;
    }

    /**
     * Factory bean for the "server" connection factory on {@link #PORT}; its codecs are
     * the reverse of the client's.
     */
    @Bean
    public TcpConnectionFactoryFactoryBean privateFeedServer() {
        final TcpConnectionFactoryFactoryBean serverFactory =
                new TcpConnectionFactoryFactoryBean("server");
        serverFactory.setPort(PORT);
        serverFactory.setSerializer(TcpCodecs.lengthHeader1());
        serverFactory.setDeserializer(TcpCodecs.crlf());
        serverFactory.setSslContextSupport(newSslContextSupport());
        return serverFactory;
    }

    /** Outbound flow: hands each message to a TCP outbound adapter on the client factory. */
    @Bean
    public IntegrationFlow flowOut(AbstractClientConnectionFactory cf) {
        return flow -> flow.handle(Tcp.outboundAdapter(cf));
    }

    /**
     * Inbound flow: takes messages from a TCP inbound adapter on the server factory,
     * converts them to strings and prints them to standard output.
     */
    @Bean
    public IntegrationFlow flowIn(AbstractServerConnectionFactory cf) {
        return IntegrationFlows
                .from(Tcp.inboundAdapter(cf))
                .transform(Transformers.objectToString())
                .handle(System.out::println)
                .get();
    }

    /** Sends a single {@code "foo"} message into the outbound flow's input channel. */
    @Bean
    public ApplicationRunner runner(@Qualifier("flowOut.input") MessageChannel channel) {
        return args -> channel.send(new GenericMessage<>("foo"));
    }
}
结果:
GenericMessage [payload=foo, headers={ip_tcp_remotePort=62471, ...
编辑
当然,您可以按照问题中的方式动态注册流程。动态注册工厂:
/**
 * Creates a "client" connection factory bean for the given remote endpoint.
 *
 * <p>The factory writes with {@code TcpCodecs.crlf()}, reads with
 * {@code TcpCodecs.lengthHeader1()}, and is configured with SSL context support built
 * from the sample key store and trust store settings.
 *
 * @param host remote host to connect to
 * @param port remote port to connect to
 * @return the configured (not yet registered) factory bean
 */
private TcpConnectionFactoryFactoryBean privateFeedClient(String host, int port) {
    final DefaultTcpSSLContextSupport sslSupport = new DefaultTcpSSLContextSupport(
            "keystore.ks", "trustStore.ks", "keyStorePassword", "trustStorePassword");

    final TcpConnectionFactoryFactoryBean clientFactory =
            new TcpConnectionFactoryFactoryBean("client");
    clientFactory.setHost(host);
    clientFactory.setPort(port);
    clientFactory.setSerializer(TcpCodecs.crlf());
    clientFactory.setDeserializer(TcpCodecs.lengthHeader1());
    clientFactory.setSslContextSupport(sslSupport);
    return clientFactory;
}
/**
 * Creates a "server" connection factory bean listening on the given port.
 *
 * <p>The factory writes with {@code TcpCodecs.lengthHeader1()}, reads with
 * {@code TcpCodecs.crlf()}, and is configured with SSL context support built from the
 * sample key store and trust store settings.
 *
 * @param port local port the server factory should use
 * @return the configured (not yet registered) factory bean
 */
private TcpConnectionFactoryFactoryBean privateFeedServer(int port) {
    TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("server");
    // Honor the caller-supplied port instead of a fixed value, so a factory can be
    // created for whatever port is discovered at runtime.
    fb.setPort(port);
    fb.setSerializer(TcpCodecs.lengthHeader1());
    fb.setDeserializer(TcpCodecs.crlf());
    fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
            "trustStore.ks", "keyStorePassword", "trustStorePassword"));
    return fb;
}
/**
 * Returns a runner that, when executed, builds a server and a client connection factory
 * bean (both on port 1234, the client targeting {@code localhost}) and registers them
 * with the application context under the names {@code "server"} and {@code "client"}.
 *
 * @param context application context that receives the bean registrations
 * @return the runner performing the registrations
 */
@Bean
public ApplicationRunner runner(GenericApplicationContext context) {
    return args -> {
        // Each factory bean is created eagerly; the supplier only hands back that instance.
        final TcpConnectionFactoryFactoryBean serverFactory = privateFeedServer(1234);
        context.registerBean("server", TcpConnectionFactoryFactoryBean.class,
                () -> serverFactory);

        final TcpConnectionFactoryFactoryBean clientFactory =
                privateFeedClient("localhost", 1234);
        context.registerBean("client", TcpConnectionFactoryFactoryBean.class,
                () -> clientFactory);

        // register flows
    };
}
我想连接到两个不同的 TLS 加密 TCP 源并读取和发送 json 数据。当然会以某种方式处理传入的信息。它不一定需要在 spring-integration 中处理,但可以放入队列或其他任何东西中。
我不想对主机和端口进行硬编码,因为我在运行时从我正在调用的 REST API 中获取它们。
我发送的内容不一定会产生回复,我也不一定期望从我发送的数据中得到回复。
我在使用 spring-integration 实现这一点时遇到了一些困难。我勉强写出了一个大致可用、但还不能完全正常工作的版本:
/**
 * Defines a flow that pushes messages through a TCP outbound gateway connected to
 * {@code host:port}, converts whatever the gateway returns to a string and prints it to
 * standard output, then registers that flow with {@code flowContext} under {@code id}.
 *
 * <p>Outgoing payloads are written with {@code TcpCodecs.crlf()} and incoming data is read
 * with {@code TcpCodecs.lengthHeader1()}. The gateway's remote timeout is fixed at 5000
 * (presumably milliseconds - confirm against the gateway documentation).
 *
 * @param flowContext context used to register the flow
 * @param id          registration id for the flow
 * @param host        remote host to connect to
 * @param port        remote port to connect to
 * @return the flow definition that was registered
 */
public static IntegrationFlow RegisterFeedFlow(final IntegrationFlowContext flowContext,
        final String id, final String host, final int port) {
    final IntegrationFlow feedFlow = flow -> flow
            .handle(Tcp.outboundGateway(
                            Tcp.netClient(host, port)
                                    .serializer(TcpCodecs.crlf())
                                    .deserializer(TcpCodecs.lengthHeader1()))
                    .remoteTimeout(message -> 5000))
            .transform(Transformers.objectToString())
            .handle(System.out::println);

    flowContext.registration(feedFlow)
            .id(id)
            .register();
    return feedFlow;
}
这里缺少的是:
- TLS.
- 因为我使用的是 Tcp.outboundGateway(),它总是期望我发送的数据会得到回复,而这不是我想要的。
我假设我可以通过在 .serializer() 和 .deserializer() 中放入一些内容来自动编码和解码 json。这个假设是否正确?
我该如何正确实施?
/**
 * Sample application wiring a TLS-enabled TCP client and server together with
 * one-way (adapter-based) integration flows.
 *
 * <p>The client writes with the CRLF codec and reads with the 1-byte length-header codec;
 * the server uses the mirror-image pair, so the two ends agree on framing in each direction.
 */
@SpringBootApplication
public class Gitter66Application {

    /** Port shared by the sample client and server factories. */
    private static final int PORT = 1234;

    public static void main(String[] args) {
        SpringApplication.run(Gitter66Application.class, args);
    }

    /** Builds the SSL context support used by both connection factories. */
    private static DefaultTcpSSLContextSupport newSslContextSupport() {
        return new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword");
    }

    /**
     * Factory bean for the "client" connection factory targeting {@code localhost} on
     * {@link #PORT}.
     */
    @Bean
    public TcpConnectionFactoryFactoryBean privateFeedClient() {
        final TcpConnectionFactoryFactoryBean clientFactory =
                new TcpConnectionFactoryFactoryBean("client");
        clientFactory.setHost("localhost");
        clientFactory.setPort(PORT);
        clientFactory.setSerializer(TcpCodecs.crlf());
        clientFactory.setDeserializer(TcpCodecs.lengthHeader1());
        clientFactory.setSslContextSupport(newSslContextSupport());
        return clientFactory;
    }

    /**
     * Factory bean for the "server" connection factory on {@link #PORT}; its codecs are
     * the reverse of the client's.
     */
    @Bean
    public TcpConnectionFactoryFactoryBean privateFeedServer() {
        final TcpConnectionFactoryFactoryBean serverFactory =
                new TcpConnectionFactoryFactoryBean("server");
        serverFactory.setPort(PORT);
        serverFactory.setSerializer(TcpCodecs.lengthHeader1());
        serverFactory.setDeserializer(TcpCodecs.crlf());
        serverFactory.setSslContextSupport(newSslContextSupport());
        return serverFactory;
    }

    /** Outbound flow: hands each message to a TCP outbound adapter on the client factory. */
    @Bean
    public IntegrationFlow flowOut(AbstractClientConnectionFactory cf) {
        return flow -> flow.handle(Tcp.outboundAdapter(cf));
    }

    /**
     * Inbound flow: takes messages from a TCP inbound adapter on the server factory,
     * converts them to strings and prints them to standard output.
     */
    @Bean
    public IntegrationFlow flowIn(AbstractServerConnectionFactory cf) {
        return IntegrationFlows
                .from(Tcp.inboundAdapter(cf))
                .transform(Transformers.objectToString())
                .handle(System.out::println)
                .get();
    }

    /** Sends a single {@code "foo"} message into the outbound flow's input channel. */
    @Bean
    public ApplicationRunner runner(@Qualifier("flowOut.input") MessageChannel channel) {
        return args -> channel.send(new GenericMessage<>("foo"));
    }
}
结果:
GenericMessage [payload=foo, headers={ip_tcp_remotePort=62471, ...
编辑
当然,您可以按照问题中的方式动态注册流程。动态注册工厂:
/**
 * Creates a "client" connection factory bean for the given remote endpoint.
 *
 * <p>The factory writes with {@code TcpCodecs.crlf()}, reads with
 * {@code TcpCodecs.lengthHeader1()}, and is configured with SSL context support built
 * from the sample key store and trust store settings.
 *
 * @param host remote host to connect to
 * @param port remote port to connect to
 * @return the configured (not yet registered) factory bean
 */
private TcpConnectionFactoryFactoryBean privateFeedClient(String host, int port) {
    final DefaultTcpSSLContextSupport sslSupport = new DefaultTcpSSLContextSupport(
            "keystore.ks", "trustStore.ks", "keyStorePassword", "trustStorePassword");

    final TcpConnectionFactoryFactoryBean clientFactory =
            new TcpConnectionFactoryFactoryBean("client");
    clientFactory.setHost(host);
    clientFactory.setPort(port);
    clientFactory.setSerializer(TcpCodecs.crlf());
    clientFactory.setDeserializer(TcpCodecs.lengthHeader1());
    clientFactory.setSslContextSupport(sslSupport);
    return clientFactory;
}
/**
 * Creates a "server" connection factory bean listening on the given port.
 *
 * <p>The factory writes with {@code TcpCodecs.lengthHeader1()}, reads with
 * {@code TcpCodecs.crlf()}, and is configured with SSL context support built from the
 * sample key store and trust store settings.
 *
 * @param port local port the server factory should use
 * @return the configured (not yet registered) factory bean
 */
private TcpConnectionFactoryFactoryBean privateFeedServer(int port) {
    TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("server");
    // Honor the caller-supplied port instead of a fixed value, so a factory can be
    // created for whatever port is discovered at runtime.
    fb.setPort(port);
    fb.setSerializer(TcpCodecs.lengthHeader1());
    fb.setDeserializer(TcpCodecs.crlf());
    fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
            "trustStore.ks", "keyStorePassword", "trustStorePassword"));
    return fb;
}
/**
 * Returns a runner that, when executed, builds a server and a client connection factory
 * bean (both on port 1234, the client targeting {@code localhost}) and registers them
 * with the application context under the names {@code "server"} and {@code "client"}.
 *
 * @param context application context that receives the bean registrations
 * @return the runner performing the registrations
 */
@Bean
public ApplicationRunner runner(GenericApplicationContext context) {
    return args -> {
        // Each factory bean is created eagerly; the supplier only hands back that instance.
        final TcpConnectionFactoryFactoryBean serverFactory = privateFeedServer(1234);
        context.registerBean("server", TcpConnectionFactoryFactoryBean.class,
                () -> serverFactory);

        final TcpConnectionFactoryFactoryBean clientFactory =
                privateFeedClient("localhost", 1234);
        context.registerBean("client", TcpConnectionFactoryFactoryBean.class,
                () -> clientFactory);

        // register flows
    };
}