当任何值由另一个 Observable 产生时,选择 Observable 的最新值

Pick Observable latest value when any value is produced by another Observable

我有一个从常规 .NET 事件生成的 Observable。 Observable 是热的——而不是 warm——从某种意义上说,它甚至在任何订阅之前就开始产生价值,并且每次有人订阅时,它都会收到最新产生的价值。我们将其命名为 eventStream.

然后我有另一个 Observable,由另一个 class 公开,它代表一些 状态流 ,所以每个新值都给出了由 管理的某些东西的当前状态 class。这个 Observable 也很热门。我们将其命名为 stateStream.

每次事件序列产生一个新值时,我想选择(我会说 sample,但这可能会导致混淆)状态序列提供的最新值。这应该会产生一个新序列,组合两个值,然后处理它们,等等。

这是我想出来的,但它似乎不起作用:

var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/);
var stateStream = someDependency.SomeStateStream;

eventStream.Select(eventValue => 
  stateStream
    .Take(1)
    .Select(stateValue => new { Event = eventValue, State = stateValue }))
  .Switch()
  .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
  .Subscribe(value => /* do something */);

其背后的基本原理来自我工作过的其他类似场景,其中某些来源产生的新值导致对 运行 的新订阅,因此 new Observable 被返回,最后 IObservable<IObservable<...>> 再次使用 Switch() 或一些类似的运算符被压缩成 一维 IObservable。
但是在这种情况下,从快速测试来看,似乎没有新的订阅,并且只产生了第一个 stateStream 值。相反,我想在每次 eventStream 触发 .

时选择第一个值 (Take(1))

AFAIK,CombineLatestZip 不符合要求:每当两个序列之一提供新值时,CombineLatest 就会触发; Zip 每当两个序列都有一个可用的新值时就会触发,通常这意味着当两个序列中最慢的一个有值时。出于与 Zip.

相同的原因,And/Then/When 也不应该是正确的

我也检查了 SO 线程 combining one observable with latest from another observable,但我认为这不适用于此处。仅在我阅读的评论之一中

[...] and then Scan acts like a CombineLatest that filters for notifications from only one side

不知怎的,这听起来很熟悉,但我无法理解。

您需要状态流的默认值,以防它从未发出过。将此默认值作为参数传递给 MostRecent(我在这里只使用了 null)并使用 Zip 的重载,它需要一个 IEnumerable:

eventStream.Zip(
    stateStream.MostRecent(null),
    (evt,state) => new { Event = evt, State = state })
.Subscribe(/* etc */);

我想你想要 Observable.Sample()

stateSource.Sample(eventSource)
     .Zip(eventSource,...)

在我看来,您当前的解决方案实际上非常接近,但听起来您只需要交换 eventStream & stateStream observables,然后删除 .Take(1) .

试试这个:

stateStream
    .Select(stateValue => 
        eventStream
            .Select(eventValue => new { Event = eventValue, State = stateValue }))
    .Switch()
    .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
    .Subscribe(value => { /* do something */ });

根据 stateStream 的配置方式,您可能需要向其添加 .StartWith(...) 以从 eventStream 获取初始值,但我认为这种方法可以满足您的要求。

Rx.Net 2.3.0-beta2 的最新(测试版)版本有 WithLatestFrom

//Only emits when eventStream emits
eventStream.WithLatestFrom(stateStream.StartWith(defaultState), 
                           (evt, state) => new {Event = evt, State = state})
           .Subscribe(/*Do something*/);

如果没有,您可以使用(注意未经测试)填充它:

public static IObservable<TResult> WithLatestFrom<TLeft, TRight, TResult>(
    this IObservable<TLeft> source,
    IObservable<TRight> other,
    Func<TLeft, TRight, TResult> resultSelector)
{
    return source.Publish(os =>
        other.Select(a => os
            .Select(b => resultSelector(b,a)))
            .Switch());
}

Source 如果我没记错的话,由@JamesWorld 提供。