在 Spark 中处理 bzipped json 文件?

Processing bzipped json file in Spark?

我在 S3 中有大约 200 个文件,例如 a_file.json.bz2,这些文件的每一行都是 JSON 格式的记录,但一些字段被 pickle.dumps 序列化,例如datetime 字段。 bzip 压缩后每个文件约 1GB。现在我需要在 Spark(实际上是 pyspark)中处理这些文件,但我什至无法取出每条记录。那么这里的最佳做法是什么?

ds.take(10)给出

[(0, u'(I551'),
 (6, u'(dp0'),
 (11, u'Vadv_id'),
 (19, u'p1'),
 (22, u'V479883'),
 (30, u'p2'),
 (33, u'sVcpg_id'),
 (42, u'p3'),
 (45, u'V1913398'),
 (54, u'p4')]

显然不是按每条记录拆分。

谢谢。

您可以通过 SparkContext.wholeTextFiles 逐个文件(而不是逐行)访问输入。然后,您可以使用 flatMap 解压缩并解析您自己代码中的行。

我遇到了这个问题 。您可以按照 Daniel 的建议使用 wholeTextFiles,但是在读取大文件时必须小心,因为整个文件将在处理之前加载到内存中。如果文件太大,可能会使执行程序崩溃。我使用了 parallelizeflatMap。也许类似于

def read_fun_generator(filename):
    with bz2.open(filename, 'rb') as f:
        for line in f:
            yield line.strip()

bz2_filelist = glob.glob("/path/to/files/*.bz2")
rdd_from_bz2 = sc.parallelize(bz2_filelist).flatMap(read_fun_generator)

其实是pickle造成的问题。通过查看压缩后的文件内容,确实是

(I551
(dp0
Vadv_id
p1
V479883
p2
sVcpg_id
p3
V1913398
p4

这让我难以解析。我知道我可以 pick.load(file) 多次取出对象,但在 Spark 中找不到快速解决方案,我只能逐行访问加载的文件。此外,此文件中的记录具有可变字段和长度,这使得破解变得更加困难。

我最终从源代码重新生成了这些 bz2 文件,因为它实际上更简单、更快捷。我了解到 Spark 和 hadoop 完美支持 bz2 压缩,因此不需要额外的操作。