使用 Flink 计算流中有状态实体的最新状态

Counting latest state of stateful entities in streaming with Flink

我尝试在 Flink 中创建我的第一个实时分析作业。该方法类似于 kappa 架构,所以我在 Kafka 上有我的原始数据,我们在其中收到任何实体状态的每次更改的消息。

所以消息的形式是:

(id,newStatus, timestamp)

我们想要计算每次 window 处于给定状态的项目数。所以输出应该是这样的形式:

(outputTimestamp, state1:count1,state2:count2 ...)

或同等学历。这些行应包含在任何给定时间处于给定状态的项目计数,其中与 Id 关联的状态是针对该 id 观察到的最新消息。在任何情况下都应该计算 id 的状态,即使事件比正在处理的事件早得多。所以所有计数的总和应该等于系统中观察到的不同 ID 的数量。接下来的步骤可能会在一段时间后忘记最终项目中的项目,但这不是现在的严格要求。

这个会写在elasticsearch上然后查询

我尝试了很多不同的路径,其中 none 完全满足了要求。使用滑动 window 我可以很容易地实现预期的行为,除了当滑动 window 的开始超过事件的时间戳时,它会丢失计数,如您所料。其他方法在处理积压时无法保持一致,因为我对键和时间戳做了一些技巧,但在一次处理所有数据时失败了。

所以我想知道,即使是在高层次上,我应该如何解决这个问题。它看起来像是一个相对常见的用例,但必须无限期保留给定 ID 的相关信息以正确计算实体的事实会产生很多问题。

我想我有办法解决你的问题:

给定 (id, state, time)DataStream 作为:

val stateUpdates: DataStream[(Long, Int, ts)] = ???

你推导出实际的状态变化如下:

val stateCntUpdates: DataStream[(Int, Int)] = s // (state, cntUpdate)
  .keyBy(_._1) // key by id
  .flatMap(new StateUpdater) 

StateUpdater 是有状态的 FlatMapFunction。它有一个键控状态,用于存储每个 id 的最后状态。对于每个输入记录,它 returns 两个状态计数更新记录:(oldState, -1)(newState, +1)(oldState, -1) 记录确保减少先前状态的计数。

接下来汇总每个州的州计数变化 window:

val cntUpdatesPerWindow: DataStream[(Int, Int, Long)] = stateCntUpdates // (state, cntUpdate, time)
  .keyBy(_._1) // key by state
  .timeWindow(Time.minutes(10)) // window should be non-overlapping, e.g. Tumbling
  .apply(new SumReducer(), new YourWindowFunction()) 

SumReducer 对 cntUpdates 求和,YourWindowFunction 分配 window 的时间戳。此步骤汇总 window.

中每个状态的所有状态更改

最后,我们根据计数更新调整当前计数。

val stateCnts: DataStream[(Int, Int, Long)] = cntUpdatesPerWindow // (state, count, time)
  .keyBy(_._1) // key by state again
  .map(new CountUpdater)

CountUpdater 是有状态的 MapFunction。它有一个键控状态,存储每个状态的当前计数。对于每条传入记录,都会调整计数并发出一条记录 (state, newCount, time)

现在您有一个流,其中包含每个州的新计数(每个州一条记录)。如果可能,您可以使用这些记录更新您的 Elasticsearch 索引。如果您需要收集给定时间内的所有状态计数,您可以使用 window.

请注意:该程序的状态大小取决于唯一 ID 的数量。如果 id space 增长非常快,这可能会导致问题。