如何通过调用另一个流在 java dsl 中进行拆分聚合?

How to do split-aggregate in java dsl by invoking another flow?

在下面的示例中,我得到 [Manoj, Jeeva] 作为输出。但是 [Hello Manoj, Hello Jeeva] 是预期的。为什么 serviceChnl 不提供聚合输出?

 @Bean
  public IntegrationFlow sayHelloIntFlow() {
    return IntegrationFlows.from("serviceChnl")
                           .handle(new GenericHandler<String>() {
                             public Object handle(String payload, Map<String, Object> headers) {
                               return "Hello " + payload;
                             }
                           })
                           .get();
  }




@Bean
  public IntegrationFlow splitFlow() {
    return IntegrationFlows.from("splitChnl")
                           .split()
                           .channel("serviceChnl")
                           .aggregate()
                           .handle(new GenericHandler() {
                             public Object handle(Object payload, Map headers) {
                               System.out.println(payload);
                               return null;
                             }
                           })
                           .channel("nullChannel")
                           .get();
  }




@Test
  public void test() {
    String[] strArr = new String[] {"Manoj", "Jeeva"};
    Message msg = MessageBuilder.withPayload(strArr)
                                .build();
    splitChnl.send(msg);
  }

我明白了,拆分消息后,我要么丰富要么转换。我不应该把它放到频道里。

@Bean
  public IntegrationFlow splitFlow() {
    return IntegrationFlows.from("splitChnl")
                           .split()
                           .transform(new HelloTransformer())
                           .aggregate()
                           .handle(new ShowOutput<String>())
                           .channel("nullChannel")
                           .get();
  }

这是正确答案,您可以自己接受。你的问题是你误解了一点 inter-channel 的概念。它们无意将消息发送到单独的流(我们也可以这样做),但它们将这些端点连接到一个流中。它们无论如何都在那里,即使你没有在那里声明 .channel() 。对于另一个流程,我们有 .wireTap().gateway()。请阅读关于 <chain> and DSL Manual 的 Spring 集成参考手册。有足够的信息不会混淆开发阶段...

下面是使用 .gateway 委托给另一个流程的示例。 DefaultAggregatingMessageGroupProcessor 用于将各个消息有效负载聚合到有效负载集合中。

@Bean
IntegrationFlow splitAndDelegate(IntegrationFlow delegateFlow) {
    return flowDef -> flowDef.split()
        .gateway(delegateFlow)
        .aggregate(aggregatorSpec -> aggregatorSpec.outputProcessor(new DefaultAggregatingMessageGroupProcessor()));
}