向 RX Stream / IObservable 添加初始值
Add an initial value to RX Stream / IObservable
我有一个方法可以 returns 过滤后的 RX 流作为 Iobservable :
public IObservable<Price> LivePrices(Instrument instrumentDetails)
{
return _priceObserver.Stream
.Where(o => o.Symbol == instrumentDetails.Symbol )
.Select(o => GetPrice(o, instrumentDetails));
}
问题是流不会经常更改某些值,所以我需要用第一个值初始化它
我该怎么做?我读到 Subject
既可以是观察者也可以是可观察者。所以我想我需要以某种方式订阅它作为 Subject
,在流中添加第一条消息,然后将其设置为现在的状态。但不知道该怎么做
有什么想法吗?
你试过了吗StartWith
?
public IObservable<FxDeal> LiveRates(NegotiationDetails negotiation)
{
var obs = _ratesObserver.Stream
.Where(o => o.BaseCurrency == negotiation.Base && o.TermsCurrency == negotiation.Terms)
.Select(o => GetFxDeal(o, negotiation));
return condition ? obs.StartWith(new FxDeal()) : obs;
}
我有一个方法可以 returns 过滤后的 RX 流作为 Iobservable :
public IObservable<Price> LivePrices(Instrument instrumentDetails)
{
return _priceObserver.Stream
.Where(o => o.Symbol == instrumentDetails.Symbol )
.Select(o => GetPrice(o, instrumentDetails));
}
问题是流不会经常更改某些值,所以我需要用第一个值初始化它
我该怎么做?我读到 Subject
既可以是观察者也可以是可观察者。所以我想我需要以某种方式订阅它作为 Subject
,在流中添加第一条消息,然后将其设置为现在的状态。但不知道该怎么做
有什么想法吗?
你试过了吗StartWith
?
public IObservable<FxDeal> LiveRates(NegotiationDetails negotiation)
{
var obs = _ratesObserver.Stream
.Where(o => o.BaseCurrency == negotiation.Base && o.TermsCurrency == negotiation.Terms)
.Select(o => GetFxDeal(o, negotiation));
return condition ? obs.StartWith(new FxDeal()) : obs;
}