异步方法中的 Rx StartWith 未应用起始值
Rx StartWith in async method not applying the starting value
我正在尝试使用 StartWith
方法将初始值注入 RX 流:
public async Task<IObservable<Price>) Stream(Instrument instrumentDetails)
{
var initialPrice = await _svc.GetSomeInitialPrice();
var stream = _priceObserver.Stream
.Where(o => o.Symbol == instrumentDetails.Symbol)
.Select(o => GetPrice(o, instrumentDetails));
stream.StartWith(initialPrice);
return stream;
}
然而,由于调用获取初始值,该方法是异步的。无论如何它都需要在整个调用堆栈中保持异步
我发现该值从未添加到开头。我刚刚得到流的其余部分
如果我 await
StartWith
方法,它永远不会 returns
知道我做错了什么
IObservable
上的方法不会修改基础对象 - 它们 return 是一个新对象。 stream.StartWith(initialPrice)
return 是您忽略的新可观察对象,它对 stream
没有任何作用。
你应该这样写:
stream = stream.StartWith(initialPrice);
或者:
var stream = _priceObserver.Stream
.Where(o => o.Symbol == instrumentDetails.Symbol)
.Select(o => GetPrice(o, instrumentDetails))
.StartWith(initialPrice);
旁注:如果您等待一个可观察对象,它将等到可观察对象完成,即当它发出所有值并调用其 OnComplete
方法时。您通常应该等待一个可观察对象,您知道它只会发出 1 个值然后完成(例如,对远程服务器的请求),因为等待它只会 return 最后发出的值。因此,如果您的 stream
预计会持续发出值,则等待它是没有意义的。
您最好避免像现在这样混用 Task<>
和 Observable<>
。如果可以,请坚持使用 IObservable<>
.
你的情况很简单。
试试这个:
public IObservable<Price> Stream(Instrument instrumentDetails)
=>
Observable
.FromAsync(() => _svc.GetSomeInitialPrice())
.SelectMany(x =>
_priceObserver
.Stream
.Where(o => o.Symbol == instrumentDetails.Symbol)
.Select(o => GetPrice(o, instrumentDetails))
.StartWith(x));
我已经用一些基本的代码对此进行了测试,它工作得很好。它还确保新订阅者将获得 _svc.GetSomeInitialPrice()
随着时间推移可能产生的任何新值。
我正在尝试使用 StartWith
方法将初始值注入 RX 流:
public async Task<IObservable<Price>) Stream(Instrument instrumentDetails)
{
var initialPrice = await _svc.GetSomeInitialPrice();
var stream = _priceObserver.Stream
.Where(o => o.Symbol == instrumentDetails.Symbol)
.Select(o => GetPrice(o, instrumentDetails));
stream.StartWith(initialPrice);
return stream;
}
然而,由于调用获取初始值,该方法是异步的。无论如何它都需要在整个调用堆栈中保持异步
我发现该值从未添加到开头。我刚刚得到流的其余部分
如果我 await
StartWith
方法,它永远不会 returns
知道我做错了什么
IObservable
上的方法不会修改基础对象 - 它们 return 是一个新对象。 stream.StartWith(initialPrice)
return 是您忽略的新可观察对象,它对 stream
没有任何作用。
你应该这样写:
stream = stream.StartWith(initialPrice);
或者:
var stream = _priceObserver.Stream
.Where(o => o.Symbol == instrumentDetails.Symbol)
.Select(o => GetPrice(o, instrumentDetails))
.StartWith(initialPrice);
旁注:如果您等待一个可观察对象,它将等到可观察对象完成,即当它发出所有值并调用其 OnComplete
方法时。您通常应该等待一个可观察对象,您知道它只会发出 1 个值然后完成(例如,对远程服务器的请求),因为等待它只会 return 最后发出的值。因此,如果您的 stream
预计会持续发出值,则等待它是没有意义的。
您最好避免像现在这样混用 Task<>
和 Observable<>
。如果可以,请坚持使用 IObservable<>
.
你的情况很简单。
试试这个:
public IObservable<Price> Stream(Instrument instrumentDetails)
=>
Observable
.FromAsync(() => _svc.GetSomeInitialPrice())
.SelectMany(x =>
_priceObserver
.Stream
.Where(o => o.Symbol == instrumentDetails.Symbol)
.Select(o => GetPrice(o, instrumentDetails))
.StartWith(x));
我已经用一些基本的代码对此进行了测试,它工作得很好。它还确保新订阅者将获得 _svc.GetSomeInitialPrice()
随着时间推移可能产生的任何新值。