在 Spark 中读取多个文件并在追加之前对其进行处理
Reading Multiple Files in Spark and Processing it before Appending
我的问题是我有多个结构相同的 txt 文件,我想将它们导入 spark。然后创建标识符列,汇总数据,最后堆叠。
例如其中一个文件如下所示:
Date A B C
2/21/2014 A1 11 2
2/22/2014 A1 11 5
2/23/2014 A1 21 3
2/24/2014 A1 13 5
2/25/2014 A1 23 4
2/26/2014 A1 28 4
2/27/2014 A1 32 2
2/28/2014 B1 45 4
3/1/2014 B1 39 4
3/2/2014 B1 29 4
3/3/2014 B1 49 5
3/4/2014 B1 18 4
3/5/2014 B1 30 3
3/6/2014 B1 50 5
读完这个文件后,我想添加一个提及文件名的列,更新后的数据如下所示:
Date A B C File
2/21/2014 A1 22 2 File1
2/22/2014 A1 36 2 File1
2/23/2014 A1 17 4 File1
2/24/2014 A1 30 2 File1
2/25/2014 A1 11 2 File1
2/26/2014 A1 32 2 File1
2/27/2014 A1 19 5 File1
2/28/2014 B1 22 3 File1
3/1/2014 B1 12 5 File1
3/2/2014 B1 50 3 File1
3/3/2014 B1 42 4 File1
3/4/2014 B1 37 4 File1
3/5/2014 B1 31 5 File1
3/6/2014 B1 20 3 File1
然后总结数据:
File A B C
File1 A1 167 19
File1 B1 214 27
同样,将创建并汇总另一个数据集。最后叠在一起。在 2 个文件的情况下,数据集如下所示:
File A B C
File1 A1 167 19
File1 B1 214 27
File2 Z10 167 19
File2 X20 214 27
我可以单独导入数据,通过将其转换为数据帧来处理它们,然后最后堆叠它们。但我无法以自动化的方式做到这一点。谁能帮帮我。
非常感谢!
如果您的单个文件适合内存,您可以使用 wholeTextFiles
、
rdd = sc.wholeTextFiles("/directorypath/*")
def appender(x):
i = x[0]
j = x[1].split("\n")
k = [x.split() for x in j]
l = [x.append(i) for x in k]
return k
frdd = rdd.flatMap(appender)
df = frdd.toDF("Date","A","B","C","FileName")
wholeTextFiles
returns 元组(文件名,文件内容)从那里你可以附加文件名。
df.groupBy("FileName","A").count() ##sum()
我的问题是我有多个结构相同的 txt 文件,我想将它们导入 spark。然后创建标识符列,汇总数据,最后堆叠。
例如其中一个文件如下所示:
Date A B C
2/21/2014 A1 11 2
2/22/2014 A1 11 5
2/23/2014 A1 21 3
2/24/2014 A1 13 5
2/25/2014 A1 23 4
2/26/2014 A1 28 4
2/27/2014 A1 32 2
2/28/2014 B1 45 4
3/1/2014 B1 39 4
3/2/2014 B1 29 4
3/3/2014 B1 49 5
3/4/2014 B1 18 4
3/5/2014 B1 30 3
3/6/2014 B1 50 5
读完这个文件后,我想添加一个提及文件名的列,更新后的数据如下所示:
Date A B C File
2/21/2014 A1 22 2 File1
2/22/2014 A1 36 2 File1
2/23/2014 A1 17 4 File1
2/24/2014 A1 30 2 File1
2/25/2014 A1 11 2 File1
2/26/2014 A1 32 2 File1
2/27/2014 A1 19 5 File1
2/28/2014 B1 22 3 File1
3/1/2014 B1 12 5 File1
3/2/2014 B1 50 3 File1
3/3/2014 B1 42 4 File1
3/4/2014 B1 37 4 File1
3/5/2014 B1 31 5 File1
3/6/2014 B1 20 3 File1
然后总结数据:
File A B C
File1 A1 167 19
File1 B1 214 27
同样,将创建并汇总另一个数据集。最后叠在一起。在 2 个文件的情况下,数据集如下所示:
File A B C
File1 A1 167 19
File1 B1 214 27
File2 Z10 167 19
File2 X20 214 27
我可以单独导入数据,通过将其转换为数据帧来处理它们,然后最后堆叠它们。但我无法以自动化的方式做到这一点。谁能帮帮我。
非常感谢!
如果您的单个文件适合内存,您可以使用 wholeTextFiles
、
rdd = sc.wholeTextFiles("/directorypath/*")
def appender(x):
i = x[0]
j = x[1].split("\n")
k = [x.split() for x in j]
l = [x.append(i) for x in k]
return k
frdd = rdd.flatMap(appender)
df = frdd.toDF("Date","A","B","C","FileName")
wholeTextFiles
returns 元组(文件名,文件内容)从那里你可以附加文件名。
df.groupBy("FileName","A").count() ##sum()