为什么我的 apache camel split/aggregate 路由 return 没有结果?

Why does my apache camel split/aggregate route return no results?

我正在尝试读取二进制文件,将其转换为 pojo 格式,然后输出为 CSV。解组(和编组)似乎没问题,但我在优化将相关记录转换为 Foo.class 时遇到了问题。下面的尝试returns没有结果。

from(String.format("file://%s?move=%s", INPUT_DIRECTORY, MOVE_DIRECTORY))
    .unmarshal(unmarshaller)
    .split(bodyAs(Iterator.class), new ListAggregationStrategy())
        .choice()
            .when(not(predicate)).stop()
            .otherwise().convertBodyTo(Foo.class)
        .end()
    .end()
    .marshal(csv)
    .to(String.format("file://%s?fileName=${header.CamelFileName}.csv", OUTPUT_DIRECTORY));

我能够让它像这样工作,但感觉必须有更好的方法 - 这将需要高效,而 1 秒的超时感觉与此相反,这是为什么我要尝试使用内置的 split 聚合。或者使用 completionFromBatchConsumer 的某种方式,但我也很难做到这一点!

from(String.format("file://%s?move=%s", INPUT_DIRECTORY, MOVE_DIRECTORY))
    .unmarshal(unmarshaller)
    .split(bodyAs(Iterator.class))
        .streaming()
        .filter(predicate)
            .convertBodyTo(Foo.class)
            .aggregate(header("CamelFileName"), new ListAggregationStrategy())
            .completionTimeout(1000)
            .marshal(csv)
            .to(String.format("file://%s?fileName=${header.CamelFileName}.csv", OUTPUT_DIRECTORY));

您可以在第一个解决方案中创建自己的 AggregationStrategy。 不要在您的选择语句中调用 stop(),而是将一个简单的 header(如“skipMerge”)设置为 true。 在您的策略中,测试此 header 是否存在,如果存在,则跳过它。

class ArrayListAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody();
        Boolean skipMerge = newExchange.getIn().getHeader("skipMerge", Boolean.class);

        if (!skipMerge) { return oldExchange; }
        ArrayList<Object> list = null;
        if (oldExchange == null) {
            list = new ArrayList<Object>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        } else {
            list = oldExchange.getIn().getBody(ArrayList.class);
            list.add(newBody);
            return oldExchange;
        }
    }
} 

目前,您的代码永远不会进入 marshal(csv),因为聚合器不会收到所有拆分的部分。