我应该使用状态计算吗? Spark Streaming状态计算讲解
Should I use state computation? Spark Streaming state computation explanation
这是我的情况:我从不同的设备接收数据,这些设备有自己的签名、时间戳和标志。然后,我使用 foreachRDD
函数过滤文件中的 (flag==SAVE_VALUE)
,但前提是它满足以下条件:
(it is the first time I receive this signature)
OR
(I already have this signature && the timestamp is older than an hour)
在我进入本地环境之前,这意味着我要使用地图,我在其中存储所有 ID 和收到的最后一个时间戳。现在我想在 Spark 中移动这个逻辑。我应该怎么做?
我觉得这是有状态 Dstream 的情况,但我不能完全理解:
- 我应该如何在 Dstream 中存储类似地图的 rdd?或者如何创建单个“map RDD”
- 如何比较新到达的数据?
看看mapWithState()
,正是你想要的
在 StateSpecFunction
中,您可以确定在同一键的新值到达时是更新、保留还是删除当前状态。您可以访问当前状态和新状态,因此可以在两者之间进行任何类型的比较。
它还内置了对超时的支持,并且可以划分为多个执行器。
您可以通过在 mapWithState()
的 return 值上调用 stateSnapshots()
来访问全球地图。否则,return 值将由每批 StateSpecFunction
的 return 值决定。
mapWithState()
是在 Spark 1.6 中添加的,在此之前有一个类似的函数叫做 updateStateByKey()
,它的功能基本相同,但在更大的数据集上表现更差。
这是我的情况:我从不同的设备接收数据,这些设备有自己的签名、时间戳和标志。然后,我使用 foreachRDD
函数过滤文件中的 (flag==SAVE_VALUE)
,但前提是它满足以下条件:
(it is the first time I receive this signature)
OR
(I already have this signature && the timestamp is older than an hour)
在我进入本地环境之前,这意味着我要使用地图,我在其中存储所有 ID 和收到的最后一个时间戳。现在我想在 Spark 中移动这个逻辑。我应该怎么做?
我觉得这是有状态 Dstream 的情况,但我不能完全理解:
- 我应该如何在 Dstream 中存储类似地图的 rdd?或者如何创建单个“map RDD”
- 如何比较新到达的数据?
看看mapWithState()
,正是你想要的
在 StateSpecFunction
中,您可以确定在同一键的新值到达时是更新、保留还是删除当前状态。您可以访问当前状态和新状态,因此可以在两者之间进行任何类型的比较。
它还内置了对超时的支持,并且可以划分为多个执行器。
您可以通过在 mapWithState()
的 return 值上调用 stateSnapshots()
来访问全球地图。否则,return 值将由每批 StateSpecFunction
的 return 值决定。
mapWithState()
是在 Spark 1.6 中添加的,在此之前有一个类似的函数叫做 updateStateByKey()
,它的功能基本相同,但在更大的数据集上表现更差。