在 Spark 中从具有不同 headers 的 CSV 文件形成数据帧
Forming DataFrames from CSV files with different headers in Spark
我正在尝试读取带有变量列表的 Gzipped CSV 文件夹(无扩展名)。例如:
CSV file 1: TIMESTAMP | VAR1 | VAR2 | VAR3
CSV file 2: TIMESTAMP | VAR1 | VAR3
每个文件代表一天。列的顺序可以不同(或者一个文件中可能缺少列)。
使用 spark.read
一次读取整个文件夹的第一个选项已被放弃,因为文件之间的连接考虑的是列顺序而不是列名。
我的下一个选择是按文件阅读:
for (String key : pathArray) {
Dataset<Row> rawData = spark.read().option("header", true).csv(key);
allDatasets.add(rawData);
}
然后对列名进行完全外部联接:
Dataset<Row> data = allDatasets.get(0);
for (int i = 1; i < allDatasets.size(); i++) {
ArrayList<String> columns = new
ArrayList(Arrays.asList(data.columns()));
columns.retainAll(new
ArrayList(Arrays.asList(allDatasets.get(i).columns())));
data = data.join(allDatasets.get(i),
JavaConversions.asScalaBuffer(columns), "outer");
}
但是这个过程非常慢,因为它一次加载一个文件。
下一个方法是使用 sc.binaryFiles
,因为 sc.readFiles
无法解决添加自定义 Hadoop 编解码器的问题(以便能够在没有 gz
扩展名)。
使用最新的方法并将 this code 翻译成 Java 我有以下内容:
- 一个
JavaPairRDD<String, Iterable<Tuple2<String, String>>>
包含变量的名称 (VAR1
) 和一个可迭代的元组 TIMESTAMP,VALUE
VAR
.
我想用它形成一个代表所有文件的 DataFrame,但是我完全不知道如何将这个最终的 PairRDD 转换为 Dataframe。 DataFrame 应该一起表示所有文件的内容。我想要的最终 DataFrame 示例如下:
TIMESTAMP | VAR1 | VAR2 | VAR3
01 32 12 32 ==> Start of contents of file 1
02 10 5 7 ==> End of contents of file 1
03 1 5 ==> Start of contents of file 2
04 4 8 ==> End of contents of file 2
有什么建议或想法吗?
终于拿到了,性能非常好:
- 在 "background" 中按月读取(使用 Java
Executor
使用 CSV 并行读取其他文件夹),使用这种方法 Driver
花费的时间同时扫描每个文件夹减少,因为是并行完成的。
- 接下来,该过程一方面提取 headers,另一方面提取它们的内容(具有变量名、时间戳、值的元组)。
- 最后,使用
RDD
API 合并内容并使用 headers 制作 Dataframe。
我正在尝试读取带有变量列表的 Gzipped CSV 文件夹(无扩展名)。例如:
CSV file 1: TIMESTAMP | VAR1 | VAR2 | VAR3
CSV file 2: TIMESTAMP | VAR1 | VAR3
每个文件代表一天。列的顺序可以不同(或者一个文件中可能缺少列)。
使用 spark.read
一次读取整个文件夹的第一个选项已被放弃,因为文件之间的连接考虑的是列顺序而不是列名。
我的下一个选择是按文件阅读:
for (String key : pathArray) {
Dataset<Row> rawData = spark.read().option("header", true).csv(key);
allDatasets.add(rawData);
}
然后对列名进行完全外部联接:
Dataset<Row> data = allDatasets.get(0);
for (int i = 1; i < allDatasets.size(); i++) {
ArrayList<String> columns = new
ArrayList(Arrays.asList(data.columns()));
columns.retainAll(new
ArrayList(Arrays.asList(allDatasets.get(i).columns())));
data = data.join(allDatasets.get(i),
JavaConversions.asScalaBuffer(columns), "outer");
}
但是这个过程非常慢,因为它一次加载一个文件。
下一个方法是使用 sc.binaryFiles
,因为 sc.readFiles
无法解决添加自定义 Hadoop 编解码器的问题(以便能够在没有 gz
扩展名)。
使用最新的方法并将 this code 翻译成 Java 我有以下内容:
- 一个
JavaPairRDD<String, Iterable<Tuple2<String, String>>>
包含变量的名称 (VAR1
) 和一个可迭代的元组TIMESTAMP,VALUE
VAR
.
我想用它形成一个代表所有文件的 DataFrame,但是我完全不知道如何将这个最终的 PairRDD 转换为 Dataframe。 DataFrame 应该一起表示所有文件的内容。我想要的最终 DataFrame 示例如下:
TIMESTAMP | VAR1 | VAR2 | VAR3
01 32 12 32 ==> Start of contents of file 1
02 10 5 7 ==> End of contents of file 1
03 1 5 ==> Start of contents of file 2
04 4 8 ==> End of contents of file 2
有什么建议或想法吗?
终于拿到了,性能非常好:
- 在 "background" 中按月读取(使用 Java
Executor
使用 CSV 并行读取其他文件夹),使用这种方法Driver
花费的时间同时扫描每个文件夹减少,因为是并行完成的。 - 接下来,该过程一方面提取 headers,另一方面提取它们的内容(具有变量名、时间戳、值的元组)。
- 最后,使用
RDD
API 合并内容并使用 headers 制作 Dataframe。