如何更正 "spring integration is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'" 异常?

How do I correct the "spring integration is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'" exception?

我已经查看了 Spring Integration and DSL upgrade - one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel' Error ,在我看来我的解决方案的形状在这里,但解决方案的表达方式对我来说没有意义。

更容易理解,但我没有看到处理程序方法(无论它是什么)在哪里返回 void 给我这个问题。

正在为 .aggregate() 工厂方法抛出异常 spring integration is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'。队列是在运行时根据应用程序收集的元数据动态构建的。

根据对框架的调试,.routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules)) 子句似乎返回了 void。将 .defaultOutputToParentFlow() 添加到 recipientListRouter 会消除异常,但可能不是正确的解决方案,因为当我进行此调整时流程实际上并未开始。

我欢迎任何建议。

代码片段:

        StandardIntegrationFlow flow = IntegrationFlows
            .from(setupAdapter,
                    c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
            .enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
                    .headerExpression("yyy", "payload[0].get(\"yyy\")")
                    )
            .split(tableSplitter)
            .enrichHeaders(h -> h.headerExpression("aaa", "payload[0].get(\"aaa\")")
                    .headerExpression("bbb", "payload[0].get(\"bbb\")")
                    )
            .channel(c -> c.executor(stepTaskExecutor))
            .routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules))
            .aggregate()
            .handle(cleanupAdapter).get();

    return flow;



private RecipientListRouterSpec buildRecipientListRouterSpecForRules(RecipientListRouterSpec recipientListSpec,
        Collection<RuleMetadata> rules) {
    rules.forEach(
            rule -> recipientListSpec.recipientFlow(getFilterExpression(rule), f -> createFlowDefForRule(f, rule)));

    return recipientListSpec;
}

编辑:根据以下讨论修改解决方案代码:

        StandardIntegrationFlow flow = IntegrationFlows
            .from(setupAdapter,
                    c -> c.poller(Pollers.fixedRate(1000L, TimeUnit.MILLISECONDS).maxMessagesPerPoll(1)))
            .enrichHeaders(h -> h.headerExpression("xxx", "payload[0].get(\"xxx\")")
                    .headerExpression("yyy", "payload[0].get(\"yyy\")")
                    )
            .gateway(new DirectChannel())
            .split(tableSplitter)
            .enrichHeaders(h -> h.headerExpression("aaa", "payload[0].get(\"aaa\")")
                    .headerExpression("bbb", "payload[0].get(\"bbb\")")
                    )
            .channel(c -> c.executor(stepTaskExecutor))
            .routeToRecipients(r -> this.buildRecipientListRouterSpecForRules(r, rules).defaultOutputToParentFlow())
            .aggregate()
            .handle(cleanupAdapter).get();

您无法在 routeToRecipients() 之后继续流程。我的意思是你的 .aggregate() 导致了这个错误,因为 RecipientListRouter 不是 AbstractReplyProducingMessageHandler,而是普通的 AbstractMessageHandler 并且它知道在路由功能后仅根据为此路由器类型提供的映射。

有关详细信息,请参阅 defaultOutputToParentFlow() JavaDocs:

/**
 * Make a default output mapping of the router to the parent flow.
 * Use the next, after router, parent flow {@link MessageChannel} as a
 * {@link AbstractMessageRouter#setDefaultOutputChannel(MessageChannel)} of this router.
 * @return the router spec.
 */
public S defaultOutputToParentFlow() {

因此,只有当它与收件人映射不匹配时,它才会继续您的流程。 ignoreSendFailures(true) 也必须设置,以使其不会因错误而失败,而是回退到此默认输出。