限制 spark 上下文中的记录数量
Limit the amount of records in a spark context
我想减少每个reducer的记录数,并保留结果变量a rdd
使用 takeSample
似乎是显而易见的选择,但是,它 returns 是一个 collection
而不是 SparkContext
对象。
我想到了这个方法:
rdd = rdd.zipWithIndex().filter(lambda x:x[1]<limit).map(lambda x:x[0])
但是,这种方法很慢而且效率不高。
有没有更聪明的方法来获取小样本并保持数据结构rdd
?
如果你想要一个 small 示例子集并且不能对数据做任何额外的假设,那么 take
结合 parallelize
可能是一个最佳解决方案:
sc.parallelize(rdd.take(n))
它将触及相对较少的分区(在最好的情况下只有一个)并且小 n 的网络流量成本应该可以忽略不计。
采样(randomSplit
或 sample
)将需要与 zipWithIndex
和 filter
相同的完整数据扫描。
假设没有数据倾斜,您可以尝试这样的方法来解决这个问题:
from __future__ import division # Python 2 only
def limitApprox(rdd, n, timeout):
count = rdd.countApprox(timeout)
if count <= n:
return rdd
else:
rec_per_part = count // rdd.getNumPartitions()
required_parts = n / rec_per_part if rec_per_part else 1
return rdd.mapPartitionsWithIndex(
lambda i, iter: iter if i < required_parts else []
)
- 这仍然会访问每个分区,但如果不需要,会尽量避免计算内容
- 如果存在较大的数据倾斜,将无法工作
- 如果分布均匀但 n << 比每个分区的平均记录数要多得多。
- 如果分布偏向高指数,可能会欠采样。
如果数据可以表示为 Row
你可以尝试另一个技巧:
rdd.toDF().limit(n).rdd
我想减少每个reducer的记录数,并保留结果变量a rdd
使用 takeSample
似乎是显而易见的选择,但是,它 returns 是一个 collection
而不是 SparkContext
对象。
我想到了这个方法:
rdd = rdd.zipWithIndex().filter(lambda x:x[1]<limit).map(lambda x:x[0])
但是,这种方法很慢而且效率不高。
有没有更聪明的方法来获取小样本并保持数据结构rdd
?
如果你想要一个 small 示例子集并且不能对数据做任何额外的假设,那么 take
结合 parallelize
可能是一个最佳解决方案:
sc.parallelize(rdd.take(n))
它将触及相对较少的分区(在最好的情况下只有一个)并且小 n 的网络流量成本应该可以忽略不计。
采样(randomSplit
或 sample
)将需要与 zipWithIndex
和 filter
相同的完整数据扫描。
假设没有数据倾斜,您可以尝试这样的方法来解决这个问题:
from __future__ import division # Python 2 only
def limitApprox(rdd, n, timeout):
count = rdd.countApprox(timeout)
if count <= n:
return rdd
else:
rec_per_part = count // rdd.getNumPartitions()
required_parts = n / rec_per_part if rec_per_part else 1
return rdd.mapPartitionsWithIndex(
lambda i, iter: iter if i < required_parts else []
)
- 这仍然会访问每个分区,但如果不需要,会尽量避免计算内容
- 如果存在较大的数据倾斜,将无法工作
- 如果分布均匀但 n << 比每个分区的平均记录数要多得多。
- 如果分布偏向高指数,可能会欠采样。
如果数据可以表示为 Row
你可以尝试另一个技巧:
rdd.toDF().limit(n).rdd