RX GroupByUntil 滑动直到

RX GroupByUntil with sliding Until

我正在使用 GroupByUntil 对来自 MSMQ 的消息进行分组,这些消息具有特定的 属性 值,效果非常好。我正在使用此代码。

observable.GroupByUntil(
    message => message.Source,
    message => message.Body,
    message => Observable.Timer(new TimeSpan(0,0,5)) //I thought this was sliding expiration
).Subscribe(HandleGroup);

我错误地认为,每次给定组的新消息到达时,该组的 durationSelector 都会重新启动,本质上是等待持续时间过去,没有新消息,然后结束该组。我意识到情况并非如此,无论如何,durationSelector 都会继续倒计时。在分组时为每个组实现滑动 durationSelector 的最佳方法是什么?

Switch是你的朋友。

observable.GroupByUntil(
    message => message.Source,
    message => message.Body,
    group => group
        .Select(message => Observable.Timer(new TimeSpan(0, 0, 5)))
        .Switch() 
).Subscribe(HandleGroup);

解释:

  • 对于每条消息,创建一个在 5 秒后触发一次的计时器
  • 如果同一组中出现另一条消息,则停止旧计时器并切换到新计时器。