Spark:reduce 和 reduceByKey 之间的语义差异

Spark: difference of semantics between reduce and reduceByKey

在 Spark 的文档中,它说 RDDs 方法 reduce 需要关联和交换二元函数。

但是,方法 reduceByKey 只需要关联二元函数。

sc.textFile("file4kB", 4)

我做了一些测试,显然这是我得到的行为。为什么会有这种差异?为什么 reduceByKey 确保二元函数始终按特定顺序应用(以适应交换性的缺乏)而 reduce 却没有?

例如,如果加载一些具有 4 个分区(最少)的(小)文本:

val r = sc.textFile("file4k", 4)

然后:

r.reduce(_ + _)

returns 一个字符串,其中部分的顺序并不总是相同,而:

r.map(x => (1,x)).reduceByKey(_ + _).first

始终returns相同的字符串(所有内容的顺序与原始文件中的顺序相同)。

(我用r.glom查了一下,文件内容确实分布在4个分区,没有空分区)。

就我而言,这是文档中的一个错误,您看到的结果只是偶然的。实践,other resources and a simple analysis of the code 表明传递给 reduceByKey 的函数不仅应该是关联的,而且应该是交换的。

  • 实践 - 虽然看起来订单在本地模式下保留,但当您 运行 Spark 在集群上(包括独立模式)时不再如此。

  • 其他资源-引用Data Exploration Using Spark from AmpCamp 3:

    There is a convenient method called reduceByKey in Spark for exactly this pattern. Note that the second argument to reduceByKey determines the number of reducers to use. By default, Spark assumes that the reduce function is commutative and associative and applies combiners on the mapper side.

  • 代码 - reduceByKey 使用 combineByKeyWithClassTag 实现并创建 ShuffledRDD。由于 Spark 不保证洗牌后的顺序,因此恢复它的唯一方法是将一些元数据附加到部分减少的记录。据我所知,没有发生过这样的事情。

旁注 reduce 因为它是在 PySpark 中实现的,所以它可以很好地处理仅可交换的函数。这当然只是一个实现的细节,而不是合同的一部分。

根据代码文档,最近 updated/corrected。 (感谢@zero323):

reduceByKey merges the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.

所以这实际上是一个文档错误,就像@zero323 在他的回答中指出的那样。

您可以检查以下代码链接以确保: