如果一个接一个超时,则配对信号
Pair signals if one after another with timeouts
以下代码使用 Reactive Extensions 工作,并在源一之后听到源二时组合两个源信号:
工作没有超时
Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);
var pairingTwoStraightAfterOne = sourceOne.Join(
sourceTwo,
span => sourceTwo.Delay(TimeSpan.FromMilliseconds(20)),
span => Observable.Empty<TimeSpan>(),
(span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
tuple.Item1.Seconds, tuple.Item2.Seconds));
输出:
Ready, Steady, Go!
Heard One at 00:00:10 followed by Two at 00:00:14
Heard One at 00:00:20 followed by Two at 00:00:21
Heard One at 00:00:30 followed by Two at 00:00:36
Heard One at 00:00:40 followed by Two at 00:00:43
如何引入超时
但是如果来源二在来源一之后到达得太晚,我需要排除任何对。
以下不起作用并产生不需要的输出:
Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now-startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now-startTime);
var timeOuts = sourceOne.Delay(TimeSpan.FromSeconds(3.81));
var sourceTwoWithTimeouts = sourceTwo.Merge(timeOuts);
var pairingTwoStraightAfterOne = sourceOne.Join(
sourceTwo,
span => sourceTwoWithTimeouts.Delay(TimeSpan.FromMilliseconds(20)),
span => Observable.Empty<TimeSpan>(),
(span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
tuple.Item1.Seconds, tuple.Item2.Seconds));
不正确的输出,例如在 00:00:10 和 00:00:14 应该听不到任何声音,因为超时应该关闭左侧 window:
Ready, Steady, Go!
Heard One at 00:00:10 followed by Two at 00:00:14
Heard One at 00:00:20 followed by Two at 00:00:21
Heard One at 00:00:30 followed by Two at 00:00:36
Heard One at 00:00:40 followed by Two at 00:00:43
有什么问题或如何实现此类超时?
Join
是个乱七八糟的家伙。我不确定你是否正确使用它,而且 Rx 文档非常糟糕。我能找到的最好的两个来源是 MSDN video, and Lee Campbell's excellent site.
无论如何,我做了以下,这似乎有效,至少据我了解您的要求:
Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);
var pairingTwoStraightAfterOne = sourceOne.Join(
sourceTwo,
span => Observable.Return(span).Delay(TimeSpan.FromSeconds(3.81)).Merge(sourceTwo),
span => Observable.Empty<TimeSpan>(),
(span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
tuple.Item1.Seconds, tuple.Item2.Seconds));
输出如下所示:
Ready, Steady, Go!
Heard One at 00:00:20 followed by Two at 00:00:21
Heard One at 00:00:40 followed by Two at 00:00:43
Heard One at 00:00:50 followed by Two at 00:00:50
Heard One at 00:00:10 followed by Two at 00:00:12
说明:当使用 Join
时,您希望根据两个流在重叠 windows 中发出事件来加入它们。为此,我们需要 5 件事:
- 左可观察流
- 右可观察流
- Left Stream
中每个事件的 window 持续时间的定义
- 右流
中每个事件的 window 持续时间的定义
- 当Left Stream 和Right Stream 中有一对对象的开windows 重叠时,两个对象的选择器函数。
这是你对 Join
的五个论点。最令人困惑的部分是参数 3 和 4,leftDurationSelector
和 rightDurationSelector
。当一个项目从左侧流发出时, window 被打开。 leftDurationSelector
产生一个流,当一个项目从该流发出,或者该流终止时,然后 window 关闭。发出的任何值都是无关紧要的并被忽略。流的唯一目的是定义 window.
的结束
所以在我上面的代码中,我将左侧 window 的关闭定义为左侧 window 释放后 3.81 秒。这听起来像你想要的。
那为什么你的代码不起作用?
您的代码等同于此(将 sourceTwoWithTimeouts
分解为 Join
调用以提高可读性):
Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);
var pairingTwoStraightAfterOne = sourceOne.Join(
sourceTwo,
span => sourceOne.Delay(TimeSpan.FromSeconds(3.81)).Merge(sourceTwo).Delay(TimeSpan.FromMilliseconds(20)),
span => Observable.Empty<TimeSpan>(),
(span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
tuple.Item1.Seconds, tuple.Item2.Seconds));
如果你去掉了.Delay(TimeSpan.FromMilliseconds(20))
的调用,除了你调用的是sourceOne.
而我调用的是span.
之外,和我的一样。如果您尝试使用原始可观察对象作为持续时间选择器,Rx 会将其解释为 "close the window when the NEXT value (from stream left) arrives"。因此,您的持续时间选择器实际上在 NEXT 左值到达后 3.81 秒(在我们的场景中为 13.81 秒后)或右值后关闭。由于您希望它在当前值之后 3.81 秒后关闭,请使用 span.
,而不是 sourceOne.
。
以下代码使用 Reactive Extensions 工作,并在源一之后听到源二时组合两个源信号:
工作没有超时
Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);
var pairingTwoStraightAfterOne = sourceOne.Join(
sourceTwo,
span => sourceTwo.Delay(TimeSpan.FromMilliseconds(20)),
span => Observable.Empty<TimeSpan>(),
(span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
tuple.Item1.Seconds, tuple.Item2.Seconds));
输出:
Ready, Steady, Go!
Heard One at 00:00:10 followed by Two at 00:00:14
Heard One at 00:00:20 followed by Two at 00:00:21
Heard One at 00:00:30 followed by Two at 00:00:36
Heard One at 00:00:40 followed by Two at 00:00:43
如何引入超时
但是如果来源二在来源一之后到达得太晚,我需要排除任何对。
以下不起作用并产生不需要的输出:
Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now-startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now-startTime);
var timeOuts = sourceOne.Delay(TimeSpan.FromSeconds(3.81));
var sourceTwoWithTimeouts = sourceTwo.Merge(timeOuts);
var pairingTwoStraightAfterOne = sourceOne.Join(
sourceTwo,
span => sourceTwoWithTimeouts.Delay(TimeSpan.FromMilliseconds(20)),
span => Observable.Empty<TimeSpan>(),
(span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
tuple.Item1.Seconds, tuple.Item2.Seconds));
不正确的输出,例如在 00:00:10 和 00:00:14 应该听不到任何声音,因为超时应该关闭左侧 window:
Ready, Steady, Go!
Heard One at 00:00:10 followed by Two at 00:00:14
Heard One at 00:00:20 followed by Two at 00:00:21
Heard One at 00:00:30 followed by Two at 00:00:36
Heard One at 00:00:40 followed by Two at 00:00:43
有什么问题或如何实现此类超时?
Join
是个乱七八糟的家伙。我不确定你是否正确使用它,而且 Rx 文档非常糟糕。我能找到的最好的两个来源是 MSDN video, and Lee Campbell's excellent site.
无论如何,我做了以下,这似乎有效,至少据我了解您的要求:
Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);
var pairingTwoStraightAfterOne = sourceOne.Join(
sourceTwo,
span => Observable.Return(span).Delay(TimeSpan.FromSeconds(3.81)).Merge(sourceTwo),
span => Observable.Empty<TimeSpan>(),
(span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
tuple.Item1.Seconds, tuple.Item2.Seconds));
输出如下所示:
Ready, Steady, Go!
Heard One at 00:00:20 followed by Two at 00:00:21
Heard One at 00:00:40 followed by Two at 00:00:43
Heard One at 00:00:50 followed by Two at 00:00:50
Heard One at 00:00:10 followed by Two at 00:00:12
说明:当使用 Join
时,您希望根据两个流在重叠 windows 中发出事件来加入它们。为此,我们需要 5 件事:
- 左可观察流
- 右可观察流
- Left Stream 中每个事件的 window 持续时间的定义
- 右流 中每个事件的 window 持续时间的定义
- 当Left Stream 和Right Stream 中有一对对象的开windows 重叠时,两个对象的选择器函数。
这是你对 Join
的五个论点。最令人困惑的部分是参数 3 和 4,leftDurationSelector
和 rightDurationSelector
。当一个项目从左侧流发出时, window 被打开。 leftDurationSelector
产生一个流,当一个项目从该流发出,或者该流终止时,然后 window 关闭。发出的任何值都是无关紧要的并被忽略。流的唯一目的是定义 window.
所以在我上面的代码中,我将左侧 window 的关闭定义为左侧 window 释放后 3.81 秒。这听起来像你想要的。
那为什么你的代码不起作用?
您的代码等同于此(将 sourceTwoWithTimeouts
分解为 Join
调用以提高可读性):
Console.WriteLine("Ready, Steady, Go!");
var startTime = DateTime.Now;
var sourceOne = Observable.Interval(TimeSpan.FromSeconds(10)).Select(l => DateTime.Now - startTime);
var sourceTwo = Observable.Interval(TimeSpan.FromSeconds(7.21)).Select(l => DateTime.Now - startTime);
var pairingTwoStraightAfterOne = sourceOne.Join(
sourceTwo,
span => sourceOne.Delay(TimeSpan.FromSeconds(3.81)).Merge(sourceTwo).Delay(TimeSpan.FromMilliseconds(20)),
span => Observable.Empty<TimeSpan>(),
(span1, span2) => new Tuple<TimeSpan, TimeSpan>(span1, span2));
pairingTwoStraightAfterOne.Take(4).Subscribe(
tuple => Console.WriteLine("Heard One at 00:00:{0:00} followed by Two at 00:00:{1:00}",
tuple.Item1.Seconds, tuple.Item2.Seconds));
如果你去掉了.Delay(TimeSpan.FromMilliseconds(20))
的调用,除了你调用的是sourceOne.
而我调用的是span.
之外,和我的一样。如果您尝试使用原始可观察对象作为持续时间选择器,Rx 会将其解释为 "close the window when the NEXT value (from stream left) arrives"。因此,您的持续时间选择器实际上在 NEXT 左值到达后 3.81 秒(在我们的场景中为 13.81 秒后)或右值后关闭。由于您希望它在当前值之后 3.81 秒后关闭,请使用 span.
,而不是 sourceOne.
。