Spark如何执行join + filter?它是可扩展的吗?
How does Spark execute a join + filter? Is it scalable?
假设我有两个包含键值对的大型 RDD,A 和 B。我想使用密钥加入 A 和 B,但是在匹配的 (a,b) 对中,我只想要 "good" 中的一小部分。所以我加入并在之后应用过滤器:
A.join(B).filter(isGoodPair)
其中 isGoodPair
是一个布尔函数,它告诉我一对 (a,b) 是否合适。
为了很好地扩展,Spark 的调度器最好避免在 A.join(B)
中显式地形成所有对。即使在大规模分布式基础上,这也可能导致耗时的磁盘溢出,甚至耗尽某些节点上的所有内存和磁盘资源。为避免这种情况,Spark 应在每个分区内生成对 (a,b) 时应用过滤器。
我的问题:
- Spark 真的这样做了吗?
- 其架构的哪些方面可以实现或阻止所需的行为?
- 我应该改用
cogroup
吗?在 PySpark 中,它 returns 是一个迭代器,所以我可以将过滤器应用于迭代器,对吗?
据我所知,Spark 不会 完全缓冲 join
和 filter
之间的数据。
两个join
and filter
output DStream都“[代表]连续的数据流”。这意味着 join
应该输出连续的数据流,filter
在可用时消耗这些数据。
然而,据我所知,join
将生成所有具有匹配键的 A、B 对,但 filter
将迅速丢弃不需要的结果,从而阻止整个结果集进入内存立刻。
我 运行 在 PySpark shell (运行ning Spark 1.2.1) 中做了一个实验来回答这些问题。结论如下:
- 不幸的是,Spark 不应用过滤器,因为连接生成对。它会在继续过滤之前显式生成整组连接对。
- 这可能是因为 Spark 运行s RDD t运行sformations 一次一个。它通常无法执行这种微妙的链接优化。
- 通过使用
cogroup
而不是 join
,我们可以手动实现所需的优化。
实验
我制作了一个包含 100 个组的 RDD,每个组包含 1 到 10,000 之间的整数,并且在每个组中我计算了最多相隔 1 的整数的数量:
import itertools as it
g = int(1e2) # number of groups
n = int(1e4) # number of integers in each group
nPart = 32 # standard partitioning: 8 cores, 4 partitions per core
A = sc.parallelize(list(it.product(xrange(g),xrange(n))),nPart)
def joinAndFilter(A):
return A.join(A).filter(lambda (k,(x1,x2)): abs(x1 - x2) <= 1)
def cogroupAndFilter(A):
def fun(xs):
k,(xs1,xs2) = xs
return [(x1,x2) for (x1,x2) in it.product(xs1,xs2) if abs(x1 - x2) <= 1]
return A.cogroup(A).flatMap(fun)
cogroupAndFilter(A).count()
joinAndFilter(A).count()
我没有简单的方法来分析代码,所以我只是在 mac 的 Activity 监视器上 运行 观看了它:
当我使用 joinAndFilter
时,内存使用量激增,大概是因为它在应用逐一过滤器之前生成了所有对。实际上,我不得不关闭 PySpark,因为它耗尽了我所有的记忆并使系统崩溃。使用 cogroupAndFilter
,这些对在生成时被过滤,因此内存保持在控制之下。
假设我有两个包含键值对的大型 RDD,A 和 B。我想使用密钥加入 A 和 B,但是在匹配的 (a,b) 对中,我只想要 "good" 中的一小部分。所以我加入并在之后应用过滤器:
A.join(B).filter(isGoodPair)
其中 isGoodPair
是一个布尔函数,它告诉我一对 (a,b) 是否合适。
为了很好地扩展,Spark 的调度器最好避免在 A.join(B)
中显式地形成所有对。即使在大规模分布式基础上,这也可能导致耗时的磁盘溢出,甚至耗尽某些节点上的所有内存和磁盘资源。为避免这种情况,Spark 应在每个分区内生成对 (a,b) 时应用过滤器。
我的问题:
- Spark 真的这样做了吗?
- 其架构的哪些方面可以实现或阻止所需的行为?
- 我应该改用
cogroup
吗?在 PySpark 中,它 returns 是一个迭代器,所以我可以将过滤器应用于迭代器,对吗?
据我所知,Spark 不会 完全缓冲 join
和 filter
之间的数据。
两个join
and filter
output DStream都“[代表]连续的数据流”。这意味着 join
应该输出连续的数据流,filter
在可用时消耗这些数据。
然而,据我所知,join
将生成所有具有匹配键的 A、B 对,但 filter
将迅速丢弃不需要的结果,从而阻止整个结果集进入内存立刻。
我 运行 在 PySpark shell (运行ning Spark 1.2.1) 中做了一个实验来回答这些问题。结论如下:
- 不幸的是,Spark 不应用过滤器,因为连接生成对。它会在继续过滤之前显式生成整组连接对。
- 这可能是因为 Spark 运行s RDD t运行sformations 一次一个。它通常无法执行这种微妙的链接优化。
- 通过使用
cogroup
而不是join
,我们可以手动实现所需的优化。
实验
我制作了一个包含 100 个组的 RDD,每个组包含 1 到 10,000 之间的整数,并且在每个组中我计算了最多相隔 1 的整数的数量:
import itertools as it
g = int(1e2) # number of groups
n = int(1e4) # number of integers in each group
nPart = 32 # standard partitioning: 8 cores, 4 partitions per core
A = sc.parallelize(list(it.product(xrange(g),xrange(n))),nPart)
def joinAndFilter(A):
return A.join(A).filter(lambda (k,(x1,x2)): abs(x1 - x2) <= 1)
def cogroupAndFilter(A):
def fun(xs):
k,(xs1,xs2) = xs
return [(x1,x2) for (x1,x2) in it.product(xs1,xs2) if abs(x1 - x2) <= 1]
return A.cogroup(A).flatMap(fun)
cogroupAndFilter(A).count()
joinAndFilter(A).count()
我没有简单的方法来分析代码,所以我只是在 mac 的 Activity 监视器上 运行 观看了它:
当我使用 joinAndFilter
时,内存使用量激增,大概是因为它在应用逐一过滤器之前生成了所有对。实际上,我不得不关闭 PySpark,因为它耗尽了我所有的记忆并使系统崩溃。使用 cogroupAndFilter
,这些对在生成时被过滤,因此内存保持在控制之下。