通过迭代另一个大 RDD 来过滤一个大 RDD - pySpark

Filter a large RDD by iterating over another large RDD - pySpark

我有一个很大的 RDD,称之为 RDD1,经过初始筛选后大约有 3 亿行。我想做的是从 RDD1 中获取 ID,并在另一个大数据集中找到它的所有其他实例,称之为 RDD2,大约有 30 亿行。 RDD2 是通过查询存储在 Hive 和 RDD1 中的镶木地板 table 创建的。来自 RDD1 的唯一 ID 的数量约为 1000 万个元素。

我的做法是目前收集id并广播,然后过滤RDD2。

我的问题是 - 有没有更有效的方法来做到这一点?或者这是最佳做法?

我有以下代码-

hiveContext = HiveContext(sc)
RDD1 = hiveContext("select * from table_1")
RDD2 = hiveContext.sql("select * from table_2")

ids = RDD1.map(lambda x: x[0]).distinct() # This is approximately 10 million ids
ids = sc.broadcast(set(ids.collect()))

RDD2_filter = RDD2.rdd.filter(lambda x: x[0] in ids.value))

我认为最好只使用一个 SQL 语句来进行连接:

RDD2_filter = hiveContext.sql("""select distinct t2.*
                                 from table_1 t1
                                 join table_2 t2 on t1.id = t2.id""")

我要做的是从 RDD1 中取出 3 亿个 id,构造一个布隆过滤器 (Bloom filter),将其用作广播变量来过滤 RDD2,你将得到包含所有 key-value parits for key 在 RDD1 中,加上一些误报。如果您希望结果在数百万以内,那么您将能够在 RDD1 和 RDD2Partial 上使用像 join、cogroup 等正常操作来毫无问题地获得准确的结果。

如果您希望结果的大小合理,则通过这种方式可以大大减少连接操作的时间,因为复杂性保持不变。即使结果在数亿数量级内,您也可能会获得一些合理的加速(例如 2-10 倍)。

编辑

可以有效地收集布隆过滤器,因为您可以将一个元素设置的位与另一个元素设置的位组合在一起 OR,这是关联和交换的。