无法从聚合路由丰富交换
Unable to enrich exchange from aggregation route
我无法理解为什么聚合器没有从用于丰富原始交换的子路由接收正确的交换。
我能说的是,日志在路由的末尾正确打印,但在父路由中,我只从子路由的较早部分获得消息正文。
话虽如此,这里是路线的浓缩摘录。
this.from("direct:fetchPartnersForAggregation")
.to("sql:select * from ...")
.process(exchange -> {
List<Map<String, Object>> payload = exchange.getIn().getBody(ArrayList.class);
exchange.getIn().setHeader("numOfPartners", payload.size());
if(payload.size() < 1 || (payload.size() == 1 && payload.get(0).get("process_id")== null))
exchange.getIn().setBody(null);
})
.choice()
.when(not(emptyBody))
.split(body())
.streaming()
.setHeader("partner_id", simple("${body[process_id]}"))
.to("sql:select ....")
.setHeader(Constants.EXCHANGE_HEADERS.OBJECT_TYPE, simple("partners"))
.to("log:partner_extracted?showBody=true&showHeaders=true")
.process(appIdCollator)
.process(partnerBuilder)
.aggregate(constant(true), partnersAggregator)
.completionPredicate(new PartnerSizePredicate())
.to("log:partners_collated?showBody=true&showHeaders=true") //The log prints everything correctly
.end();
//In my other route:
...
.enrich("direct:fetchPartnersForAggregation", partnersAggregator).id("partners_added") <========== The 'newExchange' in this partnersAggregator contains the body returned in the first sql:Select in the aggregation route.
我没有意识到拆分器有它自己的聚合策略构建,可以根据 the odcumentation 覆盖它。我只是移动了我的聚合器来覆盖它,现在我有了正确的交换:
this.from("direct:fetchPartnersForAggregation")
.to("sql:select * from ...")
.process(exchange -> {
List<Map<String, Object>> payload = exchange.getIn().getBody(ArrayList.class);
exchange.getIn().setHeader("numOfPartners", payload.size());
if(payload.size() < 1 || (payload.size() == 1 && payload.get(0).get("process_id")== null))
exchange.getIn().setBody(null);
})
.choice()
.when(emptyBody)
.stop()
.otherwise()
.split(body(), partnersAggregator)
.streaming()
.setHeader("partner_id", simple("${body[process_id]}"))
.to("sql:select ....")
.setHeader(Constants.EXCHANGE_HEADERS.OBJECT_TYPE, simple("partners"))
.to("log:partner_extracted?showBody=true&showHeaders=true")
.process(appIdCollator)
.process(partnerBuilder)
.to("log:partners_collated?showBody=true&showHeaders=true")
.end();
我无法理解为什么聚合器没有从用于丰富原始交换的子路由接收正确的交换。 我能说的是,日志在路由的末尾正确打印,但在父路由中,我只从子路由的较早部分获得消息正文。 话虽如此,这里是路线的浓缩摘录。
this.from("direct:fetchPartnersForAggregation")
.to("sql:select * from ...")
.process(exchange -> {
List<Map<String, Object>> payload = exchange.getIn().getBody(ArrayList.class);
exchange.getIn().setHeader("numOfPartners", payload.size());
if(payload.size() < 1 || (payload.size() == 1 && payload.get(0).get("process_id")== null))
exchange.getIn().setBody(null);
})
.choice()
.when(not(emptyBody))
.split(body())
.streaming()
.setHeader("partner_id", simple("${body[process_id]}"))
.to("sql:select ....")
.setHeader(Constants.EXCHANGE_HEADERS.OBJECT_TYPE, simple("partners"))
.to("log:partner_extracted?showBody=true&showHeaders=true")
.process(appIdCollator)
.process(partnerBuilder)
.aggregate(constant(true), partnersAggregator)
.completionPredicate(new PartnerSizePredicate())
.to("log:partners_collated?showBody=true&showHeaders=true") //The log prints everything correctly
.end();
//In my other route:
...
.enrich("direct:fetchPartnersForAggregation", partnersAggregator).id("partners_added") <========== The 'newExchange' in this partnersAggregator contains the body returned in the first sql:Select in the aggregation route.
我没有意识到拆分器有它自己的聚合策略构建,可以根据 the odcumentation 覆盖它。我只是移动了我的聚合器来覆盖它,现在我有了正确的交换:
this.from("direct:fetchPartnersForAggregation")
.to("sql:select * from ...")
.process(exchange -> {
List<Map<String, Object>> payload = exchange.getIn().getBody(ArrayList.class);
exchange.getIn().setHeader("numOfPartners", payload.size());
if(payload.size() < 1 || (payload.size() == 1 && payload.get(0).get("process_id")== null))
exchange.getIn().setBody(null);
})
.choice()
.when(emptyBody)
.stop()
.otherwise()
.split(body(), partnersAggregator)
.streaming()
.setHeader("partner_id", simple("${body[process_id]}"))
.to("sql:select ....")
.setHeader(Constants.EXCHANGE_HEADERS.OBJECT_TYPE, simple("partners"))
.to("log:partner_extracted?showBody=true&showHeaders=true")
.process(appIdCollator)
.process(partnerBuilder)
.to("log:partners_collated?showBody=true&showHeaders=true")
.end();