IntegrationFlow + 2 个条件转换器 + 出站网关
IntegrationFlow + 2 conditional transformers + Outbound gateway
我有一个集成流程,它需要 运行 一个转换器或另一个基于某些条件的转换器,然后 post 一个带有出站网关的 http 请求。
@Bean
public IntegrationFlow messageFromKafka() {
return flow -> flow
.publishSubscribeChannel(s -> s
.subscribe(f1 -> f1
.<AttachmentEvent>filter(validator::isCondition1)
.transform(transformer1)
)
.subscribe(fl -> fl
.<AttachmentEvent>filter(validator::isCondition2)
.transform(transformer2)
.split()
)
)
.publishSubscribeChannel(s -> s
.subscribe(fl1 -> fl1
.transform(httpTransformer)
.<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
.subFlowMapping("operation1", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
)
.subFlowMapping("operation2", sf -> sf
.<String>filter(message -> isVendorStatusDescNotCancelled(message))
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
)
.subFlowMapping("operation3", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
)
)
)
.subscribe(fl2 -> fl2
.handle(getKafkaHandler())
)
);
}
这是我的尝试,但是我收到此错误消息“没有 output-channel 或 replyChannel header 可用”,我想我理解原因,但不确定如何实现我需要的.
谢谢。
在集成中,使用 router
模式处理条件流:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#messaging-routing-chapter
尽管看起来您的问题与条件解决方案完全无关。
我认为您的每个 handle(getOAuth2Handler(...))
returns 都有一些您不作为回复处理的值 sub-flows。如果您对该回复不感兴趣,请考虑在 handle()
.
之后为那些 sub-flow 配置一个 nullChannel
我有一个集成流程,它需要 运行 一个转换器或另一个基于某些条件的转换器,然后 post 一个带有出站网关的 http 请求。
@Bean
public IntegrationFlow messageFromKafka() {
return flow -> flow
.publishSubscribeChannel(s -> s
.subscribe(f1 -> f1
.<AttachmentEvent>filter(validator::isCondition1)
.transform(transformer1)
)
.subscribe(fl -> fl
.<AttachmentEvent>filter(validator::isCondition2)
.transform(transformer2)
.split()
)
)
.publishSubscribeChannel(s -> s
.subscribe(fl1 -> fl1
.transform(httpTransformer)
.<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
.subFlowMapping("operation1", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
)
.subFlowMapping("operation2", sf -> sf
.<String>filter(message -> isVendorStatusDescNotCancelled(message))
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
)
.subFlowMapping("operation3", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
)
)
)
.subscribe(fl2 -> fl2
.handle(getKafkaHandler())
)
);
}
这是我的尝试,但是我收到此错误消息“没有 output-channel 或 replyChannel header 可用”,我想我理解原因,但不确定如何实现我需要的.
谢谢。
在集成中,使用 router
模式处理条件流:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#messaging-routing-chapter
尽管看起来您的问题与条件解决方案完全无关。
我认为您的每个 handle(getOAuth2Handler(...))
returns 都有一些您不作为回复处理的值 sub-flows。如果您对该回复不感兴趣,请考虑在 handle()
.
nullChannel