响应式扩展:缓冲直到其他流完成

Reactive Extensions: Buffer until other stream completes

我发出请求并等待响应流上对该请求的响应。在等待期间,更新流中可能会有更新。

我希望在响应流完成时获得这些更新。像这样:

Response (cold)   |      x y z |
Updates (hot)     | 1 2    3        4 |

Result            |      x y z 123  4 |

我好像不太对劲。是否有一些巧妙的运算符组合可以满足我的需求?

这会奏效,尽管 one-liner 不能做到这一点让我很困扰:

var updatesReplayed = updates.Replay();
updatesReplayed.Connect();

var results = response.Concat(updatesReplayed);

下面是演示功能的测试:

// time               | 1 2 3 4 5 6 7 8 9
// Response(cold)     |     x y z |
// Updates(hot)       | 1 2   3       4 |
// Result             |     x y z 123 4 |

var scheduler = new TestScheduler();
    var updates = scheduler.CreateHotObservable(
        ReactiveTest.OnNext(100.ToMsTicks(), "1"),
        ReactiveTest.OnNext(200.ToMsTicks(), "2"),
        ReactiveTest.OnNext(400.ToMsTicks(), "3"),
        ReactiveTest.OnNext(800.ToMsTicks(), "4"),
        ReactiveTest.OnCompleted<string>(900.ToMsTicks())
    );

    var response = scheduler.CreateColdObservable(
        ReactiveTest.OnNext(300.ToMsTicks(), "x"),
        ReactiveTest.OnNext(400.ToMsTicks(), "y"),
        ReactiveTest.OnNext(500.ToMsTicks(), "z"),
        ReactiveTest.OnCompleted<string>(600.ToMsTicks())
    );

    var expectedResults = scheduler.CreateHotObservable(
        ReactiveTest.OnNext(300.ToMsTicks(), "x"),
        ReactiveTest.OnNext(400.ToMsTicks(), "y"),
        ReactiveTest.OnNext(500.ToMsTicks(), "z"),
        ReactiveTest.OnNext(600.ToMsTicks(), "1"),
        ReactiveTest.OnNext(600.ToMsTicks(), "2"),
        ReactiveTest.OnNext(600.ToMsTicks(), "3"),
        ReactiveTest.OnNext(800.ToMsTicks(), "4"),
        ReactiveTest.OnCompleted<string>(900.ToMsTicks())
    );

    var replayed = updates.Replay();
    replayed.Connect();

    var results = response.Concat(replayed);
    var observer = scheduler.CreateObserver<string>();
    results.Subscribe(observer);

    scheduler.Start();
    ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);

来自问题的图表:

缓冲所有更新,直到响应完成,然后显示其余更新。 回复立即出现。

updates.TakeUntil(response.LastAsync())
       .ToArray()
       .SelectMany(x => x)
       .Concat(updates)
       .Merge(response);