Spring Integration DSL: other components are not executed after routeToRecipients
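After running the application, the flow stops at routeToRecipients and the components after it are not executed (see the inline comments in the code below).
No error occurs.
However, when I remove routeToRecipients(), the other components are executed.
Is there something wrong with my integration flow?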
@MessagingGateway
public interface gateway {

    @Gateway(requestChannel = "request.input")
    void process(List<Msg> test);
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    return Pollers.fixedDelay(1000).get();
}

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from("request.input")
            .split()
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .transform(this.transformer::transform)
            .routeToRecipients(r ->
                    r.recipient("channel1")
                     .recipient("channel2")
                     .recipient("channel3")
                     .recipient("channel4"))
            .transform(this.transformer::transform2) // <---this is not executed :(
            .handle(new GenericHandler<Msg>() { // <---this too is not executed
                @Override
                public Object handle(Msg payload, Map<String, Object> headers) {
                    System.out.println("test service activator!");
                    return null;
                }
            })
            .get();
}

@Bean
public IntegrationFlow flow1() {
    return IntegrationFlows.from("channel1")
            .handle(new GenericHandler<Msg>() {
                @Override
                public Object handle(Msg payload, Map<String, Object> headers) {
                    System.out.println("Test route Channel 1");
                    return payload;
                }
            })
            .channel("output")
            .get();
}

@Bean
public IntegrationFlow flow2() {
    return IntegrationFlows.from("channel2")
            .handle(new GenericHandler<Msg>() {
                @Override
                public Object handle(Msg payload, Map<String, Object> headers) {
                    System.out.println("Test route Channel 2");
                    return payload;
                }
            })
            .channel("output")
            .get();
}

@Bean
public IntegrationFlow flow3() {
    return IntegrationFlows.from("channel3")
            .handle(new GenericHandler<Msg>() {
                @Override
                public Object handle(Msg payload, Map<String, Object> headers) {
                    System.out.println("Test route Channel 3");
                    return payload;
                }
            })
            .channel("output")
            .get();
}

@Bean
public IntegrationFlow flow4() {
    return IntegrationFlows.from("channel4")
            .handle(new GenericHandler<Msg>() {
                @Override
                public Object handle(Msg payload, Map<String, Object> headers) {
                    System.out.println("Test route Channel 4");
                    return payload;
                }
            })
            .channel("output")
            .get();
}
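A router has no output channel; it only sends messages to its recipients, so nothing declared after routeToRecipients() in the same flow receives a message.
You can put a publish/subscribe channel before the router, with the router as one subscriber and the rest of the flow as a second subscriber.
Alternatively, add a channel5 recipient and start the rest of the flow from that channel (in a new flow).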
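The first suggestion could look roughly like the sketch below. It assumes Spring Integration 5.x (the version family that provides IntegrationFlows, as used in the question). The class name RoutingFlowSketch is made up for illustration, and the two transform steps are left out because the transformer bean is not shown in the question; they would go where the comments indicate.

```java
import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;

// Hypothetical configuration class; only the main flow changes,
// flow1..flow4 from the question stay as they are.
@Configuration
public class RoutingFlowSketch {

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from("request.input")
                .split()
                .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
                // .transform(this.transformer::transform) would go here
                .publishSubscribeChannel(pubSub -> pubSub
                        // Subscriber 1: the router, which ends this sub-flow
                        // because it has no output channel of its own.
                        .subscribe(sub -> sub
                                .routeToRecipients(r -> r
                                        .recipient("channel1")
                                        .recipient("channel2")
                                        .recipient("channel3")
                                        .recipient("channel4"))))
                // Subscriber 2: the rest of the main flow, which receives
                // the same message that was handed to the router.
                // .transform(this.transformer::transform2) would go here
                .handle((payload, headers) -> {
                    System.out.println("test service activator!");
                    return null; // no reply, so the flow ends here
                })
                .get();
    }
}
```

Without an executor on the publish/subscribe channel, the subscribers should be called one after another on the same thread, so the router sub-flow runs before the remaining steps of the main flow.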