如何在 subFlowMapping 中正确控制流量

How can I control the flow correctly in the subFlowMapping

这是基本问题,但无处可问。 (我上周四刚刚开始 spring-整合。) 我必须使用 subFlowMapping。单个 XML 文件可以包含多个域数据。文件名基础过滤只告诉我 XML 文件中包含什么样的数据。

这是工作版本。 2 个持久通道执行良好并保存数据。

channel("updatingPartsInventoryFlow.input")

channel("updatingTransactionCompleteLogFlow.input")

.subFlowMapping(PayloadType.STOCK, sf -> sf
                    .transform(unmarshallingTransformer)
                    .<Stock, List<PartsInventory>>transform(s -> createPartsInventories(s))
                    .publishSubscribeChannel(c1 -> c1
                        .subscribe(s1 -> s1
                            .channel("updatingPartsInventoryFlow.input")))
                            .publishSubscribeChannel(c2 -> c2
                        .subscribe(s2 -> s2
                            .handle((payload, headers) -> {
                                return createPartsInventoryTransactionLog((List<PartsInventory>) payload,                                                   (String) headers.get("StartTime"));
                            })
                            .channel("updatingTransactionCompleteLogFlow.input"))))

这些效果不佳。

channel("updatingPartsInventoryFlow.input") 数据总是不保存

channel("updatingTransactionCompleteLogFlow.input")始终保存日志

.subFlowMapping(PayloadType.STOCK, sf -> sf
                    .transform(unmarshallingTransformer)
                    .<Stock, List<PartsInventory>>transform(s -> createPartsInventories(s))
                    .channel("updatingPartsInventoryFlow.input")
                    .handle((payload, headers) -> {
                        return createPartsInventoryTransactionLog((List<PartsInventory>) payload, 
                                (String) headers.get("StartTime"));
                    })
                    .channel("updatingTransactionCompleteLogFlow.input"))

.subFlowMapping(PayloadType.STOCK, sf -> sf
                    .publishSubscribeChannel(c1 -> c1
                    .subscribe(s1 -> s1
                        .transform(unmarshallingTransformer)
                        .<Stock, List<PartsInventory>>transform(s -> createPartsInventories(s))
                        .channel("updatingPartsInventoryFlow.input")))
                        .handle((payload, headers) -> {
                            return createPartsInventoryTransactionLog((List<PartsInventory>) payload, 
                                            (String) headers.get("StartTime"));
                        })
                        .channel("updatingTransactionCompleteLogFlow.input"))))

实际上,我的意图不是工作版本。数据应该在日志之前保存。 工作版本似乎数据和日志同时保存。 请告诉我。

提前致谢。

同一键不能有多个映射条目。当你告诉 .subFlowMapping(PayloadType.STOCK 时,这意味着一些内部通道被映射到这种类型到 HashMap 中。这就是同一键的第二个映射总是获胜的原因。因此,您的第一个解决方案是正确的方法:预期类型的​​一个映射和 PublishSubscribeChannel 用于将相同消息分发到不同的子流。