Spark Streaming:无状态重叠 windows 与保持状态

Spark Streaming: stateless overlapping windows vs. keeping state

在处理顺序的有限事件会话流时,选择无状态滑动-window 操作(例如 reduceByKeyAndWindow)与选择保持状态(例如通过 updateStateByKey 或新的 mapStateByKey)有哪些注意事项使用 Spark Streaming?

例如,考虑以下场景:

A wearable device tracks physical exercises performed by the wearer. The device automatically detects when an exercise starts, and emits a message; emits additional messages while the exercise is undergoing (e.g. heart rate); and finally, emits a message when the exercise is done.

期望的结果是每个练习会话的聚合记录流。即同一会话的所有事件应该聚合在一起(例如,这样每个会话都可以保存在一个数据库行中)。请注意,每个会话的长度都是有限的,但是来自多个设备的整个流是连续的。为方便起见,我们假设设备为每个锻炼会话生成一个 GUID。

我可以看到两种使用 Spark Streaming 处理此用例的方法:

  1. 使用非重叠 windows,并保持状态。每个 GUID 保存一个状态,所有事件都与之匹配。当新事件到达时,状态会更新(例如使用 mapWithState),如果事件是 "end of exercise session",将发出基于状态的聚合记录,并删除键。

  2. 使用重叠滑动windows,并且只保留第一个会话。假设长度为 2 且间隔为 1 的滑动 window(见下图)。还假设 window 长度是 2 X(最大可能的锻炼时间)。在每个 window 上,事件按 GUID 聚合,例如使用 reduceByKeyAndWindow。然后,转储在 window 的后半部分开始的所有会话,并发出剩余的会话。这使得每个事件只使用一次,并确保属于同一会话的所有事件都将聚合在一起。

方法 #2 的图表:

Only sessions starting in the areas marked with \\ will be emitted. 
-----------
|window 1 |
|\\|    |
-----------
     ----------
     |window 2 |
     |\\|    |  
     -----------
          ----------
          |window 3 |
          |\\|    |
          -----------

我看到的优缺点:

方法 #1 的计算成本较低,但需要保存和管理状态(例如,如果并发会话数增加,状态可能会比内存大)。但是,如果并发会话的最大数量是有限的,这可能不是问题。

方法 #2 的成本是后者的两倍(每个事件处理两次),并且延迟更高(最大锻炼时间的 2 倍),但更简单且易于管理,因为不保留任何状态。

处理此用例的最佳方法是什么 - 这些方法中的任何一种都是 "right" 方法,还是有更好的方法?

还有哪些 pros/cons 应该考虑?

通常没有正确的方法,每个方法都有权衡。因此,我会在组合中添加其他方法,并概述我对它们的优缺点的看法。所以你可以决定哪一个更适合你。

外部状态方法(方法 #3)

您可以在外部存储中累积事件的状态。 Cassandra 经常用于此。您可以分别处理最终事件和正在进行的事件,例如如下所示:

val stream = ...

val ongoingEventsStream = stream.filter(!isFinalEvent)
val finalEventsStream = stream.filter(isFinalEvent)

ongoingEventsStream.foreachRDD { /*accumulate state in casssandra*/ }
finalEventsStream.foreachRDD { /*finalize state in casssandra, move to final destination if needed*/ }

trackStateByKey 方法(方法 #1.1)

它可能是您的最佳解决方案,因为它消除了 updateStateByKey 的缺点,但考虑到它刚刚作为 Spark 1.6 版本的一部分发布,它也可能有风险(因为某些原因它不是很宣传).如果您想了解更多信息,可以使用 link 作为起点

Pros/Cons

方法 #1 (updateStateByKey)

优点

  • 易于理解或解释(对团队其他成员、新人等)(主观)
  • 存储:更好地利用内存只存储最新的运动状态
  • 存储:将只保留正在进行的练习,并在完成后立即丢弃
  • 延迟仅受每个 micro-batch 处理的性能限制

缺点

  • 存储:如果键数(并发练习)很大,它可能不适合您的集群内存
  • 处理:它将运行状态图中每个键的updateState函数,因此如果并发练习的数量很大 - 性能会受到影响

方法 #2 (window)

虽然可以使用 windows 实现您的需求,但在您的场景中看起来明显不够自然。

优点

  • Processing 在某些情况下(取决于数据)可能比 updateStateByKey 更有效,因为 updateStateByKey 倾向于 运行 更新每个键,即使有没有实际更新

缺点

  • "maximal possible exercise time" - 这听起来像是一个巨大的风险 - 它可能是基于人类行为的相当任意的持续时间。有些人可能会忘记 "finish exercise"。也取决于锻炼的种类,但可能从几秒到几小时不等,当您想要较低的延迟进行快速锻炼时,同时又必须将延迟保持在可能存在的最长锻炼时尽可能高
  • 感觉更难向其他人解释它是如何工作的(主观)
  • 存储:必须将所有数据保存在 window 帧内,而不仅仅是最新的一个。也只有当 window 滑出这个时间段时才会释放内存,而不是在运动真正完成时。虽然如果您只保留最后两个时隙可能不会有太大差异 - 如果您尝试通过更频繁地滑动 window 来获得更大的灵活性,它将会增加。

方法#3(外部状态)

优点

  • 易于解释等(主观)
  • 纯流式处理方法,这意味着 spark 负责对每个单独的事件采取行动,而不是试图存储状态等(主观)
  • 存储:不受集群存储状态的内存限制-可以处理大量并发练习
  • Processing:状态只有在有实际更新时才会更新(与 updateStateByKey 不同)
  • 延迟类似于 updateStateByKey,仅受处理每个 micro-batch
  • 所需的时间限制

缺点

  • 架构中的额外组件(除非您已经使用 Cassandra 作为最终输出)
  • 处理:默认情况下比仅在 spark 中处理慢,因为不是 in-memory + 您需要通过网络传输数据
  • 你必须执行 exactly once 语义才能将数据输出到 cassandra(对于 foreachRDD 期间 worker 失败的情况)

建议的方法

我会尝试以下操作:

  • 在您的数据和集群上测试 updateStateByKey 方法
  • 查看内存消耗和处理是否可以接受,即使有大量并发练习(预计在高峰时段)
  • 回退到 Cassandra 以防万一

我认为第三种方法的另一个缺点是 RDD 不是按时间顺序接收的..考虑到 运行 它们在集群上..

ongoingEventsStream.foreachRDD { /*accumulate state in casssandra*/ }

还有检查点和驱动程序节点故障怎么办。在那种情况下,你会再次读取整个数据吗?很想知道你想如何处理这个问题?

我想也许 mapwithstate 是您考虑所有这些情况的更好方法..