如何匹配两个因果流中的事件以检测过度滞后

How to match events in two causal streams to detect excessive lagging

我有两个热 observable,分别是对网络服务器的请求流 Q 和来自服务器的回复流 R。回复总是按照请求的顺序发送,每个请求最终 都会收到一个回复​​。因此 RR1 中的第一个事件是对 QQ1,依此类推。我需要检测何时回复 Rn 花费的时间比定义的超时时间长,并发出此超时条件的信号。

  Q --1---2---------3------->  // Requests Q1, Q2...
  R ----1------------------->  // Replies
Out ------------------O-|>     // Oops: Reply R2 to Q2 did not arrive within time τ.
          |<----τ---->|

事件 QnRn 不包含任何识别信息(想想普通的无色圆形弹珠),以及图中的索引只是为了解释而引入的序号。

我好像解不开这个谜语。我尝试了下面的方法,但似乎我匹配 the latest request Qi to 最新的响应Rj。在示例 Q 中包含 5 个请求,间隔 500 毫秒,R 中的回复间隔 750 毫秒,从 200 毫秒开始,但只有 4 个(第 5 个无限期延迟)。代码没有检测到这一点,因为最后一个回复 R4latest 请求 Q 之后的 1000 毫秒设置超时内5(实际上是 200 毫秒)。

var Q = Observable.Interval(TimeSpan.FromMilliseconds(500)).Select(_ => Unit.Default)
                   .Take(5).Concat(Observable.Never<Unit>());
var R = Observable.Interval(TimeSpan.FromMilliseconds(750)).Select(_ => Unit.Default)
                   .Delay(TimeSpan.FromMilliseconds(200))
                   .Take(4).Concat(Observable.Never<Unit>());

var dq = Q.Select(v => Observable.Return(v).Delay(TimeSpan.FromMilliseconds(1000)));
var dr = Observable.Zip(Q, R, (_1,_2) => Observable.Never<Unit>());
Observable.Merge(dq, dr).Dump().Switch().Dump();

我相信您希望收到请求 4 已超时(3 秒到期,但 3.2 秒到达)以及请求 5 从未到达的通知

void Main()
{
    var scheduler = new TestScheduler();
    var requests = scheduler.CreateHotObservable<int>(
        ReactiveTest.OnNext(0500.Ms(), 1),
        ReactiveTest.OnNext(1000.Ms(), 2),
        ReactiveTest.OnNext(1500.Ms(), 3),
        ReactiveTest.OnNext(2000.Ms(), 4),
        ReactiveTest.OnNext(2500.Ms(), 5));
    var responses = scheduler.CreateHotObservable<Unit>(
        ReactiveTest.OnNext(0950.Ms(), Unit.Default),
        ReactiveTest.OnNext(1700.Ms(), Unit.Default),
        ReactiveTest.OnNext(2450.Ms(), Unit.Default),
        ReactiveTest.OnNext(3200.Ms(), Unit.Default));

    var expected = scheduler.CreateHotObservable<int>(
            ReactiveTest.OnNext(3000.Ms(), 4),
            ReactiveTest.OnNext(3500.Ms(), 5)
            );

    var observer = scheduler.CreateObserver<int>();

    var query = responses
        .Select((val, idx)=>idx)
        .Publish(responseIdxs =>
    {
        return requests.SelectMany((q, qIdx) =>
                Observable.Timer(TimeSpan.FromSeconds(1), scheduler)
                    .TakeUntil(responseIdxs.Where(rIdx => qIdx == rIdx))
                    .Select(_ => q));
    });

    query.Subscribe(observer);
    scheduler.Start();

    //This test passes
    ReactiveAssert.AreElementsEqual(
        expected.Messages,
        observer.Messages);

}

// Define other methods and classes here
public static class TickExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}