如何在单个线程中执行一系列步骤,并在 spring 集成中使用异步流?

How to perform a series of steps in a single thread, with an async flow in spring-integration?

我目前有一个 spring-集成 (v4.3.24) 流程,如下所示:

           |
           | list of
           | filepaths
      +----v---+
      |splitter|
      +----+---+
           | filepath
           |
+----------v----------+
|sftp-outbound-gateway|
|        "get"        |
+----------+----------+
           | file
+---------------------+
|     +----v----+     |
|     |decryptor|     |
|     +----+----+     |
|          |          |
|    +-----v------+   | set of transformers
|    |decompressor|   | (with routers before them
|    +-----+------+   | because some steps are optional)
|          |          | that process the file;
|       +--v--+       | call this "FileProcessor"
|       | ... |       |
|       +--+--+       |
+---------------------+
           |
      +----v----+
      |save file|
      | to disk |
      +----+----+
           |

以上所有频道都是 DirectChannels - 是的,我知道这是一个糟糕的结构。这对于少量文件来说效果很好。但是现在,我必须处理数以千计的文件,这些文件需要经过相同的流程 - 基准测试显示这需要大约 1 天的时间才能完成处理。因此,我计划在此流程中引入一些并行处理。我想修改我的流程来实现这样的目标:

                                    |
                                    |
                         +----------v----------+
                         |sftp-outbound-gateway|
                         |       "mget"        |
                         +----------+----------+
                                    | list of files
                                    |
                               +----v---+
                               |splitter|
                               +----+---+
         one thread             one | thread        ...
           +------------------------+---------------+--+--+--+--+
           | file                   | file          |  |  |  |  |
+---------------------+  +---------------------+
|     +----v----+     |  |     +----v----+     |
|     |decryptor|     |  |     |decryptor|     |
|     +----+----+     |  |     +----+----+     |
|          |          |  |          |          |
|    +-----v------+   |  |    +-----v------+   |   ...
|    |decompressor|   |  |    |decompressor|   |
|    +-----+------+   |  |    +-----+------+   |
|          |          |  |          |          |
|       +--v--+       |  |       +--v--+       |
|       | ... |       |  |       | ... |       |
|       +--+--+       |  |       +--+--+       |
+---------------------+  +---------------------+
           |                        |
      +----v----+              +----v----+
      |save file|              |save file|
      | to disk |              | to disk |
      +----+----+              +----+----+
           |                        |
           |                        |

对于并行处理,我将文件从拆分器输出到 ExecutorChannelThreadPoolTaskExecutor

我的一些问题:

  1. 我希望一个文件的所有“FileProcessor”步骤都发生在同一个线程上,同时并行处理多个文件。我怎样才能做到这一点?
    我从 看到 ExecutorChannelMessageHandlerChain 流程将提供这样的功能。但是,“FileProcessor”中的一些步骤是可选的(使用 selector-expression 和路由器跳过一些步骤)——排除使用 MessageHandlerChain。我可以在里面安装几个 MessageHandlerChains 和 Filters,但这或多或少成为了 #2.

    中提到的方法
  2. 如果 #1 无法实现,从分离器开始更改所有通道类型,从 DirectChannelExecutorChannel 是否有助于引入一些并行性?如果是,我应该为每个频道创建一个新的 TaskExecutor 还是可以为所有频道重复使用一个 TaskExecutor bean(我不能在 TaskExecutor bean 上设置 scope="prototype")?

  3. 您认为哪种方法(#1 或#2)更好?为什么?

  4. 如果我执行全局错误处理,就像提到的方法here,即使一个文件出错,其他文件是否会继续处理?

它会根据您的需要使用 ExecutorChannel 作为解密器的输入,并将所有其余部分保留为直接通道;剩余的流程不必是链条,每个组件将 运行 在执行程序的一个线程上。

您需要确保所有下游组件都是线程安全的。

错误处理应该保持原样;每个子流都是独立的。