为什么我的 apache camel split/aggregate 路由 return 没有结果?
Why does my apache camel split/aggregate route return no results?
我正在尝试读取二进制文件,将其转换为 pojo 格式,然后输出为 CSV。解组(和编组)似乎没问题,但我在优化将相关记录转换为 Foo.class
时遇到了问题。下面的尝试returns没有结果。
from(String.format("file://%s?move=%s", INPUT_DIRECTORY, MOVE_DIRECTORY))
.unmarshal(unmarshaller)
.split(bodyAs(Iterator.class), new ListAggregationStrategy())
.choice()
.when(not(predicate)).stop()
.otherwise().convertBodyTo(Foo.class)
.end()
.end()
.marshal(csv)
.to(String.format("file://%s?fileName=${header.CamelFileName}.csv", OUTPUT_DIRECTORY));
我能够让它像这样工作,但感觉必须有更好的方法 - 这将需要高效,而 1 秒的超时感觉与此相反,这是为什么我要尝试使用内置的 split
聚合。或者使用 completionFromBatchConsumer
的某种方式,但我也很难做到这一点!
from(String.format("file://%s?move=%s", INPUT_DIRECTORY, MOVE_DIRECTORY))
.unmarshal(unmarshaller)
.split(bodyAs(Iterator.class))
.streaming()
.filter(predicate)
.convertBodyTo(Foo.class)
.aggregate(header("CamelFileName"), new ListAggregationStrategy())
.completionTimeout(1000)
.marshal(csv)
.to(String.format("file://%s?fileName=${header.CamelFileName}.csv", OUTPUT_DIRECTORY));
您可以在第一个解决方案中创建自己的 AggregationStrategy。
不要在您的选择语句中调用 stop(),而是将一个简单的 header(如“skipMerge”)设置为 true。
在您的策略中,测试此 header 是否存在,如果存在,则跳过它。
class ArrayListAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Object newBody = newExchange.getIn().getBody();
Boolean skipMerge = newExchange.getIn().getHeader("skipMerge", Boolean.class);
if (!skipMerge) { return oldExchange; }
ArrayList<Object> list = null;
if (oldExchange == null) {
list = new ArrayList<Object>();
list.add(newBody);
newExchange.getIn().setBody(list);
return newExchange;
} else {
list = oldExchange.getIn().getBody(ArrayList.class);
list.add(newBody);
return oldExchange;
}
}
}
目前,您的代码永远不会进入 marshal(csv),因为聚合器不会收到所有拆分的部分。
我正在尝试读取二进制文件,将其转换为 pojo 格式,然后输出为 CSV。解组(和编组)似乎没问题,但我在优化将相关记录转换为 Foo.class
时遇到了问题。下面的尝试returns没有结果。
from(String.format("file://%s?move=%s", INPUT_DIRECTORY, MOVE_DIRECTORY))
.unmarshal(unmarshaller)
.split(bodyAs(Iterator.class), new ListAggregationStrategy())
.choice()
.when(not(predicate)).stop()
.otherwise().convertBodyTo(Foo.class)
.end()
.end()
.marshal(csv)
.to(String.format("file://%s?fileName=${header.CamelFileName}.csv", OUTPUT_DIRECTORY));
我能够让它像这样工作,但感觉必须有更好的方法 - 这将需要高效,而 1 秒的超时感觉与此相反,这是为什么我要尝试使用内置的 split
聚合。或者使用 completionFromBatchConsumer
的某种方式,但我也很难做到这一点!
from(String.format("file://%s?move=%s", INPUT_DIRECTORY, MOVE_DIRECTORY))
.unmarshal(unmarshaller)
.split(bodyAs(Iterator.class))
.streaming()
.filter(predicate)
.convertBodyTo(Foo.class)
.aggregate(header("CamelFileName"), new ListAggregationStrategy())
.completionTimeout(1000)
.marshal(csv)
.to(String.format("file://%s?fileName=${header.CamelFileName}.csv", OUTPUT_DIRECTORY));
您可以在第一个解决方案中创建自己的 AggregationStrategy。 不要在您的选择语句中调用 stop(),而是将一个简单的 header(如“skipMerge”)设置为 true。 在您的策略中,测试此 header 是否存在,如果存在,则跳过它。
class ArrayListAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Object newBody = newExchange.getIn().getBody();
Boolean skipMerge = newExchange.getIn().getHeader("skipMerge", Boolean.class);
if (!skipMerge) { return oldExchange; }
ArrayList<Object> list = null;
if (oldExchange == null) {
list = new ArrayList<Object>();
list.add(newBody);
newExchange.getIn().setBody(list);
return newExchange;
} else {
list = oldExchange.getIn().getBody(ArrayList.class);
list.add(newBody);
return oldExchange;
}
}
}
目前,您的代码永远不会进入 marshal(csv),因为聚合器不会收到所有拆分的部分。