Observable.Join 例子

Observable.Join example

有人可以提供一个示例,说明如何将 Observable.Join 与两种不同的可观察类型一起使用吗?
found so far here 最好的解释是左边的 observable 打开一个 window,而右边的是在这个 window 中找到一个匹配项,这是有道理的。但是 leftDurationSelectorrightDurationSelector 是如何工作的呢?该示例仅使用 Publish().RefCount() ,我不明白它的作用。对我来说,持续时间选择器听起来像是一个时间跨度,例如缓冲区 window.

这是一个我认为很好地展示了发生了什么的例子。

让我们从生成两个带有时间戳的可观察对象 sourceAsourceB 开始,每个对象在 010.0 秒之间的随机间隔内生成一个值:

Random rnd = new Random();
IObservable<int> source =
    Observable
        .Generate(0, x => true, x => x + 1, x => x,
            x => TimeSpan.FromSeconds(rnd.NextDouble()* 10.0));

IObservable<Timestamped<string>> sourceA = source.Select(x => $"A{x}").Timestamp();
IObservable<Timestamped<string>> sourceB = source.Select(x => $"B{x}").Timestamp();

让我们有一个很好的方法来输出值:

Func<Timestamped<string>, string> format =
    x => $"{x.Value} @ {x.Timestamp.LocalDateTime.ToString("HH:mm:ss.f")}";

例如,调用 format(x) 可能 return "A1 @ 12:33:09.2"

现在加入:

IObservable<string> query =
    sourceA.Join(sourceB,
        a => Observable.Timer(TimeSpan.FromSeconds(4.0)),
        b => Observable.Timer(TimeSpan.FromSeconds(4.0)),
        (at, bt) => $"{format(at)}, {format(bt)}");

如果我订阅这个查询,我会得到这样的结果:

A1 @ 12:33:09.2, B0 @ 12:33:12.4 
A3 @ 12:33:23.8, B1 @ 12:33:20.9 
A3 @ 12:33:23.8, B2 @ 12:33:25.6 
A3 @ 12:33:23.8, B3 @ 12:33:25.9 
A4 @ 12:33:30.6, B4 @ 12:33:33.0 
A5 @ 12:33:37.9, B5 @ 12:33:35.7 
A5 @ 12:33:37.9, B6 @ 12:33:40.3 
A5 @ 12:33:37.9, B7 @ 12:33:40.8 
A6 @ 12:33:43.3, B6 @ 12:33:40.3 
A6 @ 12:33:43.3, B7 @ 12:33:40.8 
A7 @ 12:33:44.5, B7 @ 12:33:40.8 
A7 @ 12:33:44.5, B8 @ 12:33:44.9 
A6 @ 12:33:43.3, B8 @ 12:33:44.9 

我从 sourceAsourceB 得到配对结果,其中一个值是在另一个值的 4.0 秒内产生的。

现在,如果我只想要来自 sourceB 的值,只要它仅在 sourceA 的值之后 4.0 秒生成,而不是相反呢?

IObservable<string> query =
    sourceA.Join(sourceB,
        a => Observable.Timer(TimeSpan.FromSeconds(4.0)),
        b => Observable.Timer(TimeSpan.FromSeconds(0.0)),
        (at, bt) => $"{format(at)}, {format(bt)}");

给我:

A0 @ 12:41:35.4, B1 @ 12:41:35.5 
A1 @ 12:41:42.7, B2 @ 12:41:43.2 
A3 @ 12:41:47.6, B3 @ 12:41:51.0 
A4 @ 12:41:49.8, B3 @ 12:41:51.0 
A7 @ 12:42:00.2, B4 @ 12:42:00.4 
A7 @ 12:42:00.2, B5 @ 12:42:02.5 
A7 @ 12:42:00.2, B6 @ 12:42:03.5 
A8 @ 12:42:04.8, B7 @ 12:42:06.4 
A9 @ 12:42:12.3, B8 @ 12:42:15.2 
A10 @ 12:42:17.2, B9 @ 12:42:19.7 
A11 @ 12:42:19.4, B9 @ 12:42:19.7 

请注意,所有 "B" 都出现在 "A" 之后。

或者我可以反过来做 - sourceA 的值在 sourceB 之后但在 4.0 秒内产生。

IObservable<string> query =
    sourceA.Join(sourceB,
        a => Observable.Timer(TimeSpan.FromSeconds(0.0)),
        b => Observable.Timer(TimeSpan.FromSeconds(4.0)),
        (at, bt) => $"{format(at)}, {format(bt)}");

这给了我:

A1 @ 12:43:23.8, B0 @ 12:43:21.4 
A1 @ 12:43:23.8, B1 @ 12:43:22.8 
A2 @ 12:43:27.9, B2 @ 12:43:27.3 
A3 @ 12:43:33.6, B3 @ 12:43:29.6 
A4 @ 12:43:36.2, B4 @ 12:43:35.3 
A4 @ 12:43:36.2, B5 @ 12:43:35.5 
A4 @ 12:43:36.2, B6 @ 12:43:35.9 
A5 @ 12:43:43.4, B9 @ 12:43:43.1 
A5 @ 12:43:43.4, B8 @ 12:43:40.6 
A6 @ 12:43:46.5, B10 @ 12:43:43.8 
A6 @ 12:43:46.5, B9 @ 12:43:43.1 
A8 @ 12:43:55.5, B12 @ 12:43:52.4 
A9 @ 12:44:03.8, B13 @ 12:44:01.2 
A12 @ 12:44:09.7, B14 @ 12:44:08.9 
A13 @ 12:44:12.8, B14 @ 12:44:08.9 
A14 @ 12:44:16.0, B15 @ 12:44:13.3 

请注意,所有 "A" 都出现在 "B" 之后。

您还可以使用查询语法:

var query =
    from a in sourceA
    join b in sourceB
        on Observable.Timer(TimeSpan.FromSeconds(4.0))
        equals Observable.Timer(TimeSpan.FromSeconds(4.0))
    select $"{format(a)}, {format(b)}";