RDD在火花中聚合

RDD Aggregate in spark

我是一名 Apache Spark 学习者,遇到了一个 RDD 操作 aggregate,我不知道它是如何工作的。有人可以一步一步地详细说明和解释我们是如何得到下面的代码结果的吗

RDD input = {1,2,3,3}

RDD Aggregate function :

rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))

output : {9,4}

谢谢

如果您不确定发生了什么,最好按照类型进行操作。为简洁起见,省略隐式 ClassTag 我们从这样的东西开始

abstract class RDD[T] extends Serializable with Logging 

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U 

如果忽略所有附加参数,您会看到 aggregate 是一个从 RDD[T] 映射到 U 的函数。这意味着输入 RDD 中值的类型不必与输出值的类型相同。所以它明显不同于 reduce:

def reduce(func: (T, T) ⇒ T): T 

fold:

def fold(zeroValue: T)(op: (T, T) => T): T

fold相同,aggregate需要一个zeroValue。如何选择呢?它应该是关于 combOp.

的身份(中性)元素

您还必须提供两个功能:

  • seqOp(U, T) 映射到 U
  • combOp(U, U) 映射到 U

仅基于此签名,您应该已经看到只有 seqOp 可以访问原始数据。它需要一些 U 类型的值,另一个 T 类型的值和 returns 类型 U 的值。在您的情况下,它是一个具有以下签名的函数

((Int, Int), Int) => (Int, Int) 

此时您可能怀疑它用于某种类似折叠的操作。

第二个函数采用 U 类型的两个参数和 returns 类型 U 的值。如前所述,应该清楚它不会触及原始数据,只能对 seqOp 已经处理过的值进行操作。在您的情况下,此函数具有如下签名:

((Int, Int), (Int, Int)) => (Int, Int) 

那么我们如何才能将所有这些整合在一起呢?

  1. 首先每个分区使用标准聚合Iterator.aggregate with zeroValue, seqOp and combOp passed as z, seqop and combop respectivelly. Since InterruptibleIterator used internally doesn't override aggregate it should be executed as a simple foldLeft(zeroValue)(seqOp)

  2. 从每个分区收集的下一个部分结果使用 combOp

  3. 聚合

假设输入 RDD 具有三个分区,值分布如下:

  • Iterator(1, 2)
  • Iterator(2, 3)
  • Iterator()

你可以预期忽略绝对顺序的执行将等同于这样的事情:

val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1)
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)

Seq(Iterator(1, 2), Iterator(3, 3), Iterator())
  .map(_.foldLeft((0, 0))(seqOp))
  .reduce(combOp)

foldLeft 对于单个分区可以是这样的:

Iterator(1, 2).foldLeft((0, 0))(seqOp)
Iterator(2).foldLeft((1, 1))(seqOp)
(3, 2)

以及所有分区

Seq((3,2), (6,2), (0,0))

哪个组合会给你观察到的结果:

(3 + 6 + 0, 2 + 2 + 0)
(9, 4)

一般来说,这是一种常见的模式,您会在整个 Spark 中找到,您可以在其中传递中性值、一个用于处理每个分区的值的函数以及一个用于合并来自不同分区的部分聚合的函数。其他一些示例包括:

  • aggregateByKey
  • 用户定义的聚合函数
  • Aggregators Spark Datasets.

以下是我的理解,供大家参考:

假设您有两个节点,一个接受前两个列表元素 {1,2} 的输入,另一个接受 {3, 3}。 (这里的分区只是为了方便)

在第一个节点: "(x, y) => (x._1 + y, x._2 + 1)" ,第一个 x 是给定的 (0,0),y 是你的第一个元素 1,你将得到输出 (0+1, 0+1),然后是你的第二个元素 y=2,输出 (1 + 2, 1 + 1),即 (3, 2)

在第二个节点,同样的过程并行发生,你会得到 (6, 2)。

"(x, y) => (x._1 + y._1, x._2 + y._2)", 告诉你合并两个节点,并且你会得到 (9,4)


值得注意的一件事是 (0,0) 实际上被添加到结果中 长度(rdd)+1次。

"scala> rdd.aggregate((1,1)) ((x, y) =>(x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) res1: (Int, Int) = (14,9)"