在执行器上动态创建累加器
Create accumulator on executor dynamically
我想使用累加器来计算我的 RDD 中对象的几个参数的组合。
例如,我有 Obj
的 RDD,字段为 a
和 b
。这两个字段都是枚举,可能具有少数几个值之一。
为了实现它,我应该在驱动程序上创建累加器并在工作人员上使用它:
val acc1 = sc.longAccumulator("a1-b1")
val acc2 = sc.longAccumulator("a2-b1")
val acc3 = sc.longAccumulator("a1-b2")
...
我不想为我将具有相同逻辑的所有 spark 作业中的所有值组合声明大量计数器。
是否有任何机制允许在执行器上动态创建累加器或以其他方式解决此问题?
我搜索类似的东西:
rdd.foreach{ getAccumulator("${obj.a} - ${obj.b}").add(1) }
从字面上回答你的问题,你不能在执行器上动态注册新的累加器。在作业实际开始之前,必须在驱动程序 (sparkContext.accumulator()
) 上规划累加器。这就是 Spark 中累加器的设计方式。
但考虑到您实际想要实现的内容,您可能会得出结论,只需一个“静态”累加器即可实现相同的功能,该累加器收集 Map<String, Long>
个条目而不是 Long
.
This 博客 post 可以更实际地理解我在这里的意思。
Egordoe 的回答中的 Link 对于旧的 spark 版本是正确的。
我对现代 spark (kotlin) 的实现:
class MapAccumulator(val counters: MutableMap<String, Long>) : AccumulatorV2<String, Map<String, Long>>() {
constructor() : this(HashMap<String, Long>())
override fun isZero() = counters.isEmpty()
override fun copy(): AccumulatorV2<String, Map<String, Long>> {
val copy = HashMap<String, Long>()
copy.putAll(counters)
return MapAccumulator(copy)
}
override fun reset() {
counters.clear()
}
override fun add(v: String) {
counters.merge(v, 1, Long::plus)
}
override fun merge(other: AccumulatorV2<String, Map<String, Long>>) {
other.value().forEach { (k, v) -> counters.merge(k, v, Long::plus) }
}
override fun value(): Map<String, Long> = counters
}
我想使用累加器来计算我的 RDD 中对象的几个参数的组合。
例如,我有 Obj
的 RDD,字段为 a
和 b
。这两个字段都是枚举,可能具有少数几个值之一。
为了实现它,我应该在驱动程序上创建累加器并在工作人员上使用它:
val acc1 = sc.longAccumulator("a1-b1")
val acc2 = sc.longAccumulator("a2-b1")
val acc3 = sc.longAccumulator("a1-b2")
...
我不想为我将具有相同逻辑的所有 spark 作业中的所有值组合声明大量计数器。 是否有任何机制允许在执行器上动态创建累加器或以其他方式解决此问题?
我搜索类似的东西:
rdd.foreach{ getAccumulator("${obj.a} - ${obj.b}").add(1) }
从字面上回答你的问题,你不能在执行器上动态注册新的累加器。在作业实际开始之前,必须在驱动程序 (sparkContext.accumulator()
) 上规划累加器。这就是 Spark 中累加器的设计方式。
但考虑到您实际想要实现的内容,您可能会得出结论,只需一个“静态”累加器即可实现相同的功能,该累加器收集 Map<String, Long>
个条目而不是 Long
.
This 博客 post 可以更实际地理解我在这里的意思。
Link 对于旧的 spark 版本是正确的。 我对现代 spark (kotlin) 的实现:
class MapAccumulator(val counters: MutableMap<String, Long>) : AccumulatorV2<String, Map<String, Long>>() {
constructor() : this(HashMap<String, Long>())
override fun isZero() = counters.isEmpty()
override fun copy(): AccumulatorV2<String, Map<String, Long>> {
val copy = HashMap<String, Long>()
copy.putAll(counters)
return MapAccumulator(copy)
}
override fun reset() {
counters.clear()
}
override fun add(v: String) {
counters.merge(v, 1, Long::plus)
}
override fun merge(other: AccumulatorV2<String, Map<String, Long>>) {
other.value().forEach { (k, v) -> counters.merge(k, v, Long::plus) }
}
override fun value(): Map<String, Long> = counters
}