统计实时并发流数

Count number of live concurrent streams

我有一个流。我想知道如何将其转换为当前正在运行的许多内部流的实时计数,即未完成或出错。

我将如何实施 CountLiveStreams?

var source = new Subject<IObservable<Unit>>();
IObservable<int> count = source.CountLiveStreams();

谢谢!

下面是 CountLiveStreams 运算符的实现:

public static IObservable<int> CountLiveStreams<T>(
    this IObservable<IObservable<T>> streamOfStreams)
{
    return streamOfStreams
        .Select(x => x.IgnoreElements().Select(_ => 0).Catch(Observable.Empty<int>())
            .Prepend(1).Append(-1))
        .Merge()
        .Scan(0, (accumulator, delta) => accumulator + delta)
        .Prepend(0);
}

每个发出的流都转换为发出 2 个值的 IObservable<int>,值 1 在开始,-1 在结束。然后将所有流生成的所有值对合并为一个 IObservable<int>,最后使用 Scan 运算符累加所有这些数字。

这对我有用:

public static IObservable<int> CountLiveStreams<T>(this IObservable<IObservable<T>> source) =>
    source
        .SelectMany(xs =>
            xs
                .Materialize()
                .Where(x => x.Kind != NotificationKind.OnNext)
                .Select(x => -1)
                .StartWith(1))
        .Scan((x, y) => x + y);

它使用与 Theodor 生成 1-1 然后使用 .Scan 创建 运行 理货的答案相同的策略,但我认为它更清楚调用 .Materialize().