如何使用 Apache Camel 正确聚合文件内容?
How do I aggregate file content correctly with Apache Camel?
我正在编写一个工具来解析一些非常大的文件,我正在使用 Camel 实现它。我以前用过 Camel 做其他事情,它对我很有帮助。
我正在做关于在 streaming
模式下处理文件的初始概念验证,因为如果我尝试 运行 一个没有它就太大的文件,我会得到一个 java.lang.OutOfMemoryError
.
这是我的路线配置:
@Override
public void configure() throws Exception {
from("file:" + from)
.split(body().tokenize("\n")).streaming()
.bean(new LineProcessor())
.aggregate(header(Exchange.FILE_NAME_ONLY), new SimpleStringAggregator())
.completionTimeout(150000)
.to("file://" + to)
.end();
}
from
指向我的测试文件所在目录
to
指向我希望文件处理后所在的目录。
通过这种方法,我可以解析多达数十万行的文件,因此它足以满足我的需要。但我不确定文件是否正确聚合。
如果我 运行 cat /path_to_input/file
我明白了:
Line 1
Line 2
Line 3
Line 4
Line 5
现在在输出目录 cat /path_to_output/file
我明白了:
Line 1
Line 2
Line 3
Line 4
Line 5%
我认为这可能是一件很简单的事情,尽管我不知道如何解决这个问题。两个文件的字节大小也略有不同。
这是我的 LineProcessor
class:
public class LineProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
String line = exchange.getIn().getBody(String.class);
System.out.println(line);
}
}
还有我的SimpleStringAggregator
class:
public class SimpleStringAggregator implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if(oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
String body = oldBody + "\n" + newBody;
oldExchange.getIn().setBody(body);
return oldExchange;
}
}
也许我什至不应该为此担心,但我只是想让它完美地工作,因为这只是我开始真正实施之前的 POC。
看起来你的输入文件最后一个字符是换行符。您使用 \n
拆分文件并将其添加回聚合器中,但最后一行除外。因为没有新行,行终止符 \n
从最后一行中删除。一种解决方案可能是提前添加 \n
:
String body = oldBody + "\n" + newBody + "\n";
0X00me 的回答可能是正确的,但您可能在做不必要的工作。
我假设您使用的是高于 2.3 的 camel 版本。在这种情况下,您可以完全放弃聚合实现,如 according to the camel documentation:
Camel 2.3 及更新版本:
Splitter 将默认 return 原始输入消息。
将你的路线改成这样(我无法测试):
@Override
public void configure() throws Exception {
from("file:" + from)
.split(body().tokenize("\n")).streaming()
.bean(new LineProcessor())
.completionTimeout(150000)
.to("file://" + to)
.end();
}
如果您需要进行自定义聚合,则需要实施聚合器。我每天都以这种方式处理文件,并且总是以我开始的方式结束。
我正在编写一个工具来解析一些非常大的文件,我正在使用 Camel 实现它。我以前用过 Camel 做其他事情,它对我很有帮助。
我正在做关于在 streaming
模式下处理文件的初始概念验证,因为如果我尝试 运行 一个没有它就太大的文件,我会得到一个 java.lang.OutOfMemoryError
.
这是我的路线配置:
@Override
public void configure() throws Exception {
from("file:" + from)
.split(body().tokenize("\n")).streaming()
.bean(new LineProcessor())
.aggregate(header(Exchange.FILE_NAME_ONLY), new SimpleStringAggregator())
.completionTimeout(150000)
.to("file://" + to)
.end();
}
from
指向我的测试文件所在目录
to
指向我希望文件处理后所在的目录。
通过这种方法,我可以解析多达数十万行的文件,因此它足以满足我的需要。但我不确定文件是否正确聚合。
如果我 运行 cat /path_to_input/file
我明白了:
Line 1
Line 2
Line 3
Line 4
Line 5
现在在输出目录 cat /path_to_output/file
我明白了:
Line 1
Line 2
Line 3
Line 4
Line 5%
我认为这可能是一件很简单的事情,尽管我不知道如何解决这个问题。两个文件的字节大小也略有不同。
这是我的 LineProcessor
class:
public class LineProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
String line = exchange.getIn().getBody(String.class);
System.out.println(line);
}
}
还有我的SimpleStringAggregator
class:
public class SimpleStringAggregator implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if(oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
String body = oldBody + "\n" + newBody;
oldExchange.getIn().setBody(body);
return oldExchange;
}
}
也许我什至不应该为此担心,但我只是想让它完美地工作,因为这只是我开始真正实施之前的 POC。
看起来你的输入文件最后一个字符是换行符。您使用 \n
拆分文件并将其添加回聚合器中,但最后一行除外。因为没有新行,行终止符 \n
从最后一行中删除。一种解决方案可能是提前添加 \n
:
String body = oldBody + "\n" + newBody + "\n";
0X00me 的回答可能是正确的,但您可能在做不必要的工作。
我假设您使用的是高于 2.3 的 camel 版本。在这种情况下,您可以完全放弃聚合实现,如 according to the camel documentation:
Camel 2.3 及更新版本:
Splitter 将默认 return 原始输入消息。
将你的路线改成这样(我无法测试):
@Override
public void configure() throws Exception {
from("file:" + from)
.split(body().tokenize("\n")).streaming()
.bean(new LineProcessor())
.completionTimeout(150000)
.to("file://" + to)
.end();
}
如果您需要进行自定义聚合,则需要实施聚合器。我每天都以这种方式处理文件,并且总是以我开始的方式结束。