Spark Streaming mapWithState 似乎定期重建完整状态
Spark Streaming mapWithState seems to rebuild complete state periodically
我正在开发一个 Scala (2.11) / Spark (1.6.1) 流项目,并使用 mapWithState()
来跟踪之前批次的已见数据。
状态分布在多个节点上的 20 个分区中,使用 StateSpec.function(trackStateFunc _).numPartitions(20)
创建。在这种状态下,我们只有几个键 (~100) 映射到 Sets
,最多有 ~160.000 个条目,这些条目在整个应用程序中不断增长。整个状态最多3GB
,可以由集群中的每个节点处理。在每个批次中,一些数据被添加到一个状态但直到过程结束才被删除,即大约 15 分钟。
在申请 UI 之后,每 10 个批次的处理时间与其他批次相比非常长。查看图片:
黄色字段表示处理时间长。
更详细的作业视图显示,在这些批次中,发生在某个时间点,恰好在所有 20 个分区都为 "skipped" 时。或者这就是 UI 所说的。
我对skipped
的理解是每个状态分区都是一个可能的任务,它不会被执行,因为它不需要重新计算。但是,我不明白为什么每个 Job 的 skips
数量不同,为什么最后一个 Job 需要这么多处理。无论状态大小如何,处理时间都会增加,它只会影响持续时间。
这是 mapWithState()
功能中的错误还是这种预期行为?底层数据结构是否需要某种重新洗牌,状态中的 Set
是否需要复制数据?或者它更有可能是我的应用程序中的缺陷?
Is this a bug in the mapWithState() functionality or is this intended
behaviour?
这是有意为之的行为。您看到的尖峰是因为您的数据在给定批次的末尾得到检查点。如果您注意到较长批次的时间,您会发现它持续每 100 秒发生一次。这是因为检查点时间是恒定的,并且是根据您的 batchDuration
计算的,除非您明确设置 DStream.checkpoint
间隔,这是您与数据源对话以读取批次乘以某个常数的频率。
这是来自MapWithStateDStream
的相关代码:
override def initialize(time: Time): Unit = {
if (checkpointDuration == null) {
checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
}
super.initialize(time)
}
其中 DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
是:
private[streaming] object InternalMapWithStateDStream {
private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}
这与您看到的行为完全一致,因为您的读取批持续时间是每 10 秒一次 => 10 * 10 = 100 秒。
这是正常的,这是使用 Spark 持久化状态的代价。你这边的优化可能是考虑如何最小化必须保存在内存中的状态的大小,以便尽可能快地进行此序列化。此外,确保数据分布在足够多的执行者中,以便状态在所有节点之间均匀分布。另外,我希望您打开了 Kryo Serialization 而不是默认的 Java 序列化,这可以给您带来有意义的性能提升。
除了已接受的答案(指出与检查点相关的序列化代价)之外,还有另一个鲜为人知的问题可能会导致尖峰行为:删除已删除状态的驱逐。
具体来说,'deleted' 或 'timed out' 状态不会立即从映射中删除,而是标记为删除,实际上仅在序列化过程中删除 [在 Spark 1.6.1 中,请参阅 writeObjectInternal()].
这有两个性能影响,每 10 个批次只发生一次:
- 遍历和删除过程是有代价的
- 如果您处理超时/删除事件流,例如将其保存到外部存储,所有 10 个批次的相关成本将仅在此时支付(而不是像人们预期的那样,在每个 RDD 上支付)
我正在开发一个 Scala (2.11) / Spark (1.6.1) 流项目,并使用 mapWithState()
来跟踪之前批次的已见数据。
状态分布在多个节点上的 20 个分区中,使用 StateSpec.function(trackStateFunc _).numPartitions(20)
创建。在这种状态下,我们只有几个键 (~100) 映射到 Sets
,最多有 ~160.000 个条目,这些条目在整个应用程序中不断增长。整个状态最多3GB
,可以由集群中的每个节点处理。在每个批次中,一些数据被添加到一个状态但直到过程结束才被删除,即大约 15 分钟。
在申请 UI 之后,每 10 个批次的处理时间与其他批次相比非常长。查看图片:
黄色字段表示处理时间长。
更详细的作业视图显示,在这些批次中,发生在某个时间点,恰好在所有 20 个分区都为 "skipped" 时。或者这就是 UI 所说的。
我对skipped
的理解是每个状态分区都是一个可能的任务,它不会被执行,因为它不需要重新计算。但是,我不明白为什么每个 Job 的 skips
数量不同,为什么最后一个 Job 需要这么多处理。无论状态大小如何,处理时间都会增加,它只会影响持续时间。
这是 mapWithState()
功能中的错误还是这种预期行为?底层数据结构是否需要某种重新洗牌,状态中的 Set
是否需要复制数据?或者它更有可能是我的应用程序中的缺陷?
Is this a bug in the mapWithState() functionality or is this intended behaviour?
这是有意为之的行为。您看到的尖峰是因为您的数据在给定批次的末尾得到检查点。如果您注意到较长批次的时间,您会发现它持续每 100 秒发生一次。这是因为检查点时间是恒定的,并且是根据您的 batchDuration
计算的,除非您明确设置 DStream.checkpoint
间隔,这是您与数据源对话以读取批次乘以某个常数的频率。
这是来自MapWithStateDStream
的相关代码:
override def initialize(time: Time): Unit = {
if (checkpointDuration == null) {
checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
}
super.initialize(time)
}
其中 DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
是:
private[streaming] object InternalMapWithStateDStream {
private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}
这与您看到的行为完全一致,因为您的读取批持续时间是每 10 秒一次 => 10 * 10 = 100 秒。
这是正常的,这是使用 Spark 持久化状态的代价。你这边的优化可能是考虑如何最小化必须保存在内存中的状态的大小,以便尽可能快地进行此序列化。此外,确保数据分布在足够多的执行者中,以便状态在所有节点之间均匀分布。另外,我希望您打开了 Kryo Serialization 而不是默认的 Java 序列化,这可以给您带来有意义的性能提升。
除了已接受的答案(指出与检查点相关的序列化代价)之外,还有另一个鲜为人知的问题可能会导致尖峰行为:删除已删除状态的驱逐。
具体来说,'deleted' 或 'timed out' 状态不会立即从映射中删除,而是标记为删除,实际上仅在序列化过程中删除 [在 Spark 1.6.1 中,请参阅 writeObjectInternal()].
这有两个性能影响,每 10 个批次只发生一次:
- 遍历和删除过程是有代价的
- 如果您处理超时/删除事件流,例如将其保存到外部存储,所有 10 个批次的相关成本将仅在此时支付(而不是像人们预期的那样,在每个 RDD 上支付)