Spring 与不并行处理的多个执行程序通道集成

Spring integration with multiple executor channel not processing in parallel

我有需要将消息异步传递到多个通道的要求。为了使我的流程异步,我正在使用所有执行程序通道。但出于某种原因,流程仍然是顺序的。我可以看到我在任务执行器中配置的差异线程,但顺序。 这是我使用的配置

<int:channel id="mainChannel">
        <int:interceptors>
            <int:wire-tap channel="channel1"/>
            <int:wire-tap channel="channel2"/>
            <int:wire-tap channel="channel3"/>
      
</int:interceptors>
    </int:channel> 
   <int:channel id="channel1">
        <int:dispatcher task-executor="exec1" />
   </int:channel>
  <int:channel id="channel2">
        <int:dispatcher task-executor="exec2" />
  </int:channel>
   <int:channel id="channel3">
        <int:dispatcher task-executor="exec3" />
  </int:channel>



根据我的理解,所有这些都是异步的(在我的例子中,3 个线程应该 运行 并行)

从日志中我可以看到所有顺序但线程名称不同.. 我假设 preSend/Postsend 应该以随机顺序调用。

我是否遗漏了并行创建多个执行程序通道的任何内容。

非常感谢任何帮助。

您可能需要调用异步实现 bean,如下所示:

<beans:bean id="asyncExecutor"
            class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>

   <int:channel id="channel1">
        <int:dispatcher task-executor="asyncExecutor" />
   </int:channel>
  <int:channel id="channel2">
        <int:dispatcher task-executor="asyncExecutor" />
  </int:channel>
   <int:channel id="channel3">
        <int:dispatcher task-executor="asyncExecutor" />
  </int:channel>

SimpleAsyncTaskExecutor 描述:

public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable

TaskExecutor implementation that fires up a new Thread for each task, executing it asynchronously.

Supports limiting concurrent threads through the "concurrencyLimit" bean property. By default, the number of concurrent threads is unlimited.

NOTE: This implementation does not reuse threads! Consider a thread-pooling TaskExecutor implementation instead, in particular for executing a large number of short-lived tasks.

Github 中的用法示例:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd">

    <channel id="taskExecutorOnly">
        <dispatcher task-executor="taskExecutor"/>
    </channel>

    <channel id="failoverFalse">
        <dispatcher failover="false"/>
    </channel>

    <channel id="failoverTrue">
        <dispatcher failover="true"/>
    </channel>

    <channel id="loadBalancerDisabled">
        <dispatcher load-balancer="none"/>
    </channel>

    <channel id="loadBalancerDisabledAndTaskExecutor">
        <dispatcher load-balancer="none" task-executor="taskExecutor"/>
    </channel>

    <channel id="roundRobinLoadBalancerAndTaskExecutor">
        <dispatcher load-balancer="round-robin" task-executor="taskExecutor"/>
    </channel>

    <channel id="lbRefChannel">
        <dispatcher load-balancer-ref="lb"/>
    </channel>

    <beans:bean id="taskExecutor"
                class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>

    <beans:bean id="lb"
                class="org.springframework.integration.channel.config.DispatchingChannelParserTests.SampleLoadBalancingStrategy"/>
</beans:beans>

from log i can see all sequential but with diff thread name

因为日志只是打印消息的一个地方,而且它们实际上是由一个作者打印的,即使来自不同的线程。他们一个一个出现在那边。如果负载良好,您肯定会看到消息以意外的顺序记录。

I am assuming preSend/Postsend should have been called in random order.

事实并非如此。拦截器按它们添加到通道的顺序调用,如果它们的 order 相同,这就是您的情况。如何实现这些拦截器已经不是拦截器链的责任。

我认为您只是运气不好,以任意顺序查看日志,并且可能只是因为这些执行程序通道的消费者是普通记录器 - 没有任何负载来保持线程并且给人的印象是其他线程中的工作已完成并行。