在 Apache Flink 中手动更新状态的最佳方式是什么?

What's the best way to update the state manually in Apache Flink?

我在股票市场项目中使用 Apache Flink 来计算当前价格变化。公式是

 price_change = (current_price - previous_close_price) / previous_close_price

previous_close_price 是证券在前一天交易日的收盘价。每天开市前,我需要更新 previous_close_price

现在我想出了几个解决方案,但我不知道哪个是最好的。

  1. previous_close_price存储在redis中,每次计算时获取价格。更新价格既简单又灵活,但这种解决方案可能会降低性能。

  2. 将状态的 TTL 设置为 1 天。当旧状态过期时获取新状态。但它不灵活,因为 TTL 是硬编码的。

  3. Broadcast State Pattern。我不确定这个解决方案是否有效。

  4. 给flink发个特殊消息。当 flink 收到消息时,它会更新 previous_close_price.

欢迎提出任何建议。

我建议在 #4 上使用一个变体:

有两个来源,一个仅用于收盘价,另一个用于交易流。通过安全对这两个流进行键控,并将它们与 CoProcessFunction 连接起来。在 CoProcessFunction 中以键控状态存储 previous_close_price。

每天开市前,更新收盘价。

这可以使用 RichCoFlatMap 来完成,但我建议使用 CoProcessFunction,因为您可能希望使用辅助输出来报告错误(例如缺少 previous_close_price 的证券)。

至于其他方法:

  1. 我认为将 previous_close_price 数据保存在外部数据存储中没有任何优势。
  2. 我认为这不是很好。没有可用于触发新数据加载的钩子,而且只有在访问时才会清除状态。
  3. 这感觉不是广播状态的好用例,除非集群中的每个人都需要知道所有证券的收盘价。