Spark RDD:如何最有效地计算统计数据?

Spark RDD: How to calculate statistics most efficiently?

假设存在类似于以下元组的 RDD:

(key1, 1)
(key3, 9)
(key2, 3)
(key1, 4)
(key1, 5)
(key3, 2)
(key2, 7)
...

计算与每个键对应的统计信息的最有效(理想情况下,分布式)方法是什么? (目前,我特别希望计算标准偏差/方差。)据我所知,我的选择是:

  1. 使用colStats function in MLLib: 这种方法的优点是如果其他统计计算被认为是必要的,以后可以很容易地适应使用其他mllib.stat函数.但是,它在 Vector 的 RDD 上运行,其中包含每一列的数据,因此据我了解,这种方法需要在单个节点上收集每个键的完整值集,这似乎不是-非常适合大型数据集。 Spark Vector 是否总是意味着 Vector 中的数据驻留在本地,在单个节点上?
  2. 执行 groupByKey, then stats: Likely shuffle-heavy, as a result of the groupByKey operation.
  3. 执行 aggregateByKey, initializing a new StatCounter, and using StatCounter::merge as the sequence and combiner functions: This is the approach ,并避免选项 2 中的 groupByKey。但是,我无法在 PySpark 中找到 StatCounter 的良好文档。

我喜欢选项 1,因为它使代码更具可扩展性,因为它可以使用具有类似契约的其他 MLLib 函数轻松适应更复杂的计算,但如果 Vector 输入本质上要求数据集是本地收集,然后它限制了代码可以有效运行的数据大小。在其他两个之间,选项 3 看起来 更有效,因为它避免了 groupByKey,但我希望确认情况确实如此。

还有其他我没有考虑过的选择吗? (我目前正在使用 Python + PySpark,但如果存在语言差异,我也愿意接受 Java/Scala 中的解决方案。)

你可以试试reduceByKey。如果我们只想计算 min():

就非常简单了
rdd.reduceByKey(lambda x,y: min(x,y)).collect()
#Out[84]: [('key3', 2.0), ('key2', 3.0), ('key1', 1.0)]

要计算 mean,您首先需要创建 (value, 1) 元组,我们用它来计算 [=14] 中的 sumcount =] 操作。最后我们将它们彼此相除得到 mean:

meanRDD = (rdd
           .mapValues(lambda x: (x, 1))
           .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
           .mapValues(lambda x: x[0]/x[1]))

meanRDD.collect()
#Out[85]: [('key3', 5.5), ('key2', 5.0), ('key1', 3.3333333333333335)]

对于variance,可以使用公式(sumOfSquares/count) - (sum/count)^2, 我们按以下方式翻译:

varRDD = (rdd
          .mapValues(lambda x: (1, x, x*x))
          .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2]))
          .mapValues(lambda x: (x[2]/x[0] - (x[1]/x[0])**2)))

varRDD.collect()
#Out[106]: [('key3', 12.25), ('key2', 4.0), ('key1', 2.8888888888888875)]

我在虚拟 数据 中使用类型 double 而不是 int 的值来准确说明计算平均值和方差:

rdd = sc.parallelize([("key1", 1.0),
                      ("key3", 9.0),
                      ("key2", 3.0),
                      ("key1", 4.0),
                      ("key1", 5.0),
                      ("key3", 2.0),
                      ("key2", 7.0)])