异步方法中的 Rx StartWith 未应用起始值

Rx StartWith in async method not applying the starting value

我正在尝试使用 StartWith 方法将初始值注入 RX 流:

public async Task<IObservable<Price>) Stream(Instrument instrumentDetails)
{
    var initialPrice = await _svc.GetSomeInitialPrice();

    var stream = _priceObserver.Stream
        .Where(o => o.Symbol == instrumentDetails.Symbol)
        .Select(o => GetPrice(o, instrumentDetails));

    stream.StartWith(initialPrice);

    return stream;
}

然而,由于调用获取初始值,该方法是异步的。无论如何它都需要在整个调用堆栈中保持异步

我发现该值从未添加到开头。我刚刚得到流的其余部分

如果我 await StartWith 方法,它永远不会 returns

知道我做错了什么

IObservable 上的方法不会修改基础对象 - 它们 return 是一个新对象。 stream.StartWith(initialPrice) return 是您忽略的新可观察对象,它对 stream 没有任何作用。

你应该这样写:

stream = stream.StartWith(initialPrice);

或者:

var stream = _priceObserver.Stream
    .Where(o => o.Symbol == instrumentDetails.Symbol)
    .Select(o => GetPrice(o, instrumentDetails))
    .StartWith(initialPrice);

旁注:如果您等待一个可观察对象,它将等到可观察对象完成,即当它发出所有值并调用其 OnComplete 方法时。您通常应该等待一个可观察对象,您知道它只会发出 1 个值然后完成(例如,对远程服务器的请求),因为等待它只会 return 最后发出的值。因此,如果您的 stream 预计会持续发出值,则等待它是没有意义的。

您最好避免像现在这样混用 Task<>Observable<>。如果可以,请坚持使用 IObservable<>.

你的情况很简单。

试试这个:

public IObservable<Price> Stream(Instrument instrumentDetails)
    =>
        Observable
            .FromAsync(() => _svc.GetSomeInitialPrice())
            .SelectMany(x =>
                _priceObserver
                    .Stream
                    .Where(o => o.Symbol == instrumentDetails.Symbol)
                    .Select(o => GetPrice(o, instrumentDetails))
                    .StartWith(x));

我已经用一些基本的代码对此进行了测试,它工作得很好。它还确保新订阅者将获得 _svc.GetSomeInitialPrice() 随着时间推移可能产生的任何新值。