在 Apache Flink 中手动更新状态的最佳方式是什么?
What's the best way to update the state manually in Apache Flink?
我在股票市场项目中使用 Apache Flink 来计算当前价格变化。公式是
price_change = (current_price - previous_close_price) / previous_close_price
previous_close_price
是证券在前一天交易日的收盘价。每天开市前,我需要更新 previous_close_price
。
现在我想出了几个解决方案,但我不知道哪个是最好的。
将previous_close_price
存储在redis中,每次计算时获取价格。更新价格既简单又灵活,但这种解决方案可能会降低性能。
将状态的 TTL 设置为 1 天。当旧状态过期时获取新状态。但它不灵活,因为 TTL 是硬编码的。
Broadcast State Pattern。我不确定这个解决方案是否有效。
给flink发个特殊消息。当 flink 收到消息时,它会更新 previous_close_price
.
欢迎提出任何建议。
我建议在 #4 上使用一个变体:
有两个来源,一个仅用于收盘价,另一个用于交易流。通过安全对这两个流进行键控,并将它们与 CoProcessFunction 连接起来。在 CoProcessFunction 中以键控状态存储 previous_close_price。
每天开市前,更新收盘价。
这可以使用 RichCoFlatMap 来完成,但我建议使用 CoProcessFunction,因为您可能希望使用辅助输出来报告错误(例如缺少 previous_close_price 的证券)。
至于其他方法:
- 我认为将 previous_close_price 数据保存在外部数据存储中没有任何优势。
- 我认为这不是很好。没有可用于触发新数据加载的钩子,而且只有在访问时才会清除状态。
- 这感觉不是广播状态的好用例,除非集群中的每个人都需要知道所有证券的收盘价。
我在股票市场项目中使用 Apache Flink 来计算当前价格变化。公式是
price_change = (current_price - previous_close_price) / previous_close_price
previous_close_price
是证券在前一天交易日的收盘价。每天开市前,我需要更新 previous_close_price
。
现在我想出了几个解决方案,但我不知道哪个是最好的。
将
previous_close_price
存储在redis中,每次计算时获取价格。更新价格既简单又灵活,但这种解决方案可能会降低性能。将状态的 TTL 设置为 1 天。当旧状态过期时获取新状态。但它不灵活,因为 TTL 是硬编码的。
Broadcast State Pattern。我不确定这个解决方案是否有效。
给flink发个特殊消息。当 flink 收到消息时,它会更新
previous_close_price
.
欢迎提出任何建议。
我建议在 #4 上使用一个变体:
有两个来源,一个仅用于收盘价,另一个用于交易流。通过安全对这两个流进行键控,并将它们与 CoProcessFunction 连接起来。在 CoProcessFunction 中以键控状态存储 previous_close_price。
每天开市前,更新收盘价。
这可以使用 RichCoFlatMap 来完成,但我建议使用 CoProcessFunction,因为您可能希望使用辅助输出来报告错误(例如缺少 previous_close_price 的证券)。
至于其他方法:
- 我认为将 previous_close_price 数据保存在外部数据存储中没有任何优势。
- 我认为这不是很好。没有可用于触发新数据加载的钩子,而且只有在访问时才会清除状态。
- 这感觉不是广播状态的好用例,除非集群中的每个人都需要知道所有证券的收盘价。