有什么方法可以捕获在 Spark 中使用通配符读入的多个 parquet 文件的输入文件名?

Is there any way to capture the input file name of multiple parquet files read in with a wildcard in Spark?

我使用标准通配符路径约定,使用 Spark 将多个 parquet 文件读入单个 RDD。换句话说,我正在做这样的事情:

val myRdd = spark.read.parquet("s3://my-bucket/my-folder/**/*.parquet")

但是,有时这些 Parquet 文件会有不同的架构。当我在 RDD 上进行转换时,我可以通过查找某些列的存在(或不存在)来尝试在映射函数中区分它们。然而,确定 RDD 中给定行使用哪个模式的可靠方法 - 以及我在这里特别询问的方式 - 是知道我正在查看哪个文件路径。

有没有办法在 RDD 级别上判断当前行来自哪个特定的镶木地板文件?所以想象一下我的代码目前看起来像这样(这是一个简化的例子):

val mapFunction = new MapFunction[Row, (String, Row)] {
  override def call(row: Row): (String, Row) = myJob.transform(row)
}

val pairRdd = myRdd.map(mapFunction, encoder=kryo[(String, Row)]

myJob.transform( ) 代码中,我用其他值修饰结果,将其转换为一对 RDD,并进行一些其他转换。

我使用 row.getAs( ... ) 方法来查找特定的列值,这是一个非常有用的方法。我想知道是否有任何类似的方法(例如 row.getInputFile( ) 或类似的方法)来获取我当前正在操作的特定文件的名称?

由于我传递通配符以将多个 parquet 文件读取到单个 RDD 中,所以我不知道我正在操作哪个文件。如果不出意外,我喜欢用输入文件名装饰 RDD 行的方法。这可能吗?

您可以为文件名添加一个新列,如下所示

import org.apache.spark.sql.functions._
val myDF = spark.read.parquet("s3://my-bucket/my-folder/**/*.parquet").withColumn("inputFile", input_file_name())