如何使用 PySpark HashPartitioner 检测大型 json 文件中的重复项

How to detect duplicates in large json file using PySpark HashPartitioner

我有一个大型 json 文件,其中包含超过 20GB 的 json 结构化元数据。它包含跨某些应用程序的简单用户元数据,我想筛选它以检测重复项。以下是数据的示例:

{"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
{"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}

json 文件逐行包含看起来与此非常相似的 json 个对象。当两个 json 对象的 "name" 字段相同时,会出现重复。所以,这是重复的:

{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}

只要两个 json 个完全相同的对象。

现在,我想检查整个 json 文件,该文件太大而无法放入内存,并通过使用最佳标准找出所有重复项 以及它们的内容是 的重复项,然后做一些逻辑 - 逻辑部分很简单,但我有点不确定如何找到重复项。

我想到了什么:

  1. 我首先考虑使用布隆过滤器。它们并没有那么令人困惑,并且工作得很好而且速度很快,而且我认为它们本质上可以归结为 O(n)。然而,布隆过滤器不会让我知道重复的字符串是什么的重复,这对我来说是不可能的。

  2. 我考虑过使用外部归并排序。我基本上会将文件分成多个适合内存的较小文件,对每个块进行排序并搜索重复项(现在聚集在一起)。但我不太确定这个实现是我想要的。

  3. 我 运行 接下来要做的是按分区散列,我怀疑这就是我想要的。在处理适合内存的数据时,散列本质上是查找重复项的最佳方式,那么为什么不将它用于不适合内存的数据呢?我对如何按分区散列有点困惑。我不确定这是否是我要找的。

所以,我想我应该使用选项 3,按分区散列,我知道 Spark 有这个。我希望是否有人可以让我知道我是否在正确的轨道上,并且可能会给我一些指示,说明我是否正确。从概念上讲,我有几个具体问题:

  1. 假设我创建了 100 个完全适合内存的分区(因此,在我的例子中,每个分区为 100MB)。假设我将 json 文件中的第一个 x 元素散列到一个分区中,我发现 没有 重复项。假设我有另一个分区,其中包含第二个 100MB 的数据,其中也不包含重复项。如果我一次只能加载 100MB 的数据,我如何检查分区 1 和分区 2 之间没有任何重复?澄清一下,如果分区 1 有一个元素而分区 2 有一个相同的元素,我该如何计算出来?我想我需要将两者都加载到内存中,对吧?如果我做不到……那我该怎么办?可能是我理解错了...

  2. 这就是我的第二个问题 - 这似乎不是分区的工作方式,当您按分区散列时,具有相似散列或散列的元素 运行ge 进入一个特定的文件。因此,如果两个元素重复,我会知道,因为算法会尝试将其放入散列已存在的文件中。是这样吗?

我知道我还有更多问题,只是想不出来了。有没有人有任何提示?特别是关于 pyspark 以及如何最好地使用它?或者 pyspark 不是我要找的东西?

问题比您想象的要简单。您实际上只需要按照@Hitobat 的建议按 name 聚合数据。我会用 pyspark.sql.Window 来解决这个问题,以简化聚合输出。

给定以下数据是一个名称为 data.json 的文件(这也可以是一个文件目录,而不是单个文件)

data.json

的内容
{"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
{"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}

那么 pyspark 代码将如下所示:

from pyspark.sql import Window
from pyspark.sql import functions as F

df = spark.read.json("data.json") # can be a directory of files as well 
df.show()

输出

+----------+----------+---------------+--------+----------------+
|   created|created_at|           name|    type|        username|
+----------+----------+---------------+--------+----------------+
|2015-08-04|2010-03-15|           null|    null|koleslawrulez333|
|2016-01-19|2012-05-25|  arthurking231|    null|            null|
|2016-07-23|2011-08-27|  starklord1943|Username|            null|
|2015-11-08|2010-01-19|Assasinator5827|    null|            null|
|2016-07-23|2011-08-27|Assasinator5827|Username|            null|
+----------+----------+---------------+--------+----------------+ 

然后用pyspark.sql.Window

分区计数
name_partition_window = Window.partitionBy("name")
df_with_repeat_counts = df.select("*", F.count("*").over(name_partition_window).alias("name_counts"))
df_with_repeat_counts.show()

输出

+----------+----------+---------------+--------+----------------+-----------+
|   created|created_at|           name|    type|        username|name_counts|
+----------+----------+---------------+--------+----------------+-----------+
|2016-01-19|2012-05-25|  arthurking231|    null|            null|          1|
|2015-08-04|2010-03-15|           null|    null|koleslawrulez333|          1|
|2015-11-08|2010-01-19|Assasinator5827|    null|            null|          2|
|2016-07-23|2011-08-27|Assasinator5827|Username|            null|          2|
|2016-07-23|2011-08-27|  starklord1943|Username|            null|          1|
+----------+----------+---------------+--------+----------------+-----------+

然后过滤name_count列的dataframe并按名称排序以供检查

duplicates = df_with_repeat_counts.where(F.col("name_counts") > 1).orderBy("name")
duplicates.show()

输出

+----------+----------+---------------+--------+--------+-----------+
|   created|created_at|           name|    type|username|name_counts|
+----------+----------+---------------+--------+--------+-----------+
|2015-11-08|2010-01-19|Assasinator5827|    null|    null|          2|
|2016-07-23|2011-08-27|Assasinator5827|Username|    null|          2|
+----------+----------+---------------+--------+--------+-----------+

此时您可以根据用例的需要分析 duplicates 数据框。