使用 mapGroupsWithState 进行任意状态处理的状态保存在哪里?
Where is the state saved for arbitrary state processing using mapGroupsWithState?
我在流式数据集上使用 mapGroupsWithState
来维护跨批次的状态。这个 data/state 存储在哪里?执行者,driver 还是其他地方?
but I am not sure where this data/state is stored? (Executor or driver)
状态保存到 [checkpointLocation]/state
,应该在可靠的 HDFS 兼容分布式文件系统上,以便执行者(和任务)可以在需要时访问它。
这给出了 [checkpointLocation]/state
.
可能有许多有状态的运算符,每个运算符都有自己的 operatorId
用于存储特定于运算符的状态。这就是为什么每个有状态运算符可能有零个、一个或多个状态子目录。
这给出了 [checkpointLocation]/state/[operatorId]
.
分区的有状态操作符特定状态目录中还有更多子目录。
这给出了以下特定于州的目录布局:
[checkpointLocation]/state/[operatorId]/[partitionId]
使用 web UI 找出 checkpointLocation
、operatorId
和分区数。
有状态运算符的状态是使用 StateStoreRestoreExec
一元物理运算符从 [checkpointLocation]/state
重新创建的(使用 explain
找到它)。 StateStoreRestoreExec
从状态存储中恢复(读取)流状态以获取子物理运算符提供的密钥。我的理解是每个微批次都会重新创建状态。
我在流式数据集上使用 mapGroupsWithState
来维护跨批次的状态。这个 data/state 存储在哪里?执行者,driver 还是其他地方?
but I am not sure where this data/state is stored? (Executor or driver)
状态保存到 [checkpointLocation]/state
,应该在可靠的 HDFS 兼容分布式文件系统上,以便执行者(和任务)可以在需要时访问它。
这给出了 [checkpointLocation]/state
.
可能有许多有状态的运算符,每个运算符都有自己的 operatorId
用于存储特定于运算符的状态。这就是为什么每个有状态运算符可能有零个、一个或多个状态子目录。
这给出了 [checkpointLocation]/state/[operatorId]
.
分区的有状态操作符特定状态目录中还有更多子目录。
这给出了以下特定于州的目录布局:
[checkpointLocation]/state/[operatorId]/[partitionId]
使用 web UI 找出 checkpointLocation
、operatorId
和分区数。
有状态运算符的状态是使用 StateStoreRestoreExec
一元物理运算符从 [checkpointLocation]/state
重新创建的(使用 explain
找到它)。 StateStoreRestoreExec
从状态存储中恢复(读取)流状态以获取子物理运算符提供的密钥。我的理解是每个微批次都会重新创建状态。