Observable:ANDing 来源和完成
Observable: ANDing sources and completion
我有一组 IObservable<bool>
s - 让我们称它们为 sources
- 如果所有输入均为真,我想将它们的值与生成 true
:
sources.CombineLatest().Select(values => values.All(x => x))
但是,如果其中一个源完成并具有 false
的值(这意味着所有源再也不会是 true
),我想发出信号 false
并完成。
我该怎么做?
我认为这样做:
public static IObservable<bool> Foo(
this IEnumerable<IObservable<bool>> sources)
{
var sourceArray = sources.Select(s => s.Publish().RefCount()).ToArray();
var terminator = sourceArray
.ToObservable(Scheduler.Default)
.SelectMany(x => x.StartWith(true).LastAsync().Where(y => y == false));
var result = sourceArray
.CombineLatest(values => values.All(x => x))
.DistinctUntilChanged()
.TakeUntil(terminator);
return result;
}
正在进行一些发布以防止副作用,但关键是 terminator
,它会监视最后一个值为 false 的流。
这是我在没有发布和需要调度程序的情况下的看法:
public static IObservable<bool> Foo(
this IEnumerable<IObservable<bool>> sources)
{
var projectedSources = sources.Select(source => source
.Materialize()
.Scan(
new
{
Latest = true,
IsCompleted = false
},
(tuple, notification) => new
{
Latest = notification.HasValue ? notification.Value : tuple.Latest,
IsCompleted = tuple.IsCompleted || !notification.HasValue
}));
return projectedSources
.CombineLatest()
.TakeWhile(list => list.All(x => !x.IsCompleted || x.Latest))
.Select(list => list.All(x => x.Latest))
.DistinctUntilChanged();
}
首先,我们具体化每个源并扫描它,记录它的最后一个值以及源是否已经完成。然后,我们合并源,仅在所有源未完成或具有最新值 true
的情况下才从结果中获取。然后,我们将所有源的值投影到单个输出值。
注意:您可能希望以 true
的值开始您的来源(如上述解决方案)。试验一下。
我有一组 IObservable<bool>
s - 让我们称它们为 sources
- 如果所有输入均为真,我想将它们的值与生成 true
:
sources.CombineLatest().Select(values => values.All(x => x))
但是,如果其中一个源完成并具有 false
的值(这意味着所有源再也不会是 true
),我想发出信号 false
并完成。
我该怎么做?
我认为这样做:
public static IObservable<bool> Foo(
this IEnumerable<IObservable<bool>> sources)
{
var sourceArray = sources.Select(s => s.Publish().RefCount()).ToArray();
var terminator = sourceArray
.ToObservable(Scheduler.Default)
.SelectMany(x => x.StartWith(true).LastAsync().Where(y => y == false));
var result = sourceArray
.CombineLatest(values => values.All(x => x))
.DistinctUntilChanged()
.TakeUntil(terminator);
return result;
}
正在进行一些发布以防止副作用,但关键是 terminator
,它会监视最后一个值为 false 的流。
这是我在没有发布和需要调度程序的情况下的看法:
public static IObservable<bool> Foo(
this IEnumerable<IObservable<bool>> sources)
{
var projectedSources = sources.Select(source => source
.Materialize()
.Scan(
new
{
Latest = true,
IsCompleted = false
},
(tuple, notification) => new
{
Latest = notification.HasValue ? notification.Value : tuple.Latest,
IsCompleted = tuple.IsCompleted || !notification.HasValue
}));
return projectedSources
.CombineLatest()
.TakeWhile(list => list.All(x => !x.IsCompleted || x.Latest))
.Select(list => list.All(x => x.Latest))
.DistinctUntilChanged();
}
首先,我们具体化每个源并扫描它,记录它的最后一个值以及源是否已经完成。然后,我们合并源,仅在所有源未完成或具有最新值 true
的情况下才从结果中获取。然后,我们将所有源的值投影到单个输出值。
注意:您可能希望以 true
的值开始您的来源(如上述解决方案)。试验一下。