如何通过调用另一个流在 java dsl 中进行拆分聚合?
How to do split-aggregate in java dsl by invoking another flow?
在下面的示例中,我得到 [Manoj, Jeeva]
作为输出。但是 [Hello Manoj, Hello Jeeva]
是预期的。为什么 serviceChnl 不提供聚合输出?
@Bean
public IntegrationFlow sayHelloIntFlow() {
return IntegrationFlows.from("serviceChnl")
.handle(new GenericHandler<String>() {
public Object handle(String payload, Map<String, Object> headers) {
return "Hello " + payload;
}
})
.get();
}
@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlows.from("splitChnl")
.split()
.channel("serviceChnl")
.aggregate()
.handle(new GenericHandler() {
public Object handle(Object payload, Map headers) {
System.out.println(payload);
return null;
}
})
.channel("nullChannel")
.get();
}
@Test
public void test() {
String[] strArr = new String[] {"Manoj", "Jeeva"};
Message msg = MessageBuilder.withPayload(strArr)
.build();
splitChnl.send(msg);
}
我明白了,拆分消息后,我要么丰富要么转换。我不应该把它放到频道里。
@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlows.from("splitChnl")
.split()
.transform(new HelloTransformer())
.aggregate()
.handle(new ShowOutput<String>())
.channel("nullChannel")
.get();
}
这是正确答案,您可以自己接受。你的问题是你误解了一点 inter-channel
的概念。它们无意将消息发送到单独的流(我们也可以这样做),但它们将这些端点连接到一个流中。它们无论如何都在那里,即使你没有在那里声明 .channel()
。对于另一个流程,我们有 .wireTap()
和 .gateway()
。请阅读关于 <chain>
and DSL Manual 的 Spring 集成参考手册。有足够的信息不会混淆开发阶段...
下面是使用 .gateway 委托给另一个流程的示例。
DefaultAggregatingMessageGroupProcessor 用于将各个消息有效负载聚合到有效负载集合中。
@Bean
IntegrationFlow splitAndDelegate(IntegrationFlow delegateFlow) {
return flowDef -> flowDef.split()
.gateway(delegateFlow)
.aggregate(aggregatorSpec -> aggregatorSpec.outputProcessor(new DefaultAggregatingMessageGroupProcessor()));
}
在下面的示例中,我得到 [Manoj, Jeeva]
作为输出。但是 [Hello Manoj, Hello Jeeva]
是预期的。为什么 serviceChnl 不提供聚合输出?
@Bean
public IntegrationFlow sayHelloIntFlow() {
return IntegrationFlows.from("serviceChnl")
.handle(new GenericHandler<String>() {
public Object handle(String payload, Map<String, Object> headers) {
return "Hello " + payload;
}
})
.get();
}
@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlows.from("splitChnl")
.split()
.channel("serviceChnl")
.aggregate()
.handle(new GenericHandler() {
public Object handle(Object payload, Map headers) {
System.out.println(payload);
return null;
}
})
.channel("nullChannel")
.get();
}
@Test
public void test() {
String[] strArr = new String[] {"Manoj", "Jeeva"};
Message msg = MessageBuilder.withPayload(strArr)
.build();
splitChnl.send(msg);
}
我明白了,拆分消息后,我要么丰富要么转换。我不应该把它放到频道里。
@Bean
public IntegrationFlow splitFlow() {
return IntegrationFlows.from("splitChnl")
.split()
.transform(new HelloTransformer())
.aggregate()
.handle(new ShowOutput<String>())
.channel("nullChannel")
.get();
}
这是正确答案,您可以自己接受。你的问题是你误解了一点 inter-channel
的概念。它们无意将消息发送到单独的流(我们也可以这样做),但它们将这些端点连接到一个流中。它们无论如何都在那里,即使你没有在那里声明 .channel()
。对于另一个流程,我们有 .wireTap()
和 .gateway()
。请阅读关于 <chain>
and DSL Manual 的 Spring 集成参考手册。有足够的信息不会混淆开发阶段...
下面是使用 .gateway 委托给另一个流程的示例。 DefaultAggregatingMessageGroupProcessor 用于将各个消息有效负载聚合到有效负载集合中。
@Bean
IntegrationFlow splitAndDelegate(IntegrationFlow delegateFlow) {
return flowDef -> flowDef.split()
.gateway(delegateFlow)
.aggregate(aggregatorSpec -> aggregatorSpec.outputProcessor(new DefaultAggregatingMessageGroupProcessor()));
}