Spark如何执行join + filter?它是可扩展的吗?

How does Spark execute a join + filter? Is it scalable?

假设我有两个包含键值对的大型 RDD,A 和 B。我想使用密钥加入 A 和 B,但是在匹配的 (a,b) 对中,我只想要 "good" 中的一小部分。所以我加入并在之后应用过滤器:

A.join(B).filter(isGoodPair)

其中 isGoodPair 是一个布尔函数,它告诉我一对 (a,b) 是否合适。

为了很好地扩展,Spark 的调度器最好避免在 A.join(B) 中显式地形成所有对。即使在大规模分布式基础上,这也可能导致耗时的磁盘溢出,甚至耗尽某些节点上的所有内存和磁盘资源。为避免这种情况,Spark 应在每个分区内生成对 (a,b) 时应用过滤器。

我的问题:

  1. Spark 真的这样做了吗?
  2. 其架构的哪些方面可以实现或阻止所需的行为?
  3. 我应该改用 cogroup 吗?在 PySpark 中,它 returns 是一个迭代器,所以我可以将过滤器应用于迭代器,对吗?

据我所知,Spark 不会 完全缓冲 joinfilter 之间的数据。

两个join and filter output DStream都“[代表]连续的数据流”。这意味着 join 应该输出连续的数据流,filter 在可用时消耗这些数据。

然而,据我所知,join 将生成所有具有匹配键的 A、B 对,但 filter 将迅速丢弃不需要的结果,从而阻止整个结果集进入内存立刻。

我 运行 在 PySpark shell (运行ning Spark 1.2.1) 中做了一个实验来回答这些问题。结论如下:

  1. 不幸的是,Spark 应用过滤器,因为连接生成对。它会在继续过滤之前显式生成整组连接对。
  2. 这可能是因为 Spark 运行s RDD t运行sformations 一次一个。它通常无法执行这种微妙的链接优化。
  3. 通过使用 cogroup 而不是 join,我们可以手动实现所需的优化。

实验

我制作了一个包含 100 个组的 RDD,每个组包含 1 到 10,000 之间的整数,并且在每个组中我计算了最多相隔 1 的整数的数量:

import itertools as it
g = int(1e2) # number of groups
n = int(1e4) # number of integers in each group
nPart = 32 # standard partitioning: 8 cores, 4 partitions per core
A = sc.parallelize(list(it.product(xrange(g),xrange(n))),nPart) 

def joinAndFilter(A):
    return A.join(A).filter(lambda (k,(x1,x2)): abs(x1 - x2) <= 1)

def cogroupAndFilter(A):
    def fun(xs):
        k,(xs1,xs2) = xs
        return [(x1,x2) for (x1,x2) in it.product(xs1,xs2) if abs(x1 - x2) <= 1]
    return A.cogroup(A).flatMap(fun)

cogroupAndFilter(A).count()
joinAndFilter(A).count() 

我没有简单的方法来分析代码,所以我只是在 mac 的 Activity 监视器上 运行 观看了它:

当我使用 joinAndFilter 时,内存使用量激增,大概是因为它在应用逐一过滤器之前生成了所有对。实际上,我不得不关闭 PySpark,因为它耗尽了我所有的记忆并使系统崩溃。使用 cogroupAndFilter,这些对在生成时被过滤,因此内存保持在控制之下。