PySpark,按键交集

PySpark, intersection by Key

例如我在 PySpark 中有两个 RDD:

((0,0), 1)
((0,1), 2)
((1,0), 3)
((1,1), 4)

第二个就是

((0,1), 3)
((1,1), 0)

我想要第一个 RDD 与第二个 RDD 的交集。实际上,第二个 RDD 必须扮演第一个 RDD 掩码的角色。输出应该是:

((0,1), 2)
((1,1), 4)

它表示第一个 RDD 的值,但仅适用于第二个 RDD 的键。两个RDD的长度不同。

我有一些解决方案(必须证明),但是是这样的:

rdd3 = rdd1.cartesian(rdd2)
rdd4 = rdd3.filter(lambda((key1, val1), (key2, val2)): key1 == key2)
rdd5 = rdd4.map(lambda((key1, val1), (key2, val2)): (key1, val1))

我不知道,这个解决方案的效率如何。想听听经验丰富的Spark程序员的意见....

也许我们不应该把这个过程看作是join。您不是真的要连接两个数据集,而是要从另一个数据集中减去一个数据集?

我将根据您的问题陈述我的假设

  1. 您根本不关心第二个数据集中的值。
  2. 您只想保留第一个数据集中的值,其中键值对出现在第二个数据集中。

想法 1:Cogroup(我认为可能是最快的方法)。它基本上是在计算两个数据集的交集。

rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])
intersection = rdd1.cogroup(rdd2).filter(lambda x: x[1][0] and x[1][1])
final_rdd = intersection.map(lambda x: (x[0], list(x[1][0]))).map(lambda (x,y): (x, y[0]))

思路 2:按键减法

rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])

unwanted_rows = rdd1.subtractByKey(rdd2)
wanted_rows = rdd1.subtractByKey(unwanted_rows)

我不能 100% 确定这是否比您的方法更快。它确实需要两次 subtractByKey 操作,这可能很慢。此外,此方法不保留顺序(例如 ((0, 1), 2),尽管在您的第一个数据集中排在第一位,但在最终数据集中排在第二位)。但我无法想象这很重要。

至于哪个更快,我想这取决于你的 cartersian join 需要多长时间。映射和过滤往往比 subtractByKey 所需的洗牌操作更快,但当然 cartesian 是一个耗时的过程。

总之,我想你可以试试这个方法,看看它是否适合你!


性能改进的旁注,具体取决于 RDD 的大小。

如果 rdd1 足够小,可以保存在主内存中,如果广播它然后对它进行流式传输 rdd2,减法过程可以大大加快。但是,我承认这种情况很少见。