IntegrationFlow + 2 个条件转换器 + 出站网关

IntegrationFlow + 2 conditional transformers + Outbound gateway

我有一个集成流程,它需要 运行 一个转换器或另一个基于某些条件的转换器,然后 post 一个带有出站网关的 http 请求。

@Bean
public IntegrationFlow messageFromKafka() {
    return flow -> flow
            .publishSubscribeChannel(s -> s
                    .subscribe(f1 -> f1
                            .<AttachmentEvent>filter(validator::isCondition1)
                            .transform(transformer1)
                    )
                    .subscribe(fl -> fl
                            .<AttachmentEvent>filter(validator::isCondition2)
                            .transform(transformer2)
                            .split()
                    )
            )
            .publishSubscribeChannel(s -> s
                    .subscribe(fl1 -> fl1
                            .transform(httpTransformer)
                            .<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
                                    .subFlowMapping("operation1", sf -> sf
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
                                    )
                                    .subFlowMapping("operation2", sf -> sf
                                            .<String>filter(message -> isVendorStatusDescNotCancelled(message))
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
                                    )
                                    .subFlowMapping("operation3", sf -> sf
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
                                    )
                            )
                    )
                    .subscribe(fl2 -> fl2
                            .handle(getKafkaHandler())
                    )
            );
}  

这是我的尝试,但是我收到此错误消息“没有 output-channel 或 replyChannel header 可用”,我想我理解原因,但不确定如何实现我需要的.

谢谢。

在集成中,使用 router 模式处理条件流:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#messaging-routing-chapter

尽管看起来您的问题与条件解决方案完全无关。

我认为您的每个 handle(getOAuth2Handler(...)) returns 都有一些您不作为回复处理的值 sub-flows。如果您对该回复不感兴趣,请考虑在 handle().

之后为那些 sub-flow 配置一个 nullChannel