在 Spark 中从具有不同 headers 的 CSV 文件形成数据帧

Forming DataFrames from CSV files with different headers in Spark

我正在尝试读取带有变量列表的 Gzipped CSV 文件夹(无扩展名)。例如:

CSV file 1: TIMESTAMP | VAR1 | VAR2 | VAR3

CSV file 2: TIMESTAMP | VAR1 | VAR3

每个文件代表一天。列的顺序可以不同(或者一个文件中可能缺少列)。

使用 spark.read 一次读取整个文件夹的第一个选项已被放弃,因为文件之间的连接考虑的是列顺序而不是列名。 我的下一个选择是按文件阅读:

 for (String key : pathArray) {
       Dataset<Row> rawData = spark.read().option("header", true).csv(key);
       allDatasets.add(rawData);
    }

然后对列名进行完全外部联接:

Dataset<Row> data = allDatasets.get(0);
     for (int i = 1; i < allDatasets.size(); i++) {
        ArrayList<String> columns = new 
        ArrayList(Arrays.asList(data.columns()));
        columns.retainAll(new  
        ArrayList(Arrays.asList(allDatasets.get(i).columns())));
        data = data.join(allDatasets.get(i), 
        JavaConversions.asScalaBuffer(columns), "outer");
      }

但是这个过程非常慢,因为它一次加载一个文件。

下一个方法是使用 sc.binaryFiles,因为 sc.readFiles 无法解决添加自定义 Hadoop 编解码器的问题(以便能够在没有 gz 扩展名)。

使用最新的方法并将 this code 翻译成 Java 我有以下内容:

我想用它形成一个代表所有文件的 DataFrame,但是我完全不知道如何将这个最终的 PairRDD 转换为 Dataframe。 DataFrame 应该一起表示所有文件的内容。我想要的最终 DataFrame 示例如下:

  TIMESTAMP | VAR1 | VAR2 | VAR3 
   01           32      12    32  ==> Start of contents of file 1
   02           10       5     7  ==> End of contents of file 1
   03                    1     5  ==> Start of contents of file 2
   04                    4     8  ==> End of contents of file 2

有什么建议或想法吗?

终于拿到了,性能非常好:

  1. 在 "background" 中按月读取(使用 Java Executor 使用 CSV 并行读取其他文件夹),使用这种方法 Driver 花费的时间同时扫描每个文件夹减少,因为是并行完成的。
  2. 接下来,该过程一方面提取 headers,另一方面提取它们的内容(具有变量名、时间戳、值的元组)。
  3. 最后,使用 RDD API 合并内容并使用 headers 制作 Dataframe。