蓄电池什么时候才真正可靠?
When are accumulators truly reliable?
我想使用累加器收集一些关于我在 Spark 作业上处理的数据的统计信息。理想情况下,我会在作业计算所需的转换时这样做,但由于 Spark 会在不同情况下重新计算任务,因此累加器不会反映真实的指标。以下是文档对此的描述:
For accumulator updates performed inside actions only, Spark
guarantees that each task’s update to the accumulator will only be
applied once, i.e. restarted tasks will not update the value. In
transformations, users should be aware of that each task’s update may
be applied more than once if tasks or job stages are re-executed.
这令人困惑,因为大多数 操作 不允许 运行 自定义代码(可以使用累加器的地方),它们大多数情况下(懒惰地)获取先前转换的结果。文档还显示了这一点:
val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
// Here, acc is still 0 because no actions have cause the `map` to be computed.
但是如果我们在最后加上data.count()
,这是否保证正确(没有重复)?显然 acc
没有使用 "inside actions only",因为 map 是一个转换。所以应该不能保证。
另一方面,有关 Jira 票证的讨论谈论 "result tasks" 而不是 "actions"。例如here and here。这似乎表明结果确实可以保证是正确的,因为我们在 action 之前立即使用 acc
,因此应该作为一个阶段进行计算。
我猜 "result task" 这个概念与所涉及的操作类型有关,是最后一个包含操作的操作,就像在这个例子中,它显示了几个操作是如何划分的进入阶段(洋红色,图片来自 here):
所以假设,链末端的 count()
动作将是同一最终阶段的一部分,我可以保证最后一张地图上使用的累加器不会包含任何重复项?
如果能澄清这个问题就好了!谢谢。
我认为 Matei 在参考文档中回答了这个问题:
As discussed on https://github.com/apache/spark/pull/2524 this is
pretty hard to provide good semantics for in the general case
(accumulator updates inside non-result stages), for the following
reasons:
An RDD may be computed as part of multiple stages. For
example, if you update an accumulator inside a MappedRDD and then
shuffle it, that might be one stage. But if you then call map() again
on the MappedRDD, and shuffle the result of that, you get a second
stage where that map is pipeline. Do you want to count this
accumulator update twice or not?
Entire stages may be resubmitted if
shuffle files are deleted by the periodic cleaner or are lost due to a
node failure, so anything that tracks RDDs would need to do so for
long periods of time (as long as the RDD is referenceable in the user
program), which would be pretty complicated to implement.
So I'm going
to mark this as "won't fix" for now, except for the part for result
stages done in SPARK-3628.
当任务成功完成时,累加器更新将发送回驱动程序。因此,当您确定每个任务都只执行一次并且每个任务都按您的预期执行时,您的累加器结果就可以保证是正确的。
我更喜欢依赖 reduce
和 aggregate
而不是累加器,因为很难枚举任务执行的所有方式。
- 一个动作启动任务。
- 如果操作依赖于较早的阶段并且该阶段的结果未(完全)缓存,则较早阶段的任务将开始。
- 当检测到少量慢速任务时,推测执行会启动重复任务。
也就是说,有很多简单的情况可以完全信任累加器。
val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 10, 2)
val accumulating = rdd.map { x => acc += 1; x }
accumulating.count
assert(acc == 10)
Would this be guaranteed to be correct (have no duplicates)?
是,如果推测执行被禁用。 map
和 count
将是一个阶段,所以就像你说的,任务不可能成功执行超过一次。
但是作为副作用,累加器被更新了。因此,在考虑代码将如何执行时,您必须非常小心。考虑这个而不是 accumulating.count
:
// Same setup as before.
accumulating.mapPartitions(p => Iterator(p.next)).collect
assert(acc == 2)
这也会为每个分区创建一个任务,并且每个任务都将保证恰好执行一次。但是 map
中的代码不会在所有元素上执行,只会在每个分区中的第一个元素上执行。
累加器就像一个全局变量。如果您共享对可以递增累加器的 RDD 的引用,那么其他代码(其他线程)也可以使它递增。
// Same setup as before.
val x = new X(accumulating) // We don't know what X does.
// It may trigger the calculation
// any number of times.
accumulating.count
assert(acc >= 10)
回答问题"When are accumulators truly reliable ?"
答案:当它们出现在 Action 操作中时。
根据 Action Task 中的文档,即使存在任何重新启动的任务,它也只会更新 Accumulator 一次。
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
并且 Action 允许 运行 自定义代码。
例如
val accNotEmpty = sc.accumulator(0)
ip.foreach(x=>{
if(x!=""){
accNotEmpty += 1
}
})
但是,为什么 Map+Action 即。结果任务 操作对于累加器操作不可靠?
- 由于代码中的某些异常,任务失败。 Spark 将尝试 4 次(默认尝试次数)。如果任务每次都失败,它会偶然给出 exception.If 成功,那么 Spark 将继续并只更新成功状态的累加器值,失败状态的累加器值将被忽略.
判决:处理得当
- 阶段失败:如果执行节点崩溃,不是用户的错,而是硬件故障 - 如果节点在洗牌中出现故障 stage.As 洗牌输出存储在本地,如果节点出现故障,则洗牌输出是 gone.So Spark 回到生成 shuffle 输出的阶段,查看哪些任务需要重新 运行,并在仍然是 alive.After 的节点之一上执行它们重新生成丢失的洗牌输出,生成映射输出的阶段已经执行了它的一些任务 times.Spark 计算所有这些任务的累加器更新。
结论:未在结果中处理 Task.Accumulator 将给出错误的输出。
- 如果任务 运行ning 很慢,Spark 可以在另一个节点上启动该任务的推测副本。
结论:不是 handled.Accumulator 会给出错误的输出。
- 缓存的RDD很大,不能驻留在Memory.So中,每当使用RDD时,它将重新运行 Map操作来获取RDD,并再次更新累加器.
结论:不是 handled.Accumulator 会给出错误的输出。
所以它可能会发生相同的功能可能 运行 多次在同一个 data.So 由于 Map 操作,Spark 不为累加器更新提供任何保证。
所以在Spark的Action操作中最好使用Accumulator
要了解有关 Accumulator 及其问题的更多信息,请参阅此 Blog Post - Imran Rashid。
我想使用累加器收集一些关于我在 Spark 作业上处理的数据的统计信息。理想情况下,我会在作业计算所需的转换时这样做,但由于 Spark 会在不同情况下重新计算任务,因此累加器不会反映真实的指标。以下是文档对此的描述:
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
这令人困惑,因为大多数 操作 不允许 运行 自定义代码(可以使用累加器的地方),它们大多数情况下(懒惰地)获取先前转换的结果。文档还显示了这一点:
val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
// Here, acc is still 0 because no actions have cause the `map` to be computed.
但是如果我们在最后加上data.count()
,这是否保证正确(没有重复)?显然 acc
没有使用 "inside actions only",因为 map 是一个转换。所以应该不能保证。
另一方面,有关 Jira 票证的讨论谈论 "result tasks" 而不是 "actions"。例如here and here。这似乎表明结果确实可以保证是正确的,因为我们在 action 之前立即使用 acc
,因此应该作为一个阶段进行计算。
我猜 "result task" 这个概念与所涉及的操作类型有关,是最后一个包含操作的操作,就像在这个例子中,它显示了几个操作是如何划分的进入阶段(洋红色,图片来自 here):
所以假设,链末端的 count()
动作将是同一最终阶段的一部分,我可以保证最后一张地图上使用的累加器不会包含任何重复项?
如果能澄清这个问题就好了!谢谢。
我认为 Matei 在参考文档中回答了这个问题:
As discussed on https://github.com/apache/spark/pull/2524 this is pretty hard to provide good semantics for in the general case (accumulator updates inside non-result stages), for the following reasons:
An RDD may be computed as part of multiple stages. For example, if you update an accumulator inside a MappedRDD and then shuffle it, that might be one stage. But if you then call map() again on the MappedRDD, and shuffle the result of that, you get a second stage where that map is pipeline. Do you want to count this accumulator update twice or not?
Entire stages may be resubmitted if shuffle files are deleted by the periodic cleaner or are lost due to a node failure, so anything that tracks RDDs would need to do so for long periods of time (as long as the RDD is referenceable in the user program), which would be pretty complicated to implement.
So I'm going to mark this as "won't fix" for now, except for the part for result stages done in SPARK-3628.
当任务成功完成时,累加器更新将发送回驱动程序。因此,当您确定每个任务都只执行一次并且每个任务都按您的预期执行时,您的累加器结果就可以保证是正确的。
我更喜欢依赖 reduce
和 aggregate
而不是累加器,因为很难枚举任务执行的所有方式。
- 一个动作启动任务。
- 如果操作依赖于较早的阶段并且该阶段的结果未(完全)缓存,则较早阶段的任务将开始。
- 当检测到少量慢速任务时,推测执行会启动重复任务。
也就是说,有很多简单的情况可以完全信任累加器。
val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 10, 2)
val accumulating = rdd.map { x => acc += 1; x }
accumulating.count
assert(acc == 10)
Would this be guaranteed to be correct (have no duplicates)?
是,如果推测执行被禁用。 map
和 count
将是一个阶段,所以就像你说的,任务不可能成功执行超过一次。
但是作为副作用,累加器被更新了。因此,在考虑代码将如何执行时,您必须非常小心。考虑这个而不是 accumulating.count
:
// Same setup as before.
accumulating.mapPartitions(p => Iterator(p.next)).collect
assert(acc == 2)
这也会为每个分区创建一个任务,并且每个任务都将保证恰好执行一次。但是 map
中的代码不会在所有元素上执行,只会在每个分区中的第一个元素上执行。
累加器就像一个全局变量。如果您共享对可以递增累加器的 RDD 的引用,那么其他代码(其他线程)也可以使它递增。
// Same setup as before.
val x = new X(accumulating) // We don't know what X does.
// It may trigger the calculation
// any number of times.
accumulating.count
assert(acc >= 10)
回答问题"When are accumulators truly reliable ?"
答案:当它们出现在 Action 操作中时。
根据 Action Task 中的文档,即使存在任何重新启动的任务,它也只会更新 Accumulator 一次。
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.
并且 Action 允许 运行 自定义代码。
例如
val accNotEmpty = sc.accumulator(0)
ip.foreach(x=>{
if(x!=""){
accNotEmpty += 1
}
})
但是,为什么 Map+Action 即。结果任务 操作对于累加器操作不可靠?
- 由于代码中的某些异常,任务失败。 Spark 将尝试 4 次(默认尝试次数)。如果任务每次都失败,它会偶然给出 exception.If 成功,那么 Spark 将继续并只更新成功状态的累加器值,失败状态的累加器值将被忽略.
判决:处理得当 - 阶段失败:如果执行节点崩溃,不是用户的错,而是硬件故障 - 如果节点在洗牌中出现故障 stage.As 洗牌输出存储在本地,如果节点出现故障,则洗牌输出是 gone.So Spark 回到生成 shuffle 输出的阶段,查看哪些任务需要重新 运行,并在仍然是 alive.After 的节点之一上执行它们重新生成丢失的洗牌输出,生成映射输出的阶段已经执行了它的一些任务 times.Spark 计算所有这些任务的累加器更新。
结论:未在结果中处理 Task.Accumulator 将给出错误的输出。 - 如果任务 运行ning 很慢,Spark 可以在另一个节点上启动该任务的推测副本。
结论:不是 handled.Accumulator 会给出错误的输出。 - 缓存的RDD很大,不能驻留在Memory.So中,每当使用RDD时,它将重新运行 Map操作来获取RDD,并再次更新累加器.
结论:不是 handled.Accumulator 会给出错误的输出。
所以它可能会发生相同的功能可能 运行 多次在同一个 data.So 由于 Map 操作,Spark 不为累加器更新提供任何保证。
所以在Spark的Action操作中最好使用Accumulator
要了解有关 Accumulator 及其问题的更多信息,请参阅此 Blog Post - Imran Rashid。