Spring Integration: using one channel for two different processes

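I have a problem: I can't get the flow to go to both endpoints in parallel; it goes to either one or the other. I have a ServiceActivator and a Transformer with the same input channel, objectOutputChannel. After processChannel, the message should go to both of them, but it goes to only one, and which one differs on each request. What should I do to make it work the way I want?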

    @Bean
    public TcpReceivingChannelAdapter channelAdapter(AbstractServerConnectionFactory connectionFactory) {
        final MsgReceivingChannelAdapter adapter = new MsgReceivingChannelAdapter();
        adapter.setConnectionFactory(connectionFactory);
        adapter.setOutputChannel(messageChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "outputChannel")
    public TcpSendingMessageHandler messageHandler(AbstractServerConnectionFactory connectionFactory){
        final MsgSendingMessageHandler handler = new MsgSendingMessageHandler();
        handler.setConnectionFactory(connectionFactory);
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "objectOutputChannel")
    public AmqpOutboundEndpoint objectOutboundEndpoint() {
        return Amqp
                .outboundAdapter(rabbitTemplate)
                .exchangeName(objectExchange)
                .get();
    }

    @Bean
    @Transformer(inputChannel = "messageChannel", outputChannel = "loggingChannel")
    public ObjectToStringTransformer loggingTransformer() {
        return new ObjectToStringTransformer();
    }

    @ServiceActivator(inputChannel = "loggingChannel")
    public void loggingService(String message) {
        messageLogger.info(message);
    }

    @Bean
    @Transformer(inputChannel = "messageChannel", outputChannel = "processChannel")
    public ObjectDeserializer objectDeserializer() {
        return new ObjectDeserializer();
    }

    @ServiceActivator(inputChannel = "processChannel", outputChannel = "objectOutputChannel")
    public MyObject processService(MyObject object) {
        return objectService.check(object);
    }

    @Bean
    @Transformer(inputChannel = "objectOutputChannel", outputChannel = "outputChannel")
    public ObjectSerializer objectSerializer() {
        return new ObjectSerializer();
    }

    @Bean(name = "loggingChannel")
    public MessageChannel loggingChannel() {
        return new DirectChannel();
    }

    @Bean(name = "outputChannel")
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean(name = "messageChannel")
    public MessageChannel messageChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean(name = "objectOutputChannel")
    public MessageChannel objectOutputChannel() {
        return new PublishSubscribeChannel();
    }

Debug log for case A: the message does not reach objectOutboundEndpoint() and reaches objectSerializer()

2021-02-09 11:23:40.221 DEBUG 8964 --- [pool-4-thread-2] o.s.i.i.tcp.connection.TcpNetConnection  : Message received GenericMessage [payload=...]
2021-02-09 11:23:40.223 DEBUG 8964 --- [pool-4-thread-2] o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.loggingTransformer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:23:40.231 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler   : handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler)' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:23:40.233 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.233 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.objectDeserializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:40.258 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.258 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@44e043f] (objectTcpController.processService.serviceActivator.handler) received message: GenericMessage [payload=..]
2021-02-09 11:23:42.383 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.385 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.objectSerializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'outputChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@66434cc8'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : bean 'objectTcpController.messageHandler.serviceActivator.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] o.s.i.ip.tcp.TcpSendingMessageHandler    : bean 'messageHandler'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@38499e48' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.389 DEBUG 8964 --- [pool-4-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : handler 'bean 'objectTcpController.messageHandler.serviceActivator.handler'' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:23:42.390 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'outputChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@66434cc8'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.390 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.391 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.391 DEBUG 8964 --- [pool-4-thread-2] o.s.i.channel.PublishSubscribeChannel    : postSend (sent=true) on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]

Debug log for case B: the message does not reach objectSerializer() and reaches objectOutboundEndpoint()

2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.i.tcp.connection.TcpNetConnection  : Message received GenericMessage [payload=...]
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.loggingTransformer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler   : handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler)' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.objectDeserializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:28:19.175 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.175 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@44e043f] (objectTcpController.processService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:28:22.572 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:28:22.572 DEBUG 8964 --- [pool-4-thread-3] o.s.i.a.outbound.AmqpOutboundEndpoint    : bean 'objectOutboundEndpoint'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1a5b8489' received message: GenericMessage [payload=...]
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_tcp_remotePort] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_connectionId] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_localInetAddress] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_address] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[history] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_hostname] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.635 DEBUG 8964 --- [pool-4-thread-3] o.s.i.a.outbound.AmqpOutboundEndpoint    : handler 'bean 'objectOutboundEndpoint'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1a5b8489'' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:28:22.635 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:28:22.639 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'processChannel'', message: GenericMessage [payload=..]
2021-02-09 11:28:22.639 DEBUG 8964 --- [pool-4-thread-3] o.s.i.channel.PublishSubscribeChannel    : postSend (sent=true) on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]

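What you describe has nothing to do with objectOutputChannel and messageChannel, since both of them are PublishSubscribeChannel instances. What you describe is the behavior of a DirectChannel with its default round-robin dispatching strategy: when a DirectChannel has several subscribers, each message is delivered to only one of them, and the subscribers take turns. But since none of the direct channels you show us has several subscribers, the problem is somewhere else.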
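To make the difference between the two dispatching strategies concrete, here is a minimal plain-Java sketch. It does not use Spring Integration at all: `RoundRobinChannel` and `BroadcastChannel` are hypothetical stand-ins that only model the dispatching behavior described above, and the "amqp" and "serializer" subscribers are illustrative names for the two endpoints in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DispatchSketch {

    /** Simplified model of round-robin dispatch: each message goes to exactly one subscriber. */
    static final class RoundRobinChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String message) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("no subscribers");
            }
            // Only the subscriber whose turn it is receives the message.
            subscribers.get(next).accept(message);
            next = (next + 1) % subscribers.size();
        }
    }

    /** Simplified model of publish-subscribe dispatch: every subscriber gets every message. */
    static final class BroadcastChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    /** Sends two requests through the round-robin model and records who handled what. */
    public static List<String> runRoundRobin() {
        List<String> log = new ArrayList<>();
        RoundRobinChannel channel = new RoundRobinChannel();
        channel.subscribe(m -> log.add("amqp:" + m));
        channel.subscribe(m -> log.add("serializer:" + m));
        channel.send("request-1");
        channel.send("request-2");
        return log;
    }

    /** Sends the same two requests through the publish-subscribe model. */
    public static List<String> runBroadcast() {
        List<String> log = new ArrayList<>();
        BroadcastChannel channel = new BroadcastChannel();
        channel.subscribe(m -> log.add("amqp:" + m));
        channel.subscribe(m -> log.add("serializer:" + m));
        channel.send("request-1");
        channel.send("request-2");
        return log;
    }

    public static void main(String[] args) {
        System.out.println("round-robin: " + runRoundRobin());
        System.out.println("broadcast:   " + runBroadcast());
    }
}
```

In the round-robin model, request-1 reaches only the first subscriber and request-2 only the second, which matches the "different endpoint on each request" symptom. In the broadcast model, both subscribers see both requests.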

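I suggest you turn on Message History and investigate the DEBUG logs for the org.springframework.integration category to see how your messages travel through the flow. There you should be able to find out which channel is guilty.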
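As a configuration sketch of that suggestion: Message History can be enabled with the `@EnableMessageHistory` annotation from Spring Integration. The class name `MessageHistoryConfiguration` is hypothetical, and the logging property in the comment assumes the application uses Spring Boot, which the log format suggests but the question does not state.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnableMessageHistory;

// Hypothetical configuration class; only the annotations matter here.
@Configuration
@EnableIntegration
@EnableMessageHistory
public class MessageHistoryConfiguration {
    // Assuming Spring Boot, DEBUG logging for the category can be switched on
    // in application.properties:
    //
    //   logging.level.org.springframework.integration=DEBUG
}
```

With history enabled, each message carries a history header listing the channels and endpoints it has passed through, which can be read alongside the preSend and postSend lines in the DEBUG log.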

What you show in the code snippet is correct and does not look related to what you describe...

UPDATE

This is what we see in your logs:

o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]

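So your objectOutputChannel is actually a DirectChannel instance, and its bean is declared in ObjectIntegrationConfiguration. If that configuration is not what you show in the question, then you have a "bean definition overriding" race condition, and your PublishSubscribeChannel is being overridden by a DirectChannel. Please revise your project configuration and try to find the other place where you declare the objectOutputChannel bean.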
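One way to confirm this from inside the application is to ask the bean factory which definition actually won. The sketch below is a hypothetical helper, not part of the original project: the class name `ObjectOutputChannelCheck` is made up, and it assumes component scanning picks it up. It prints the runtime class of the objectOutputChannel bean and the resource its bean definition came from.

```java
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

// Hypothetical diagnostic helper; remove it once the duplicate bean is found.
@Component
public class ObjectOutputChannelCheck {

    private final ConfigurableListableBeanFactory beanFactory;

    public ObjectOutputChannelCheck(ConfigurableListableBeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    // Runs once the application context has been refreshed.
    @EventListener(ContextRefreshedEvent.class)
    public void report() {
        String name = "objectOutputChannel";
        Object channel = beanFactory.getBean(name);
        BeanDefinition definition = beanFactory.getBeanDefinition(name);
        // Runtime type of the bean that is actually registered under this name.
        System.out.println(name + " is a " + channel.getClass().getName());
        // Where the winning bean definition was declared.
        System.out.println("defined in: " + definition.getResourceDescription());
    }
}
```

If the project runs on Spring Boot 2.1 or later, setting `spring.main.allow-bean-definition-overriding=false` (the default in those versions) should make startup fail when two beans are registered under the same name, instead of one silently replacing the other.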