有状态结构化 Spark Streaming:未触发超时
Stateful Structured Spark Streaming: Timeout is not getting triggered
我已将超时持续时间设置为“2 分钟”,如下所示:
def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject],
oldState: GroupState[MyState]): OutputRow = {
println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3)
var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3)
if (oldState.hasTimedOut) {
println("@@@@@ oldState has timed out @@@@")
// Logic to Write OutputRow
OutputRow("some values here...")
} else {
for (input <- inputs) {
state = updateWithEvent(state, input)
oldState.update(state)
oldState.setTimeoutDuration("2 minutes")
}
OutputRow(null, null, null)
}
}
我还在 'mapGroupsWithState' 中指定了 ProcessingTimeTimeout 如下...
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
但是 'hasTimedOut' 永远不会为真,所以我没有得到任何输出!我做错了什么?
似乎只有在输入数据连续流动时才有效。我已经停止了输入作业,因为我有足够的数据,但似乎只有在连续输入数据时超时才会起作用。不确定为什么要这样设计。使编写 unit/integration 测试变得有点困难,但我确信它以这种方式设计是有原因的。谢谢。
我已将超时持续时间设置为“2 分钟”,如下所示:
def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject],
oldState: GroupState[MyState]): OutputRow = {
println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3)
var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3)
if (oldState.hasTimedOut) {
println("@@@@@ oldState has timed out @@@@")
// Logic to Write OutputRow
OutputRow("some values here...")
} else {
for (input <- inputs) {
state = updateWithEvent(state, input)
oldState.update(state)
oldState.setTimeoutDuration("2 minutes")
}
OutputRow(null, null, null)
}
}
我还在 'mapGroupsWithState' 中指定了 ProcessingTimeTimeout 如下...
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
但是 'hasTimedOut' 永远不会为真,所以我没有得到任何输出!我做错了什么?
似乎只有在输入数据连续流动时才有效。我已经停止了输入作业,因为我有足够的数据,但似乎只有在连续输入数据时超时才会起作用。不确定为什么要这样设计。使编写 unit/integration 测试变得有点困难,但我确信它以这种方式设计是有原因的。谢谢。