有什么方法可以捕获在 Spark 中使用通配符读入的多个 parquet 文件的输入文件名?
Is there any way to capture the input file name of multiple parquet files read in with a wildcard in Spark?
我使用标准通配符路径约定,使用 Spark 将多个 parquet 文件读入单个 RDD。换句话说,我正在做这样的事情:
val myRdd = spark.read.parquet("s3://my-bucket/my-folder/**/*.parquet")
但是,有时这些 Parquet 文件会有不同的架构。当我在 RDD 上进行转换时,我可以通过查找某些列的存在(或不存在)来尝试在映射函数中区分它们。然而,确定 RDD 中给定行使用哪个模式的可靠方法 - 以及我在这里特别询问的方式 - 是知道我正在查看哪个文件路径。
有没有办法在 RDD 级别上判断当前行来自哪个特定的镶木地板文件?所以想象一下我的代码目前看起来像这样(这是一个简化的例子):
val mapFunction = new MapFunction[Row, (String, Row)] {
override def call(row: Row): (String, Row) = myJob.transform(row)
}
val pairRdd = myRdd.map(mapFunction, encoder=kryo[(String, Row)]
在 myJob.transform( )
代码中,我用其他值修饰结果,将其转换为一对 RDD,并进行一些其他转换。
我使用 row.getAs( ... )
方法来查找特定的列值,这是一个非常有用的方法。我想知道是否有任何类似的方法(例如 row.getInputFile( )
或类似的方法)来获取我当前正在操作的特定文件的名称?
由于我传递通配符以将多个 parquet 文件读取到单个 RDD 中,所以我不知道我正在操作哪个文件。如果不出意外,我喜欢用输入文件名装饰 RDD 行的方法。这可能吗?
您可以为文件名添加一个新列,如下所示
import org.apache.spark.sql.functions._
val myDF = spark.read.parquet("s3://my-bucket/my-folder/**/*.parquet").withColumn("inputFile", input_file_name())
我使用标准通配符路径约定,使用 Spark 将多个 parquet 文件读入单个 RDD。换句话说,我正在做这样的事情:
val myRdd = spark.read.parquet("s3://my-bucket/my-folder/**/*.parquet")
但是,有时这些 Parquet 文件会有不同的架构。当我在 RDD 上进行转换时,我可以通过查找某些列的存在(或不存在)来尝试在映射函数中区分它们。然而,确定 RDD 中给定行使用哪个模式的可靠方法 - 以及我在这里特别询问的方式 - 是知道我正在查看哪个文件路径。
有没有办法在 RDD 级别上判断当前行来自哪个特定的镶木地板文件?所以想象一下我的代码目前看起来像这样(这是一个简化的例子):
val mapFunction = new MapFunction[Row, (String, Row)] {
override def call(row: Row): (String, Row) = myJob.transform(row)
}
val pairRdd = myRdd.map(mapFunction, encoder=kryo[(String, Row)]
在 myJob.transform( )
代码中,我用其他值修饰结果,将其转换为一对 RDD,并进行一些其他转换。
我使用 row.getAs( ... )
方法来查找特定的列值,这是一个非常有用的方法。我想知道是否有任何类似的方法(例如 row.getInputFile( )
或类似的方法)来获取我当前正在操作的特定文件的名称?
由于我传递通配符以将多个 parquet 文件读取到单个 RDD 中,所以我不知道我正在操作哪个文件。如果不出意外,我喜欢用输入文件名装饰 RDD 行的方法。这可能吗?
您可以为文件名添加一个新列,如下所示
import org.apache.spark.sql.functions._
val myDF = spark.read.parquet("s3://my-bucket/my-folder/**/*.parquet").withColumn("inputFile", input_file_name())