RX GroupByUntil 滑动直到
RX GroupByUntil with sliding Until
我正在使用 GroupByUntil 对来自 MSMQ 的消息进行分组,这些消息具有特定的 属性 值,效果非常好。我正在使用此代码。
observable.GroupByUntil(
message => message.Source,
message => message.Body,
message => Observable.Timer(new TimeSpan(0,0,5)) //I thought this was sliding expiration
).Subscribe(HandleGroup);
我错误地认为,每次给定组的新消息到达时,该组的 durationSelector 都会重新启动,本质上是等待持续时间过去,没有新消息,然后结束该组。我意识到情况并非如此,无论如何,durationSelector 都会继续倒计时。在分组时为每个组实现滑动 durationSelector 的最佳方法是什么?
Switch
是你的朋友。
observable.GroupByUntil(
message => message.Source,
message => message.Body,
group => group
.Select(message => Observable.Timer(new TimeSpan(0, 0, 5)))
.Switch()
).Subscribe(HandleGroup);
解释:
- 对于每条消息,创建一个在 5 秒后触发一次的计时器
- 如果同一组中出现另一条消息,则停止旧计时器并切换到新计时器。
我正在使用 GroupByUntil 对来自 MSMQ 的消息进行分组,这些消息具有特定的 属性 值,效果非常好。我正在使用此代码。
observable.GroupByUntil(
message => message.Source,
message => message.Body,
message => Observable.Timer(new TimeSpan(0,0,5)) //I thought this was sliding expiration
).Subscribe(HandleGroup);
我错误地认为,每次给定组的新消息到达时,该组的 durationSelector 都会重新启动,本质上是等待持续时间过去,没有新消息,然后结束该组。我意识到情况并非如此,无论如何,durationSelector 都会继续倒计时。在分组时为每个组实现滑动 durationSelector 的最佳方法是什么?
Switch
是你的朋友。
observable.GroupByUntil(
message => message.Source,
message => message.Body,
group => group
.Select(message => Observable.Timer(new TimeSpan(0, 0, 5)))
.Switch()
).Subscribe(HandleGroup);
解释:
- 对于每条消息,创建一个在 5 秒后触发一次的计时器
- 如果同一组中出现另一条消息,则停止旧计时器并切换到新计时器。