分布式进程更新 Spark 中的 global/single 变量
Distributed process updating a global/single variable in Spark
我在尝试处理集群上的大量数据时遇到了麻烦。
代码:
val (sumZ, batchSize) = data.rdd.repartition(4)
.treeAggregate(0L, 0L))(
seqOp = (c, v) => {
// c: (z, count), v
val step = this.update(c, v)
(step._1, c._2 + 1)
},
combOp = (c1, c2) => {
// c: (z, count)
(c1._1 + c2._1, c1._2 + c2._2)
})
val finalZ = sumZ / 4
正如您在代码中看到的,我目前的做法是将这些数据分成 4 个块(x0、x1, x2, x3) 使所有进程独立。每个进程产生一个输出(z0, z1, z2, z3),z的最终值是这4个结果的平均值。
这种方法可行,但精度(和计算时间)受分区数量的影响。
我的问题 是否有生成 "global" z 的方法,它将从每个进程(分区)更新。
TL;DR 没有。 Spark 没有同步访问的共享内存,因此不存在真正的全局访问。
Spark 中 "shared" 可写变量的唯一形式是 Accumulator
。它允许具有交换和关联功能的只写访问。
因为它的实现等同于 reduce
/ aggregate
:
- 每个分区都有自己的本地更新副本。
- 任务完成后,部分结果将发送到驱动程序并与 "global" 实例结合。
它不能解决您的问题。
我在尝试处理集群上的大量数据时遇到了麻烦。
代码:
val (sumZ, batchSize) = data.rdd.repartition(4)
.treeAggregate(0L, 0L))(
seqOp = (c, v) => {
// c: (z, count), v
val step = this.update(c, v)
(step._1, c._2 + 1)
},
combOp = (c1, c2) => {
// c: (z, count)
(c1._1 + c2._1, c1._2 + c2._2)
})
val finalZ = sumZ / 4
正如您在代码中看到的,我目前的做法是将这些数据分成 4 个块(x0、x1, x2, x3) 使所有进程独立。每个进程产生一个输出(z0, z1, z2, z3),z的最终值是这4个结果的平均值。
这种方法可行,但精度(和计算时间)受分区数量的影响。
我的问题 是否有生成 "global" z 的方法,它将从每个进程(分区)更新。
TL;DR 没有。 Spark 没有同步访问的共享内存,因此不存在真正的全局访问。
Spark 中 "shared" 可写变量的唯一形式是 Accumulator
。它允许具有交换和关联功能的只写访问。
因为它的实现等同于 reduce
/ aggregate
:
- 每个分区都有自己的本地更新副本。
- 任务完成后,部分结果将发送到驱动程序并与 "global" 实例结合。
它不能解决您的问题。