Apache PySpark 版本之间的 spark 聚合函数是否发生变化?

Did spark aggregate function change between versions of Apache PySpark?

理论上,我想我理解 aggregate 的工作方式,但我无法通过一个非常简单的示例。

值得注意的是,示例 似乎有错误的结果。当我 运行 在我的机器上执行以下示例时,我得到......

seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
ag = sc.parallelize([1, 2, 3, 4]).aggregate((1,0), seqOp, combOp)

那么,我得到的结果是

>>> ag
(12, 4)

但是,我引用的 link 说结果是 (19, 4)。这家伙正在使用不同版本的 spark (1.2.0)。我正在使用 1.5.2. 聚合函数是否在 Spark 版本之间发生了变化?

如果答案是 NO,那么 12 为何是该元组中的第一个元素仍然令人费解。只检查元组的第一个元素,我们可以看到
y 被添加到 RDD 中每个元素的元组的第一个元素。

因此,从 (1,0) 开始,并且由于 y 分别是 1, 2, 3, 4,,这应该会产生一系列元组,例如:(2,1), (3,1), (4,1), (5,1)。现在,当我在元组系列中添加第一个元素时,我得到 14?关于如何获得 12 有什么明显的缺失吗?非常感谢。

不,aggregate 函数的行为没有改变。

你的例子的问题 link 是零元素不是中性的。由于实际上每个分区创建一次零值,您实际上可以通过增加分区数并完全不传递任何数据来增加元组的第一个元素:

sc.parallelize([], 10).aggregate((1,0), seqOp, combOp)
## (11, 0)

sc.parallelize([], 100).aggregate((1,0), seqOp, combOp)
## (101, 0)

asc.parallelize([], 1000).aggregate((1,0), seqOp, combOp)
## (1001, 0)

这里要注意的是,给定您执行的操作,零值应该是中性的。

编辑:

中性元素是什么意思?在代数意义上,它应该是 identity element 相对于 seqOp / combOp。我在这里定义的操作案例是 (0, 0).

从开发人员的角度来看,您可以认为 zeroElement 添加到您的数据的次数不是合同的一部分。