为什么用一个很长的单行作为输入的处理文件会给出不同数量的记录?
Why does processing file with one very long single line as input give different numbers of records?
我使用 Spark 1.2.1(在 local
模式下)从文件中提取和处理日志信息。
文件大小可能超过 100Mb。该文件包含一个很长的单行,所以我使用正则表达式将该文件拆分为日志数据行。
MyApp.java
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> txtFileRdd = sc.textFile(filename);
JavaRDD<MyLog> logRDD = txtFileRdd.flatMap(LogParser::parseFromLogLine).cache();
LogParser.java
public static Iterable<MyLog> parseFromLogLine(String logline) {
List<MyLog> logs = new LinkedList<MyLog>();
Matcher m = PATTERN.matcher(logline);
while (m.find()) {
logs.add(new MyLog(m.group(0)));
}
System.out.println("Logs detected " + logs.size());
return logs;
}
已处理文件的实际大小约为 100 Mb,实际上包含 323863
个日志项。
当我使用 Spark 从文件中提取我的日志项时,我得到 455651
[logRDD.count()
] 个不正确的日志项。
我认为这是由于文件分区引起的,检查输出我看到以下内容:
Logs detected 18694
Logs detected 113104
Logs detected 323863
总和是455651
!
所以我看到我的分区相互合并并保留重复项,我想防止这种行为。
解决方法是使用 repartition(1)
,如下所示:
txtFileRdd.repartition(1).flatMap(LogParser::parseFromLogLine).cache();
这确实给了我想要的结果 323863
,但我怀疑它对性能有好处。
如何更好地处理性能?
分区默认是基于行的。看起来,当只有一条很长的线时,这会以一种有趣的方式失败。您可以考虑为此提交一个错误(也许已经有一个)。
拆分由 Hadoop 文件 API 执行,特别是 TextInputFormat
class。一种选择是指定您自己的 InputFormat
(可能包括您的整个解析器)并使用 sc.hadoopFile
.
另一种选择是通过 textinputformat.record.delimiter
:
设置不同的分隔符
// Use space instead of newline as the delimiter.
sc.hadoopConfiguration.set("textinputformat.record.delimiter", " ")
我使用 Spark 1.2.1(在 local
模式下)从文件中提取和处理日志信息。
文件大小可能超过 100Mb。该文件包含一个很长的单行,所以我使用正则表达式将该文件拆分为日志数据行。
MyApp.java
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> txtFileRdd = sc.textFile(filename);
JavaRDD<MyLog> logRDD = txtFileRdd.flatMap(LogParser::parseFromLogLine).cache();
LogParser.java
public static Iterable<MyLog> parseFromLogLine(String logline) {
List<MyLog> logs = new LinkedList<MyLog>();
Matcher m = PATTERN.matcher(logline);
while (m.find()) {
logs.add(new MyLog(m.group(0)));
}
System.out.println("Logs detected " + logs.size());
return logs;
}
已处理文件的实际大小约为 100 Mb,实际上包含 323863
个日志项。
当我使用 Spark 从文件中提取我的日志项时,我得到 455651
[logRDD.count()
] 个不正确的日志项。
我认为这是由于文件分区引起的,检查输出我看到以下内容:
Logs detected 18694
Logs detected 113104
Logs detected 323863
总和是455651
!
所以我看到我的分区相互合并并保留重复项,我想防止这种行为。
解决方法是使用 repartition(1)
,如下所示:
txtFileRdd.repartition(1).flatMap(LogParser::parseFromLogLine).cache();
这确实给了我想要的结果 323863
,但我怀疑它对性能有好处。
如何更好地处理性能?
分区默认是基于行的。看起来,当只有一条很长的线时,这会以一种有趣的方式失败。您可以考虑为此提交一个错误(也许已经有一个)。
拆分由 Hadoop 文件 API 执行,特别是 TextInputFormat
class。一种选择是指定您自己的 InputFormat
(可能包括您的整个解析器)并使用 sc.hadoopFile
.
另一种选择是通过 textinputformat.record.delimiter
:
// Use space instead of newline as the delimiter.
sc.hadoopConfiguration.set("textinputformat.record.delimiter", " ")