响应式扩展:缓冲直到其他流完成
Reactive Extensions: Buffer until other stream completes
我发出请求并等待响应流上对该请求的响应。在等待期间,更新流中可能会有更新。
我希望在响应流完成时获得这些更新。像这样:
Response (cold) | x y z |
Updates (hot) | 1 2 3 4 |
Result | x y z 123 4 |
我好像不太对劲。是否有一些巧妙的运算符组合可以满足我的需求?
这会奏效,尽管 one-liner 不能做到这一点让我很困扰:
var updatesReplayed = updates.Replay();
updatesReplayed.Connect();
var results = response.Concat(updatesReplayed);
下面是演示功能的测试:
// time | 1 2 3 4 5 6 7 8 9
// Response(cold) | x y z |
// Updates(hot) | 1 2 3 4 |
// Result | x y z 123 4 |
var scheduler = new TestScheduler();
var updates = scheduler.CreateHotObservable(
ReactiveTest.OnNext(100.ToMsTicks(), "1"),
ReactiveTest.OnNext(200.ToMsTicks(), "2"),
ReactiveTest.OnNext(400.ToMsTicks(), "3"),
ReactiveTest.OnNext(800.ToMsTicks(), "4"),
ReactiveTest.OnCompleted<string>(900.ToMsTicks())
);
var response = scheduler.CreateColdObservable(
ReactiveTest.OnNext(300.ToMsTicks(), "x"),
ReactiveTest.OnNext(400.ToMsTicks(), "y"),
ReactiveTest.OnNext(500.ToMsTicks(), "z"),
ReactiveTest.OnCompleted<string>(600.ToMsTicks())
);
var expectedResults = scheduler.CreateHotObservable(
ReactiveTest.OnNext(300.ToMsTicks(), "x"),
ReactiveTest.OnNext(400.ToMsTicks(), "y"),
ReactiveTest.OnNext(500.ToMsTicks(), "z"),
ReactiveTest.OnNext(600.ToMsTicks(), "1"),
ReactiveTest.OnNext(600.ToMsTicks(), "2"),
ReactiveTest.OnNext(600.ToMsTicks(), "3"),
ReactiveTest.OnNext(800.ToMsTicks(), "4"),
ReactiveTest.OnCompleted<string>(900.ToMsTicks())
);
var replayed = updates.Replay();
replayed.Connect();
var results = response.Concat(replayed);
var observer = scheduler.CreateObserver<string>();
results.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
来自问题的图表:
缓冲所有更新,直到响应完成,然后显示其余更新。
回复立即出现。
updates.TakeUntil(response.LastAsync())
.ToArray()
.SelectMany(x => x)
.Concat(updates)
.Merge(response);
我发出请求并等待响应流上对该请求的响应。在等待期间,更新流中可能会有更新。
我希望在响应流完成时获得这些更新。像这样:
Response (cold) | x y z |
Updates (hot) | 1 2 3 4 |
Result | x y z 123 4 |
我好像不太对劲。是否有一些巧妙的运算符组合可以满足我的需求?
这会奏效,尽管 one-liner 不能做到这一点让我很困扰:
var updatesReplayed = updates.Replay();
updatesReplayed.Connect();
var results = response.Concat(updatesReplayed);
下面是演示功能的测试:
// time | 1 2 3 4 5 6 7 8 9
// Response(cold) | x y z |
// Updates(hot) | 1 2 3 4 |
// Result | x y z 123 4 |
var scheduler = new TestScheduler();
var updates = scheduler.CreateHotObservable(
ReactiveTest.OnNext(100.ToMsTicks(), "1"),
ReactiveTest.OnNext(200.ToMsTicks(), "2"),
ReactiveTest.OnNext(400.ToMsTicks(), "3"),
ReactiveTest.OnNext(800.ToMsTicks(), "4"),
ReactiveTest.OnCompleted<string>(900.ToMsTicks())
);
var response = scheduler.CreateColdObservable(
ReactiveTest.OnNext(300.ToMsTicks(), "x"),
ReactiveTest.OnNext(400.ToMsTicks(), "y"),
ReactiveTest.OnNext(500.ToMsTicks(), "z"),
ReactiveTest.OnCompleted<string>(600.ToMsTicks())
);
var expectedResults = scheduler.CreateHotObservable(
ReactiveTest.OnNext(300.ToMsTicks(), "x"),
ReactiveTest.OnNext(400.ToMsTicks(), "y"),
ReactiveTest.OnNext(500.ToMsTicks(), "z"),
ReactiveTest.OnNext(600.ToMsTicks(), "1"),
ReactiveTest.OnNext(600.ToMsTicks(), "2"),
ReactiveTest.OnNext(600.ToMsTicks(), "3"),
ReactiveTest.OnNext(800.ToMsTicks(), "4"),
ReactiveTest.OnCompleted<string>(900.ToMsTicks())
);
var replayed = updates.Replay();
replayed.Connect();
var results = response.Concat(replayed);
var observer = scheduler.CreateObserver<string>();
results.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);
来自问题的图表:
缓冲所有更新,直到响应完成,然后显示其余更新。 回复立即出现。
updates.TakeUntil(response.LastAsync())
.ToArray()
.SelectMany(x => x)
.Concat(updates)
.Merge(response);