使用 Spark RDD 保存和加载 wholeTextFiles
Saving and Loading wholeTextFiles using Spark RDD
我需要在spark中对一些文本文件进行批处理。基本上有人给了我大量畸形的 csv 文件。它们包含多行 header 任意文本格式的数据,然后是多行格式正确的 csv 数据。我需要将这些数据分成两个文件,或者至少以某种方式摆脱 header。
无论如何,我读到过您可以获得格式如下的 RDD:
[(filename, content)]
通过使用
spark \
.sparkContext \
.wholeTextFiles(input_files_csv)
然后我想在此 RDD 上执行映射操作,这会产生另一种与原始格式完全相同的格式
[(newfilename, content)]
然后我希望集群将这些内容保存在这些文件名下。
我找不到可以为我执行此操作的写入命令。我可以将 RDD 保存为原始文件,但我无法将其保存为普通文件,之后我可以将其作为数据帧读取。
我想我可以删除 headers,然后另存为一个巨大的 csv,并将文件名作为一个新列,但我觉得那样不会那么有效。
有人能解决我的问题吗?
这是 Scala,但在 Python 中应该不会太远。在 "foreach" 中,我没有使用任何特定于 spark 的东西来编写文件,只是使用常规的 Hadoop API。
sc.wholeTextFiles("/tmp/test-data/")
.foreach{ x =>
val filename = x._1
val content = x._2
val fs = FileSystem.get(new Configuration())
val output = fs.create(new Path(s"${filename}-copy"))
val writer = new PrintWriter(output)
writer.write(content)
writer.close
}
我需要在spark中对一些文本文件进行批处理。基本上有人给了我大量畸形的 csv 文件。它们包含多行 header 任意文本格式的数据,然后是多行格式正确的 csv 数据。我需要将这些数据分成两个文件,或者至少以某种方式摆脱 header。
无论如何,我读到过您可以获得格式如下的 RDD:
[(filename, content)]
通过使用
spark \ .sparkContext \ .wholeTextFiles(input_files_csv)
然后我想在此 RDD 上执行映射操作,这会产生另一种与原始格式完全相同的格式
[(newfilename, content)]
然后我希望集群将这些内容保存在这些文件名下。
我找不到可以为我执行此操作的写入命令。我可以将 RDD 保存为原始文件,但我无法将其保存为普通文件,之后我可以将其作为数据帧读取。
我想我可以删除 headers,然后另存为一个巨大的 csv,并将文件名作为一个新列,但我觉得那样不会那么有效。
有人能解决我的问题吗?
这是 Scala,但在 Python 中应该不会太远。在 "foreach" 中,我没有使用任何特定于 spark 的东西来编写文件,只是使用常规的 Hadoop API。
sc.wholeTextFiles("/tmp/test-data/")
.foreach{ x =>
val filename = x._1
val content = x._2
val fs = FileSystem.get(new Configuration())
val output = fs.create(new Path(s"${filename}-copy"))
val writer = new PrintWriter(output)
writer.write(content)
writer.close
}