如果一个接一个超时,则配对信号

Pair signals if one after another with timeouts

以下代码使用 Reactive Extensions 工作,并在源一之后听到源二时组合两个源信号:

工作没有超时

Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);

var pairingTwoStraightAfterOne = sourceOne.Join(
    sourceTwo,
    span => sourceTwo.Delay(TimeSpan.FromMilliseconds(20)),
    span => Observable.Empty<TimeSpan>(),
    (span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
    tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
        tuple.Item1.Seconds, tuple.Item2.Seconds));

输出:

Ready, Steady, Go!
Heard One at 00:00:10 followed by Two at 00:00:14
Heard One at 00:00:20 followed by Two at 00:00:21
Heard One at 00:00:30 followed by Two at 00:00:36
Heard One at 00:00:40 followed by Two at 00:00:43

如何引入超时

但是如果来源二在来源一之后到达得太晚,我需要排除任何对。

以下不起作用并产生不需要的输出:

Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now-startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now-startTime);
var timeOuts = sourceOne.Delay(TimeSpan.FromSeconds(3.81));
var sourceTwoWithTimeouts = sourceTwo.Merge(timeOuts);
var pairingTwoStraightAfterOne = sourceOne.Join(
    sourceTwo,
    span => sourceTwoWithTimeouts.Delay(TimeSpan.FromMilliseconds(20)),
    span => Observable.Empty<TimeSpan>(),
    (span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
    tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}", 
        tuple.Item1.Seconds, tuple.Item2.Seconds));

不正确的输出,例如在 00:00:10 和 00:00:14 应该听不到任何声音,因为超时应该关闭左侧 window:

Ready, Steady, Go!
Heard One at 00:00:10 followed by Two at 00:00:14
Heard One at 00:00:20 followed by Two at 00:00:21
Heard One at 00:00:30 followed by Two at 00:00:36
Heard One at 00:00:40 followed by Two at 00:00:43

有什么问题或如何实现此类超时?

Join 是个乱七八糟的家伙。我不确定你是否正确使用它,而且 Rx 文档非常糟糕。我能找到的最好的两个来源是 MSDN video, and Lee Campbell's excellent site.

无论如何,我做了以下,这似乎有效,至少据我了解您的要求:

Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);
var pairingTwoStraightAfterOne = sourceOne.Join(
    sourceTwo,
    span => Observable.Return(span).Delay(TimeSpan.FromSeconds(3.81)).Merge(sourceTwo),
    span => Observable.Empty<TimeSpan>(),
    (span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
    tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
        tuple.Item1.Seconds, tuple.Item2.Seconds));

输出如下所示:

Ready, Steady, Go!
Heard One at 00:00:20 followed by Two at 00:00:21
Heard One at 00:00:40 followed by Two at 00:00:43
Heard One at 00:00:50 followed by Two at 00:00:50
Heard One at 00:00:10 followed by Two at 00:00:12

说明:当使用 Join 时,您希望根据两个流在重叠 windows 中发出事件来加入它们。为此,我们需要 5 件事:

  1. 左可观察流
  2. 右可观察流
  3. Left Stream
  4. 中每个事件的 window 持续时间的定义
  5. 右流
  6. 中每个事件的 window 持续时间的定义
  7. 当Left Stream 和Right Stream 中有一对对象的开windows 重叠时,两个对象的选择器函数。

这是你对 Join 的五个论点。最令人困惑的部分是参数 3 和 4,leftDurationSelectorrightDurationSelector。当一个项目从左侧流发出时, window 被打开。 leftDurationSelector 产生一个流,当一个项目从该流发出,或者该流终止时,然后 window 关闭。发出的任何值都是无关紧要的并被忽略。流的唯一目的是定义 window.

的结束

所以在我上面的代码中,我将左侧 window 的关闭定义为左侧 window 释放后 3.81 秒。这听起来像你想要的。


那为什么你的代码不起作用?

您的代码等同于此(将 sourceTwoWithTimeouts 分解为 Join 调用以提高可读性):

Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);
var pairingTwoStraightAfterOne = sourceOne.Join(
    sourceTwo,
    span => sourceOne.Delay(TimeSpan.FromSeconds(3.81)).Merge(sourceTwo).Delay(TimeSpan.FromMilliseconds(20)),
    span => Observable.Empty<TimeSpan>(),
    (span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
    tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
        tuple.Item1.Seconds, tuple.Item2.Seconds));

如果你去掉了.Delay(TimeSpan.FromMilliseconds(20))的调用,除了你调用的是sourceOne.而我调用的是span.之外,和我的一样。如果您尝试使用原始可观察对象作为持续时间选择器,Rx 会将其解释为 "close the window when the NEXT value (from stream left) arrives"。因此,您的持续时间选择器实际上在 NEXT 左值到达后 3.81 秒(在我们的场景中为 13.81 秒后)或右值后关闭。由于您希望它在当前值之后 3.81 秒后关闭,请使用 span.,而不是 sourceOne.