为骆驼路线中未知数量的拆分元素设置完成大小或谓词
Setting a completion size or predicate for unknown number of splited elements in Camel route
我有一个使用 http 服务的骆驼路线,其中 return 一个 json 具有几个我需要通过 Id 关联的元素。我不知道响应中有多少具有相同 ID 的元素,所以,如何在聚合中设置完成以便关联所有这些元素?
这些是我的路线:
from("direct:getInfo")
.id("getInfo")
.setHeader("accept", constant("application/json"))
.setHeader("authorization", constant("xyz"))
.setHeader("Cache-Control", constant("no-cache"))
.setHeader("content-Type", constant("application/json"))
.setHeader(Exchange.HTTP_METHOD, constant("GET"))
.removeHeader(Exchange.HTTP_PATH)
.removeHeader("CamelHttp*")
.setBody(simple("${null}"))
.streamCaching()
.to("http4:someURL") //responses an array of n json elements
.split().jsonpath("$").streaming()
.to("direct:splitInfo");
from("direct:splitInfo")
.id("splitInfo")
.aggregate(jsonpath("CustomerId"), new ArrayListAggregationStrategy())
.completionSize(???) //How must I set the completion in order to correlate all items
.to("direct:process");
非常感谢。
根据评论中的示例完全重写答案
由于您想要拆分并重新聚合完整的 JSON 有效载荷,因此您只需要 Splitter EIP with an aggregation strategy。
如果您为拆分器提供拆分负载的表达式以及聚合策略,则根本不需要完成条件。每个 JSON 负载都被处理为 "batch".
.from(endpoint)
// body = complete JSON payload
.split([split-expression], new MyAggregationStrategy())
// each element is sent to this bean
.to("bean:elementProcessorBean")
// you must end the splitter
.end()
// here you get the complete re-aggregated JSON payload
// how it is re-aggregated is up to MyAggregationStrategy
查看链接的 Splitter 文档以获取示例。
我有一个使用 http 服务的骆驼路线,其中 return 一个 json 具有几个我需要通过 Id 关联的元素。我不知道响应中有多少具有相同 ID 的元素,所以,如何在聚合中设置完成以便关联所有这些元素?
这些是我的路线:
from("direct:getInfo")
.id("getInfo")
.setHeader("accept", constant("application/json"))
.setHeader("authorization", constant("xyz"))
.setHeader("Cache-Control", constant("no-cache"))
.setHeader("content-Type", constant("application/json"))
.setHeader(Exchange.HTTP_METHOD, constant("GET"))
.removeHeader(Exchange.HTTP_PATH)
.removeHeader("CamelHttp*")
.setBody(simple("${null}"))
.streamCaching()
.to("http4:someURL") //responses an array of n json elements
.split().jsonpath("$").streaming()
.to("direct:splitInfo");
from("direct:splitInfo")
.id("splitInfo")
.aggregate(jsonpath("CustomerId"), new ArrayListAggregationStrategy())
.completionSize(???) //How must I set the completion in order to correlate all items
.to("direct:process");
非常感谢。
根据评论中的示例完全重写答案
由于您想要拆分并重新聚合完整的 JSON 有效载荷,因此您只需要 Splitter EIP with an aggregation strategy。
如果您为拆分器提供拆分负载的表达式以及聚合策略,则根本不需要完成条件。每个 JSON 负载都被处理为 "batch".
.from(endpoint)
// body = complete JSON payload
.split([split-expression], new MyAggregationStrategy())
// each element is sent to this bean
.to("bean:elementProcessorBean")
// you must end the splitter
.end()
// here you get the complete re-aggregated JSON payload
// how it is re-aggregated is up to MyAggregationStrategy
查看链接的 Splitter 文档以获取示例。