Observable.Join 例子
Observable.Join example
有人可以提供一个示例,说明如何将 Observable.Join 与两种不同的可观察类型一起使用吗?
我 found so far here 最好的解释是左边的 observable 打开一个 window,而右边的是在这个 window 中找到一个匹配项,这是有道理的。但是 leftDurationSelector
和 rightDurationSelector
是如何工作的呢?该示例仅使用 Publish().RefCount()
,我不明白它的作用。对我来说,持续时间选择器听起来像是一个时间跨度,例如缓冲区 window.
这是一个我认为很好地展示了发生了什么的例子。
让我们从生成两个带有时间戳的可观察对象 sourceA
和 sourceB
开始,每个对象在 0
和 10.0
秒之间的随机间隔内生成一个值:
Random rnd = new Random();
IObservable<int> source =
Observable
.Generate(0, x => true, x => x + 1, x => x,
x => TimeSpan.FromSeconds(rnd.NextDouble()* 10.0));
IObservable<Timestamped<string>> sourceA = source.Select(x => $"A{x}").Timestamp();
IObservable<Timestamped<string>> sourceB = source.Select(x => $"B{x}").Timestamp();
让我们有一个很好的方法来输出值:
Func<Timestamped<string>, string> format =
x => $"{x.Value} @ {x.Timestamp.LocalDateTime.ToString("HH:mm:ss.f")}";
例如,调用 format(x)
可能 return "A1 @ 12:33:09.2"
。
现在加入:
IObservable<string> query =
sourceA.Join(sourceB,
a => Observable.Timer(TimeSpan.FromSeconds(4.0)),
b => Observable.Timer(TimeSpan.FromSeconds(4.0)),
(at, bt) => $"{format(at)}, {format(bt)}");
如果我订阅这个查询,我会得到这样的结果:
A1 @ 12:33:09.2, B0 @ 12:33:12.4
A3 @ 12:33:23.8, B1 @ 12:33:20.9
A3 @ 12:33:23.8, B2 @ 12:33:25.6
A3 @ 12:33:23.8, B3 @ 12:33:25.9
A4 @ 12:33:30.6, B4 @ 12:33:33.0
A5 @ 12:33:37.9, B5 @ 12:33:35.7
A5 @ 12:33:37.9, B6 @ 12:33:40.3
A5 @ 12:33:37.9, B7 @ 12:33:40.8
A6 @ 12:33:43.3, B6 @ 12:33:40.3
A6 @ 12:33:43.3, B7 @ 12:33:40.8
A7 @ 12:33:44.5, B7 @ 12:33:40.8
A7 @ 12:33:44.5, B8 @ 12:33:44.9
A6 @ 12:33:43.3, B8 @ 12:33:44.9
我从 sourceA
和 sourceB
得到配对结果,其中一个值是在另一个值的 4.0
秒内产生的。
现在,如果我只想要来自 sourceB
的值,只要它仅在 sourceA
的值之后 4.0
秒生成,而不是相反呢?
IObservable<string> query =
sourceA.Join(sourceB,
a => Observable.Timer(TimeSpan.FromSeconds(4.0)),
b => Observable.Timer(TimeSpan.FromSeconds(0.0)),
(at, bt) => $"{format(at)}, {format(bt)}");
给我:
A0 @ 12:41:35.4, B1 @ 12:41:35.5
A1 @ 12:41:42.7, B2 @ 12:41:43.2
A3 @ 12:41:47.6, B3 @ 12:41:51.0
A4 @ 12:41:49.8, B3 @ 12:41:51.0
A7 @ 12:42:00.2, B4 @ 12:42:00.4
A7 @ 12:42:00.2, B5 @ 12:42:02.5
A7 @ 12:42:00.2, B6 @ 12:42:03.5
A8 @ 12:42:04.8, B7 @ 12:42:06.4
A9 @ 12:42:12.3, B8 @ 12:42:15.2
A10 @ 12:42:17.2, B9 @ 12:42:19.7
A11 @ 12:42:19.4, B9 @ 12:42:19.7
请注意,所有 "B" 都出现在 "A" 之后。
或者我可以反过来做 - sourceA
的值在 sourceB
之后但在 4.0
秒内产生。
IObservable<string> query =
sourceA.Join(sourceB,
a => Observable.Timer(TimeSpan.FromSeconds(0.0)),
b => Observable.Timer(TimeSpan.FromSeconds(4.0)),
(at, bt) => $"{format(at)}, {format(bt)}");
这给了我:
A1 @ 12:43:23.8, B0 @ 12:43:21.4
A1 @ 12:43:23.8, B1 @ 12:43:22.8
A2 @ 12:43:27.9, B2 @ 12:43:27.3
A3 @ 12:43:33.6, B3 @ 12:43:29.6
A4 @ 12:43:36.2, B4 @ 12:43:35.3
A4 @ 12:43:36.2, B5 @ 12:43:35.5
A4 @ 12:43:36.2, B6 @ 12:43:35.9
A5 @ 12:43:43.4, B9 @ 12:43:43.1
A5 @ 12:43:43.4, B8 @ 12:43:40.6
A6 @ 12:43:46.5, B10 @ 12:43:43.8
A6 @ 12:43:46.5, B9 @ 12:43:43.1
A8 @ 12:43:55.5, B12 @ 12:43:52.4
A9 @ 12:44:03.8, B13 @ 12:44:01.2
A12 @ 12:44:09.7, B14 @ 12:44:08.9
A13 @ 12:44:12.8, B14 @ 12:44:08.9
A14 @ 12:44:16.0, B15 @ 12:44:13.3
请注意,所有 "A" 都出现在 "B" 之后。
您还可以使用查询语法:
var query =
from a in sourceA
join b in sourceB
on Observable.Timer(TimeSpan.FromSeconds(4.0))
equals Observable.Timer(TimeSpan.FromSeconds(4.0))
select $"{format(a)}, {format(b)}";
有人可以提供一个示例,说明如何将 Observable.Join 与两种不同的可观察类型一起使用吗?
我 found so far here 最好的解释是左边的 observable 打开一个 window,而右边的是在这个 window 中找到一个匹配项,这是有道理的。但是 leftDurationSelector
和 rightDurationSelector
是如何工作的呢?该示例仅使用 Publish().RefCount()
,我不明白它的作用。对我来说,持续时间选择器听起来像是一个时间跨度,例如缓冲区 window.
这是一个我认为很好地展示了发生了什么的例子。
让我们从生成两个带有时间戳的可观察对象 sourceA
和 sourceB
开始,每个对象在 0
和 10.0
秒之间的随机间隔内生成一个值:
Random rnd = new Random();
IObservable<int> source =
Observable
.Generate(0, x => true, x => x + 1, x => x,
x => TimeSpan.FromSeconds(rnd.NextDouble()* 10.0));
IObservable<Timestamped<string>> sourceA = source.Select(x => $"A{x}").Timestamp();
IObservable<Timestamped<string>> sourceB = source.Select(x => $"B{x}").Timestamp();
让我们有一个很好的方法来输出值:
Func<Timestamped<string>, string> format =
x => $"{x.Value} @ {x.Timestamp.LocalDateTime.ToString("HH:mm:ss.f")}";
例如,调用 format(x)
可能 return "A1 @ 12:33:09.2"
。
现在加入:
IObservable<string> query =
sourceA.Join(sourceB,
a => Observable.Timer(TimeSpan.FromSeconds(4.0)),
b => Observable.Timer(TimeSpan.FromSeconds(4.0)),
(at, bt) => $"{format(at)}, {format(bt)}");
如果我订阅这个查询,我会得到这样的结果:
A1 @ 12:33:09.2, B0 @ 12:33:12.4 A3 @ 12:33:23.8, B1 @ 12:33:20.9 A3 @ 12:33:23.8, B2 @ 12:33:25.6 A3 @ 12:33:23.8, B3 @ 12:33:25.9 A4 @ 12:33:30.6, B4 @ 12:33:33.0 A5 @ 12:33:37.9, B5 @ 12:33:35.7 A5 @ 12:33:37.9, B6 @ 12:33:40.3 A5 @ 12:33:37.9, B7 @ 12:33:40.8 A6 @ 12:33:43.3, B6 @ 12:33:40.3 A6 @ 12:33:43.3, B7 @ 12:33:40.8 A7 @ 12:33:44.5, B7 @ 12:33:40.8 A7 @ 12:33:44.5, B8 @ 12:33:44.9 A6 @ 12:33:43.3, B8 @ 12:33:44.9
我从 sourceA
和 sourceB
得到配对结果,其中一个值是在另一个值的 4.0
秒内产生的。
现在,如果我只想要来自 sourceB
的值,只要它仅在 sourceA
的值之后 4.0
秒生成,而不是相反呢?
IObservable<string> query =
sourceA.Join(sourceB,
a => Observable.Timer(TimeSpan.FromSeconds(4.0)),
b => Observable.Timer(TimeSpan.FromSeconds(0.0)),
(at, bt) => $"{format(at)}, {format(bt)}");
给我:
A0 @ 12:41:35.4, B1 @ 12:41:35.5 A1 @ 12:41:42.7, B2 @ 12:41:43.2 A3 @ 12:41:47.6, B3 @ 12:41:51.0 A4 @ 12:41:49.8, B3 @ 12:41:51.0 A7 @ 12:42:00.2, B4 @ 12:42:00.4 A7 @ 12:42:00.2, B5 @ 12:42:02.5 A7 @ 12:42:00.2, B6 @ 12:42:03.5 A8 @ 12:42:04.8, B7 @ 12:42:06.4 A9 @ 12:42:12.3, B8 @ 12:42:15.2 A10 @ 12:42:17.2, B9 @ 12:42:19.7 A11 @ 12:42:19.4, B9 @ 12:42:19.7
请注意,所有 "B" 都出现在 "A" 之后。
或者我可以反过来做 - sourceA
的值在 sourceB
之后但在 4.0
秒内产生。
IObservable<string> query =
sourceA.Join(sourceB,
a => Observable.Timer(TimeSpan.FromSeconds(0.0)),
b => Observable.Timer(TimeSpan.FromSeconds(4.0)),
(at, bt) => $"{format(at)}, {format(bt)}");
这给了我:
A1 @ 12:43:23.8, B0 @ 12:43:21.4 A1 @ 12:43:23.8, B1 @ 12:43:22.8 A2 @ 12:43:27.9, B2 @ 12:43:27.3 A3 @ 12:43:33.6, B3 @ 12:43:29.6 A4 @ 12:43:36.2, B4 @ 12:43:35.3 A4 @ 12:43:36.2, B5 @ 12:43:35.5 A4 @ 12:43:36.2, B6 @ 12:43:35.9 A5 @ 12:43:43.4, B9 @ 12:43:43.1 A5 @ 12:43:43.4, B8 @ 12:43:40.6 A6 @ 12:43:46.5, B10 @ 12:43:43.8 A6 @ 12:43:46.5, B9 @ 12:43:43.1 A8 @ 12:43:55.5, B12 @ 12:43:52.4 A9 @ 12:44:03.8, B13 @ 12:44:01.2 A12 @ 12:44:09.7, B14 @ 12:44:08.9 A13 @ 12:44:12.8, B14 @ 12:44:08.9 A14 @ 12:44:16.0, B15 @ 12:44:13.3
请注意,所有 "A" 都出现在 "B" 之后。
您还可以使用查询语法:
var query =
from a in sourceA
join b in sourceB
on Observable.Timer(TimeSpan.FromSeconds(4.0))
equals Observable.Timer(TimeSpan.FromSeconds(4.0))
select $"{format(a)}, {format(b)}";