将 2TB 的 gzipped 多行 JSON 转换为 NDJSON
Converting 2TB of gziped multiline JSONs to NDJSONs
对于我的研究,我有一个包含大约 20,000 个 gzip 多行 json 文件的数据集(~2TB,所有文件都具有相同的架构)。我需要处理和清理这些数据(我应该说我对数据分析工具很陌生)。
在花了几天时间阅读有关 Spark 和 Apache Beam 的内容后,我确信第一步是首先将此数据集转换为 NDJSON。在大多数书籍和教程中,他们总是假设您正在使用一些换行分隔的文件。
转换这些数据的最佳方式是什么?
我试图在 gcloud 上启动一个大型实例,然后使用 gunzip 和 jq 来执行此操作。毫不奇怪,这似乎需要很长时间。
在此先感谢您的帮助!
如果您使用 sc.wholeTextFiles
,则无需转换为 NDJSON。将此方法指向一个目录,您将返回一个 RDD[(String, String)]
,其中 ._1
是文件名,._2
是文件的内容。
如果您使用 TextIO,Apache Beam 支持解压缩文件。
但分隔符仍然是新行。
对于多行 json 您可以使用并行读取完整文件,然后将 json 字符串转换为 pojo 并最终重新排列数据以利用并行性。
所以步骤是
Get the file list > Read individual files > Parse file content to json objects > Reshuffle > ...
您可以通过FileSystems.match("gcs://my_bucker").metadata()获取文件列表。
通过压缩读取单个文件Compression.detect((fileResouceId).getFilename()).readDecompressed(FileSystems.open(fileResouceId))
对于我的研究,我有一个包含大约 20,000 个 gzip 多行 json 文件的数据集(~2TB,所有文件都具有相同的架构)。我需要处理和清理这些数据(我应该说我对数据分析工具很陌生)。
在花了几天时间阅读有关 Spark 和 Apache Beam 的内容后,我确信第一步是首先将此数据集转换为 NDJSON。在大多数书籍和教程中,他们总是假设您正在使用一些换行分隔的文件。
转换这些数据的最佳方式是什么? 我试图在 gcloud 上启动一个大型实例,然后使用 gunzip 和 jq 来执行此操作。毫不奇怪,这似乎需要很长时间。
在此先感谢您的帮助!
如果您使用 sc.wholeTextFiles
,则无需转换为 NDJSON。将此方法指向一个目录,您将返回一个 RDD[(String, String)]
,其中 ._1
是文件名,._2
是文件的内容。
如果您使用 TextIO,Apache Beam 支持解压缩文件。 但分隔符仍然是新行。
对于多行 json 您可以使用并行读取完整文件,然后将 json 字符串转换为 pojo 并最终重新排列数据以利用并行性。
所以步骤是
Get the file list > Read individual files > Parse file content to json objects > Reshuffle > ...
您可以通过FileSystems.match("gcs://my_bucker").metadata()获取文件列表。
通过压缩读取单个文件Compression.detect((fileResouceId).getFilename()).readDecompressed(FileSystems.open(fileResouceId))