当任何值由另一个 Observable 产生时,选择 Observable 的最新值
Pick Observable latest value when any value is produced by another Observable
我有一个从常规 .NET 事件生成的 Observable。 Observable 是热的——而不是 warm——从某种意义上说,它甚至在任何订阅之前就开始产生价值,并且每次有人订阅时,它都会收到最新产生的价值。我们将其命名为 eventStream
.
然后我有另一个 Observable,由另一个 class 公开,它代表一些 状态流 ,所以每个新值都给出了由 管理的某些东西的当前状态 class。这个 Observable 也很热门。我们将其命名为 stateStream
.
每次事件序列产生一个新值时,我想选择(我会说 sample,但这可能会导致混淆)状态序列提供的最新值。这应该会产生一个新序列,组合两个值,然后处理它们,等等。
这是我想出来的,但它似乎不起作用:
var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/);
var stateStream = someDependency.SomeStateStream;
eventStream.Select(eventValue =>
stateStream
.Take(1)
.Select(stateValue => new { Event = eventValue, State = stateValue }))
.Switch()
.Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
.Subscribe(value => /* do something */);
其背后的基本原理来自我工作过的其他类似场景,其中某些来源产生的新值导致对 运行 的新订阅,因此 new Observable 被返回,最后 IObservable<IObservable<...>>
再次使用 Switch()
或一些类似的运算符被压缩成 一维 IObservable。
但是在这种情况下,从快速测试来看,似乎没有新的订阅,并且只产生了第一个 stateStream
值。相反,我想在每次 eventStream
触发 .
时选择第一个值 (Take(1)
)
AFAIK,CombineLatest
和 Zip
不符合要求:每当两个序列之一提供新值时,CombineLatest
就会触发; Zip
每当两个序列都有一个可用的新值时就会触发,通常这意味着当两个序列中最慢的一个有值时。出于与 Zip
.
相同的原因,And/Then/When
也不应该是正确的
我也检查了 SO 线程 combining one observable with latest from another observable,但我认为这不适用于此处。仅在我阅读的评论之一中
[...] and then Scan acts like a CombineLatest that filters for notifications from only one side
不知怎的,这听起来很熟悉,但我无法理解。
您需要状态流的默认值,以防它从未发出过。将此默认值作为参数传递给 MostRecent
(我在这里只使用了 null
)并使用 Zip
的重载,它需要一个 IEnumerable
:
eventStream.Zip(
stateStream.MostRecent(null),
(evt,state) => new { Event = evt, State = state })
.Subscribe(/* etc */);
我想你想要 Observable.Sample()
stateSource.Sample(eventSource)
.Zip(eventSource,...)
在我看来,您当前的解决方案实际上非常接近,但听起来您只需要交换 eventStream
& stateStream
observables,然后删除 .Take(1)
.
试试这个:
stateStream
.Select(stateValue =>
eventStream
.Select(eventValue => new { Event = eventValue, State = stateValue }))
.Switch()
.Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
.Subscribe(value => { /* do something */ });
根据 stateStream
的配置方式,您可能需要向其添加 .StartWith(...)
以从 eventStream
获取初始值,但我认为这种方法可以满足您的要求。
Rx.Net 2.3.0-beta2 的最新(测试版)版本有 WithLatestFrom
:
//Only emits when eventStream emits
eventStream.WithLatestFrom(stateStream.StartWith(defaultState),
(evt, state) => new {Event = evt, State = state})
.Subscribe(/*Do something*/);
如果没有,您可以使用(注意未经测试)填充它:
public static IObservable<TResult> WithLatestFrom<TLeft, TRight, TResult>(
this IObservable<TLeft> source,
IObservable<TRight> other,
Func<TLeft, TRight, TResult> resultSelector)
{
return source.Publish(os =>
other.Select(a => os
.Select(b => resultSelector(b,a)))
.Switch());
}
Source 如果我没记错的话,由@JamesWorld 提供。
我有一个从常规 .NET 事件生成的 Observable。 Observable 是热的——而不是 warm——从某种意义上说,它甚至在任何订阅之前就开始产生价值,并且每次有人订阅时,它都会收到最新产生的价值。我们将其命名为 eventStream
.
然后我有另一个 Observable,由另一个 class 公开,它代表一些 状态流 ,所以每个新值都给出了由 管理的某些东西的当前状态 class。这个 Observable 也很热门。我们将其命名为 stateStream
.
每次事件序列产生一个新值时,我想选择(我会说 sample,但这可能会导致混淆)状态序列提供的最新值。这应该会产生一个新序列,组合两个值,然后处理它们,等等。
这是我想出来的,但它似乎不起作用:
var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/);
var stateStream = someDependency.SomeStateStream;
eventStream.Select(eventValue =>
stateStream
.Take(1)
.Select(stateValue => new { Event = eventValue, State = stateValue }))
.Switch()
.Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
.Subscribe(value => /* do something */);
其背后的基本原理来自我工作过的其他类似场景,其中某些来源产生的新值导致对 运行 的新订阅,因此 new Observable 被返回,最后 IObservable<IObservable<...>>
再次使用 Switch()
或一些类似的运算符被压缩成 一维 IObservable。
但是在这种情况下,从快速测试来看,似乎没有新的订阅,并且只产生了第一个 stateStream
值。相反,我想在每次 eventStream
触发 .
Take(1)
)
AFAIK,CombineLatest
和 Zip
不符合要求:每当两个序列之一提供新值时,CombineLatest
就会触发; Zip
每当两个序列都有一个可用的新值时就会触发,通常这意味着当两个序列中最慢的一个有值时。出于与 Zip
.
And/Then/When
也不应该是正确的
我也检查了 SO 线程 combining one observable with latest from another observable,但我认为这不适用于此处。仅在我阅读的评论之一中
[...] and then Scan acts like a CombineLatest that filters for notifications from only one side
不知怎的,这听起来很熟悉,但我无法理解。
您需要状态流的默认值,以防它从未发出过。将此默认值作为参数传递给 MostRecent
(我在这里只使用了 null
)并使用 Zip
的重载,它需要一个 IEnumerable
:
eventStream.Zip(
stateStream.MostRecent(null),
(evt,state) => new { Event = evt, State = state })
.Subscribe(/* etc */);
我想你想要 Observable.Sample()
stateSource.Sample(eventSource)
.Zip(eventSource,...)
在我看来,您当前的解决方案实际上非常接近,但听起来您只需要交换 eventStream
& stateStream
observables,然后删除 .Take(1)
.
试试这个:
stateStream
.Select(stateValue =>
eventStream
.Select(eventValue => new { Event = eventValue, State = stateValue }))
.Switch()
.Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
.Subscribe(value => { /* do something */ });
根据 stateStream
的配置方式,您可能需要向其添加 .StartWith(...)
以从 eventStream
获取初始值,但我认为这种方法可以满足您的要求。
Rx.Net 2.3.0-beta2 的最新(测试版)版本有 WithLatestFrom
:
//Only emits when eventStream emits
eventStream.WithLatestFrom(stateStream.StartWith(defaultState),
(evt, state) => new {Event = evt, State = state})
.Subscribe(/*Do something*/);
如果没有,您可以使用(注意未经测试)填充它:
public static IObservable<TResult> WithLatestFrom<TLeft, TRight, TResult>(
this IObservable<TLeft> source,
IObservable<TRight> other,
Func<TLeft, TRight, TResult> resultSelector)
{
return source.Publish(os =>
other.Select(a => os
.Select(b => resultSelector(b,a)))
.Switch());
}
Source 如果我没记错的话,由@JamesWorld 提供。