在 Storm 中翻滚 window,同时包含计数和持续时间
Tumbling window in Storm with both Count and Duration
如何在 Storm 中创建 Tumbling window 具有两个阈值。例如,如果我将 WindowCount 设置为 500 并将 WindowDuration 设置为 5 秒,即使消息少于 500 条但已过去 5 秒,window 也应该得到处理。我可以看到这两种功能的独立 API
计数
.tumblingWindow(1000, windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
时间
.tumblingWindow(Duration.seconds(5), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
我可以同时使用两者吗?
如果我按 MessageCount 而不是 Duration 配置,当我停止拓扑时我的消息会发生什么情况?即使未收到批计数,Storm 是否会处理这些消息?或者我会丢失这些消息吗?
我不相信你可以用当前的 windowing API 做到这一点。
代码具有足够的可插拔性,可以在内部使用它,但您需要的 API 不会公开。有两个接口来定义如何处理 windows。
TriggerPolicy 决定何时将 windows 传递给螺栓(例如 "deliver when 100 tuples have been buffered")。
EvictionPolicy 决定何时从当前 window 中逐出元组(例如 "discard tuples once they're more than 500 tuples behind the newest tuple in the window")。
您通过例如间接配置这些政策BaseWindowedBolt.withWindowLength, which internally just sets some configuration properties. Those properties are used to determine the trigger/eviction policy in WindowedBoltExecutor.
我认为需要的是允许用户提供他们自己的自定义 TriggerPolicy/EvictionPolicy,或者添加一个新的 Trigger/EvictionPolicy 来满足您的需求。
如果您想为此提出问题,可以在 https://issues.apache.org/jira/projects/STORM. If you would like to contribute code, the source is available at https://github.com/apache/storm 上提出,您也可以在那里提出 PR。
如何在 Storm 中创建 Tumbling window 具有两个阈值。例如,如果我将 WindowCount 设置为 500 并将 WindowDuration 设置为 5 秒,即使消息少于 500 条但已过去 5 秒,window 也应该得到处理。我可以看到这两种功能的独立 API
计数
.tumblingWindow(1000, windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
时间
.tumblingWindow(Duration.seconds(5), windowStoreFactory, new Fields("word"), new CountAsAggregator(), new Fields("count"))
我可以同时使用两者吗?
如果我按 MessageCount 而不是 Duration 配置,当我停止拓扑时我的消息会发生什么情况?即使未收到批计数,Storm 是否会处理这些消息?或者我会丢失这些消息吗?
我不相信你可以用当前的 windowing API 做到这一点。
代码具有足够的可插拔性,可以在内部使用它,但您需要的 API 不会公开。有两个接口来定义如何处理 windows。
TriggerPolicy 决定何时将 windows 传递给螺栓(例如 "deliver when 100 tuples have been buffered")。
EvictionPolicy 决定何时从当前 window 中逐出元组(例如 "discard tuples once they're more than 500 tuples behind the newest tuple in the window")。
您通过例如间接配置这些政策BaseWindowedBolt.withWindowLength, which internally just sets some configuration properties. Those properties are used to determine the trigger/eviction policy in WindowedBoltExecutor.
我认为需要的是允许用户提供他们自己的自定义 TriggerPolicy/EvictionPolicy,或者添加一个新的 Trigger/EvictionPolicy 来满足您的需求。
如果您想为此提出问题,可以在 https://issues.apache.org/jira/projects/STORM. If you would like to contribute code, the source is available at https://github.com/apache/storm 上提出,您也可以在那里提出 PR。