Observable:ANDing 来源和完成

Observable: ANDing sources and completion

我有一组 IObservable<bool>s - 让我们称它们为 sources - 如果所有输入均为真,我想将它们的值与生成 true

sources.CombineLatest().Select(values => values.All(x => x))

但是,如果其中一个源完成并具有 false 的值(这意味着所有源再也不会是 true),我想发出信号 false并完成。

我该怎么做?

我认为这样做:

public static IObservable<bool> Foo(
    this IEnumerable<IObservable<bool>> sources)
{                       
    var sourceArray = sources.Select(s => s.Publish().RefCount()).ToArray();

    var terminator = sourceArray
        .ToObservable(Scheduler.Default)
        .SelectMany(x => x.StartWith(true).LastAsync().Where(y => y == false));

    var result = sourceArray
        .CombineLatest(values => values.All(x => x))
        .DistinctUntilChanged()
        .TakeUntil(terminator);

    return result; 
}  

正在进行一些发布以防止副作用,但关键是 terminator,它会监视最后一个值为 false 的流。

这是我在没有发布和需要调度程序的情况下的看法:

public static IObservable<bool> Foo(
    this IEnumerable<IObservable<bool>> sources)
{
    var projectedSources = sources.Select(source => source
        .Materialize()
        .Scan(
            new
            {
                Latest = true,
                IsCompleted = false
            },
            (tuple, notification) => new
            {
                Latest = notification.HasValue ? notification.Value : tuple.Latest,
                IsCompleted = tuple.IsCompleted || !notification.HasValue
            }));

    return projectedSources
        .CombineLatest()
        .TakeWhile(list => list.All(x => !x.IsCompleted || x.Latest))
        .Select(list => list.All(x => x.Latest))
        .DistinctUntilChanged();
}

首先,我们具体化每个源并扫描它,记录它的最后一个值以及源是否已经完成。然后,我们合并源,仅在所有源未完成或具有最新值 true 的情况下才从结果中获取。然后,我们将所有源的值投影到单个输出值。

注意:您可能希望以 true 的值开始您的来源(如上述解决方案)。试验一下。