在 Spark 中处理 bzipped json 文件?
Processing bzipped json file in Spark?
我在 S3 中有大约 200 个文件,例如 a_file.json.bz2
,这些文件的每一行都是 JSON 格式的记录,但一些字段被 pickle.dumps
序列化,例如datetime
字段。 bzip
压缩后每个文件约 1GB。现在我需要在 Spark(实际上是 pyspark)中处理这些文件,但我什至无法取出每条记录。那么这里的最佳做法是什么?
ds.take(10)
给出
[(0, u'(I551'),
(6, u'(dp0'),
(11, u'Vadv_id'),
(19, u'p1'),
(22, u'V479883'),
(30, u'p2'),
(33, u'sVcpg_id'),
(42, u'p3'),
(45, u'V1913398'),
(54, u'p4')]
显然不是按每条记录拆分。
谢谢。
您可以通过 SparkContext.wholeTextFiles
逐个文件(而不是逐行)访问输入。然后,您可以使用 flatMap
解压缩并解析您自己代码中的行。
我遇到了这个问题 。您可以按照 Daniel 的建议使用 wholeTextFiles
,但是在读取大文件时必须小心,因为整个文件将在处理之前加载到内存中。如果文件太大,可能会使执行程序崩溃。我使用了 parallelize
和 flatMap
。也许类似于
def read_fun_generator(filename):
with bz2.open(filename, 'rb') as f:
for line in f:
yield line.strip()
bz2_filelist = glob.glob("/path/to/files/*.bz2")
rdd_from_bz2 = sc.parallelize(bz2_filelist).flatMap(read_fun_generator)
其实是pickle
造成的问题。通过查看压缩后的文件内容,确实是
(I551
(dp0
Vadv_id
p1
V479883
p2
sVcpg_id
p3
V1913398
p4
这让我难以解析。我知道我可以 pick.load(file)
多次取出对象,但在 Spark 中找不到快速解决方案,我只能逐行访问加载的文件。此外,此文件中的记录具有可变字段和长度,这使得破解变得更加困难。
我最终从源代码重新生成了这些 bz2
文件,因为它实际上更简单、更快捷。我了解到 Spark 和 hadoop 完美支持 bz2
压缩,因此不需要额外的操作。
我在 S3 中有大约 200 个文件,例如 a_file.json.bz2
,这些文件的每一行都是 JSON 格式的记录,但一些字段被 pickle.dumps
序列化,例如datetime
字段。 bzip
压缩后每个文件约 1GB。现在我需要在 Spark(实际上是 pyspark)中处理这些文件,但我什至无法取出每条记录。那么这里的最佳做法是什么?
ds.take(10)
给出
[(0, u'(I551'),
(6, u'(dp0'),
(11, u'Vadv_id'),
(19, u'p1'),
(22, u'V479883'),
(30, u'p2'),
(33, u'sVcpg_id'),
(42, u'p3'),
(45, u'V1913398'),
(54, u'p4')]
显然不是按每条记录拆分。
谢谢。
您可以通过 SparkContext.wholeTextFiles
逐个文件(而不是逐行)访问输入。然后,您可以使用 flatMap
解压缩并解析您自己代码中的行。
我遇到了这个问题 wholeTextFiles
,但是在读取大文件时必须小心,因为整个文件将在处理之前加载到内存中。如果文件太大,可能会使执行程序崩溃。我使用了 parallelize
和 flatMap
。也许类似于
def read_fun_generator(filename):
with bz2.open(filename, 'rb') as f:
for line in f:
yield line.strip()
bz2_filelist = glob.glob("/path/to/files/*.bz2")
rdd_from_bz2 = sc.parallelize(bz2_filelist).flatMap(read_fun_generator)
其实是pickle
造成的问题。通过查看压缩后的文件内容,确实是
(I551
(dp0
Vadv_id
p1
V479883
p2
sVcpg_id
p3
V1913398
p4
这让我难以解析。我知道我可以 pick.load(file)
多次取出对象,但在 Spark 中找不到快速解决方案,我只能逐行访问加载的文件。此外,此文件中的记录具有可变字段和长度,这使得破解变得更加困难。
我最终从源代码重新生成了这些 bz2
文件,因为它实际上更简单、更快捷。我了解到 Spark 和 hadoop 完美支持 bz2
压缩,因此不需要额外的操作。