如何使用 Spring Cloud Stream app starter TCP 处理消息
How to handle message using Spring Cloud Stream app starter TCP
我想使用 Spring Cloud Stream App Starter TCP Source project(maven 工件)以便能够通过 socket/port 接收 TCP 消息,处理它们 然后推送将结果发送给消息代理(例如 RabbitMQ)。
这个 TCP 源项目似乎完全符合我的要求,但它会自动将接收到的消息发送到输出通道。那么,是否有一种干净的方法可以继续使用 TCP 源项目,但拦截 TCP 传入消息以在将它们输出到我的消息代理之前在内部进行转换?
您使用来源和处理器创建聚合应用程序。
Spring Cloud Stream provides support for aggregating multiple applications together, connecting their input and output channels directly and avoiding the additional cost of exchanging messages via a broker. As of version 1.0 of Spring Cloud Stream, aggregation is supported only for the following types of applications:
sources, sinks, processors ...
They can be aggregated together by creating a sequence of interconnected applications, in which the output channel of an element in the sequence is connected to the input channel of the next element, if it exists. A sequence can start with either a source or a processor, it can contain an arbitrary number of processors and must end with either a processor or a sink.
编辑
作为解决 Source 自动装配问题的方法,您可以尝试类似...
@EnableBinding(Source.class)
@EnableConfigurationProperties(TcpSourceProperties.class)
public class MyTcpSourceConfiguration {
@Autowired
private Source channels;
@Autowired
private TcpSourceProperties properties;
@Bean
public TcpReceivingChannelAdapter adapter(
@Qualifier("tcpSourceConnectionFactory") AbstractConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setOutputChannelName("toMyProcessor");
return adapter;
}
@ServiceActivator(inputChannel = "toMyProcessor", outputChannel = Source.OUTPUT)
public byte[] myProcessor(byte[] fromTcp) {
...
}
@Bean
public TcpConnectionFactoryFactoryBean tcpSourceConnectionFactory(
@Qualifier("tcpSourceDecoder") AbstractByteArraySerializer decoder) throws Exception {
TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean();
factoryBean.setType("server");
factoryBean.setPort(this.properties.getPort());
factoryBean.setUsingNio(this.properties.isNio());
factoryBean.setUsingDirectBuffers(this.properties.isUseDirectBuffers());
factoryBean.setLookupHost(this.properties.isReverseLookup());
factoryBean.setDeserializer(decoder);
factoryBean.setSoTimeout(this.properties.getSocketTimeout());
return factoryBean;
}
@Bean
public EncoderDecoderFactoryBean tcpSourceDecoder() {
EncoderDecoderFactoryBean factoryBean = new EncoderDecoderFactoryBean(this.properties.getDecoder());
factoryBean.setMaxMessageSize(this.properties.getBufferSize());
return factoryBean;
}
}
我想使用 Spring Cloud Stream App Starter TCP Source project(maven 工件)以便能够通过 socket/port 接收 TCP 消息,处理它们 然后推送将结果发送给消息代理(例如 RabbitMQ)。
这个 TCP 源项目似乎完全符合我的要求,但它会自动将接收到的消息发送到输出通道。那么,是否有一种干净的方法可以继续使用 TCP 源项目,但拦截 TCP 传入消息以在将它们输出到我的消息代理之前在内部进行转换?
您使用来源和处理器创建聚合应用程序。
Spring Cloud Stream provides support for aggregating multiple applications together, connecting their input and output channels directly and avoiding the additional cost of exchanging messages via a broker. As of version 1.0 of Spring Cloud Stream, aggregation is supported only for the following types of applications:
sources, sinks, processors ...
They can be aggregated together by creating a sequence of interconnected applications, in which the output channel of an element in the sequence is connected to the input channel of the next element, if it exists. A sequence can start with either a source or a processor, it can contain an arbitrary number of processors and must end with either a processor or a sink.
编辑
作为解决 Source 自动装配问题的方法,您可以尝试类似...
@EnableBinding(Source.class)
@EnableConfigurationProperties(TcpSourceProperties.class)
public class MyTcpSourceConfiguration {
@Autowired
private Source channels;
@Autowired
private TcpSourceProperties properties;
@Bean
public TcpReceivingChannelAdapter adapter(
@Qualifier("tcpSourceConnectionFactory") AbstractConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(connectionFactory);
adapter.setOutputChannelName("toMyProcessor");
return adapter;
}
@ServiceActivator(inputChannel = "toMyProcessor", outputChannel = Source.OUTPUT)
public byte[] myProcessor(byte[] fromTcp) {
...
}
@Bean
public TcpConnectionFactoryFactoryBean tcpSourceConnectionFactory(
@Qualifier("tcpSourceDecoder") AbstractByteArraySerializer decoder) throws Exception {
TcpConnectionFactoryFactoryBean factoryBean = new TcpConnectionFactoryFactoryBean();
factoryBean.setType("server");
factoryBean.setPort(this.properties.getPort());
factoryBean.setUsingNio(this.properties.isNio());
factoryBean.setUsingDirectBuffers(this.properties.isUseDirectBuffers());
factoryBean.setLookupHost(this.properties.isReverseLookup());
factoryBean.setDeserializer(decoder);
factoryBean.setSoTimeout(this.properties.getSocketTimeout());
return factoryBean;
}
@Bean
public EncoderDecoderFactoryBean tcpSourceDecoder() {
EncoderDecoderFactoryBean factoryBean = new EncoderDecoderFactoryBean(this.properties.getDecoder());
factoryBean.setMaxMessageSize(this.properties.getBufferSize());
return factoryBean;
}
}