在 Spark / PySpark 中加入带有文件名的数据

Join data with filename in Spark / PySpark

我正在从 PySpark 中的多个 S3 文件中读取数据。 S3 密钥包含创建文件的日历日期,我想在数据和该日期之间进行连接。有没有办法在文件和文件名中的数据行之间进行连接?

您可以在包含文件名的数据框中添加一列,稍后合并它们后我用它来识别每一行的来源:

from pyspark.sql.functions import lit

filename = 'myawesomefile.csv'

df_new = df.withColumn('file_name', lit(filename))

这是我最后做的事情:

我覆盖了 LineRecordReader Hadoop class 以便它在每一行中包含文件名,然后覆盖了 TextInputFormat 以使用我的新 LineRecordReader。

然后我使用 newAPIHadoopFile 函数加载了文件。

链接:
LineRecordReader: http://tinyurl.com/linerecordreader
文本输入格式:http://tinyurl.com/textinputformat
新 APIHadoop 文件:http://tinyurl.com/newapihadoopfile