如何使用 Apache Camel 正确聚合文件内容?

How do I aggregate file content correctly with Apache Camel?

我正在编写一个工具来解析一些非常大的文件,我正在使用 Camel 实现它。我以前用过 Camel 做其他事情,它对我很有帮助。

我正在做关于在 streaming 模式下处理文件的初始概念验证,因为如果我尝试 运行 一个没有它就太大的文件,我会得到一个 java.lang.OutOfMemoryError.

这是我的路线配置:

@Override
public void configure() throws Exception {
    from("file:" + from)
            .split(body().tokenize("\n")).streaming()
            .bean(new LineProcessor())
            .aggregate(header(Exchange.FILE_NAME_ONLY), new SimpleStringAggregator())
            .completionTimeout(150000)
            .to("file://" + to)
            .end();
}

from指向我的测试文件所在目录

to指向我希望文件处理后所在的目录。

通过这种方法,我可以解析多达数十万行的文件,因此它足以满足我的需要。但我不确定文件是否正确聚合。

如果我 运行 cat /path_to_input/file 我明白了:

Line 1
Line 2
Line 3
Line 4
Line 5

现在在输出目录 cat /path_to_output/file 我明白了:

Line 1
Line 2
Line 3
Line 4
Line 5%

我认为这可能是一件很简单的事情,尽管我不知道如何解决这个问题。两个文件的字节大小也略有不同。

这是我的 LineProcessor class:

public class LineProcessor implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
        String line = exchange.getIn().getBody(String.class);
        System.out.println(line);
    }

}

还有我的SimpleStringAggregatorclass:

public class SimpleStringAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        if(oldExchange == null) {
            return newExchange;
        }

        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        String body = oldBody + "\n" + newBody;

        oldExchange.getIn().setBody(body);

        return oldExchange;
    }

}

也许我什至不应该为此担心,但我只是想让它完美地工作,因为这只是我开始真正实施之前的 POC。

看起来你的输入文件最后一个字符是换行符。您使用 \n 拆分文件并将其添加回聚合器中,但最后一行除外。因为没有新行,行终止符 \n 从最后一行中删除。一种解决方案可能是提前添加 \n

String body = oldBody + "\n" + newBody + "\n";

0X00me 的回答可能是正确的,但您可能在做不必要的工作。

我假设您使用的是高于 2.3 的 camel 版本。在这种情况下,您可以完全放弃聚合实现,如 according to the camel documentation:

Camel 2.3 及更新版本:

Splitter 将默认 return 原始输入消息。

将你的路线改成这样(我无法测试):

@Override
public void configure() throws Exception {
from("file:" + from)
        .split(body().tokenize("\n")).streaming()
        .bean(new LineProcessor())
        .completionTimeout(150000)
        .to("file://" + to)
        .end();
}

如果您需要进行自定义聚合,则需要实施聚合器。我每天都以这种方式处理文件,并且总是以我开始的方式结束。