如何更正 "spring integration is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'" 异常?
How do I correct the "spring integration is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'" exception?
我已经查看了 Spring Integration and DSL upgrade - one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel' Error ,在我看来我的解决方案的形状在这里,但解决方案的表达方式对我来说没有意义。
更容易理解,但我没有看到处理程序方法(无论它是什么)在哪里返回 void 给我这个问题。
正在为 .aggregate() 工厂方法抛出异常 spring integration is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'
。队列是在运行时根据应用程序收集的元数据动态构建的。
根据对框架的调试,.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules))
子句似乎返回了 void。将 .defaultOutputToParentFlow()
添加到 recipientListRouter 会消除异常,但可能不是正确的解决方案,因为当我进行此调整时流程实际上并未开始。
我欢迎任何建议。
代码片段:
StandardIntegrationFlow flow = IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
.headerExpression("yyy", "payload[0].get(\"yyy\")")
)
.split(tableSplitter)
.enrichHeaders(h -> h.headerExpression("aaa", "payload[0].get(\"aaa\")")
.headerExpression("bbb", "payload[0].get(\"bbb\")")
)
.channel(c -> c.executor(stepTaskExecutor))
.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules))
.aggregate()
.handle(cleanupAdapter).get();
return flow;
private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
Collection<RuleMetadata> rules) {
rules.forEach(
rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));
return recipientListSpec;
}
编辑:根据以下讨论修改解决方案代码:
StandardIntegrationFlow flow = IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
.headerExpression("yyy", "payload[0].get(\"yyy\")")
)
.gateway(new DirectChannel())
.split(tableSplitter)
.enrichHeaders(h -> h.headerExpression("aaa", "payload[0].get(\"aaa\")")
.headerExpression("bbb", "payload[0].get(\"bbb\")")
)
.channel(c -> c.executor(stepTaskExecutor))
.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules).defaultOutputToParentFlow())
.aggregate()
.handle(cleanupAdapter).get();
您无法在 routeToRecipients()
之后继续流程。我的意思是你的 .aggregate()
导致了这个错误,因为 RecipientListRouter
不是 AbstractReplyProducingMessageHandler
,而是普通的 AbstractMessageHandler
并且它知道在路由功能后仅根据为此路由器类型提供的映射。
有关详细信息,请参阅 defaultOutputToParentFlow()
JavaDocs:
/**
* Make a default output mapping of the router to the parent flow.
* Use the next, after router, parent flow {@link MessageChannel} as a
* {@link AbstractMessageRouter#setDefaultOutputChannel(MessageChannel)} of this router.
* @return the router spec.
*/
public S defaultOutputToParentFlow() {
因此,只有当它与收件人映射不匹配时,它才会继续您的流程。
ignoreSendFailures(true)
也必须设置,以使其不会因错误而失败,而是回退到此默认输出。
我已经查看了 Spring Integration and DSL upgrade - one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel' Error ,在我看来我的解决方案的形状在这里,但解决方案的表达方式对我来说没有意义。
正在为 .aggregate() 工厂方法抛出异常 spring integration is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'
。队列是在运行时根据应用程序收集的元数据动态构建的。
根据对框架的调试,.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules))
子句似乎返回了 void。将 .defaultOutputToParentFlow()
添加到 recipientListRouter 会消除异常,但可能不是正确的解决方案,因为当我进行此调整时流程实际上并未开始。
我欢迎任何建议。
代码片段:
StandardIntegrationFlow flow = IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
.headerExpression("yyy", "payload[0].get(\"yyy\")")
)
.split(tableSplitter)
.enrichHeaders(h -> h.headerExpression("aaa", "payload[0].get(\"aaa\")")
.headerExpression("bbb", "payload[0].get(\"bbb\")")
)
.channel(c -> c.executor(stepTaskExecutor))
.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules))
.aggregate()
.handle(cleanupAdapter).get();
return flow;
private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
Collection<RuleMetadata> rules) {
rules.forEach(
rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));
return recipientListSpec;
}
编辑:根据以下讨论修改解决方案代码:
StandardIntegrationFlow flow = IntegrationFlows
.from(setupAdapter,
c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
.enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
.headerExpression("yyy", "payload[0].get(\"yyy\")")
)
.gateway(new DirectChannel())
.split(tableSplitter)
.enrichHeaders(h -> h.headerExpression("aaa", "payload[0].get(\"aaa\")")
.headerExpression("bbb", "payload[0].get(\"bbb\")")
)
.channel(c -> c.executor(stepTaskExecutor))
.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules).defaultOutputToParentFlow())
.aggregate()
.handle(cleanupAdapter).get();
您无法在 routeToRecipients()
之后继续流程。我的意思是你的 .aggregate()
导致了这个错误,因为 RecipientListRouter
不是 AbstractReplyProducingMessageHandler
,而是普通的 AbstractMessageHandler
并且它知道在路由功能后仅根据为此路由器类型提供的映射。
有关详细信息,请参阅 defaultOutputToParentFlow()
JavaDocs:
/**
* Make a default output mapping of the router to the parent flow.
* Use the next, after router, parent flow {@link MessageChannel} as a
* {@link AbstractMessageRouter#setDefaultOutputChannel(MessageChannel)} of this router.
* @return the router spec.
*/
public S defaultOutputToParentFlow() {
因此,只有当它与收件人映射不匹配时,它才会继续您的流程。
ignoreSendFailures(true)
也必须设置,以使其不会因错误而失败,而是回退到此默认输出。