统计实时并发流数
Count number of live concurrent streams
我有一个流。我想知道如何将其转换为当前正在运行的许多内部流的实时计数,即未完成或出错。
我将如何实施 CountLiveStreams?
var source = new Subject<IObservable<Unit>>();
IObservable<int> count = source.CountLiveStreams();
谢谢!
下面是 CountLiveStreams
运算符的实现:
public static IObservable<int> CountLiveStreams<T>(
this IObservable<IObservable<T>> streamOfStreams)
{
return streamOfStreams
.Select(x => x.IgnoreElements().Select(_ => 0).Catch(Observable.Empty<int>())
.Prepend(1).Append(-1))
.Merge()
.Scan(0, (accumulator, delta) => accumulator + delta)
.Prepend(0);
}
每个发出的流都转换为发出 2 个值的 IObservable<int>
,值 1 在开始,-1 在结束。然后将所有流生成的所有值对合并为一个 IObservable<int>
,最后使用 Scan
运算符累加所有这些数字。
这对我有用:
public static IObservable<int> CountLiveStreams<T>(this IObservable<IObservable<T>> source) =>
source
.SelectMany(xs =>
xs
.Materialize()
.Where(x => x.Kind != NotificationKind.OnNext)
.Select(x => -1)
.StartWith(1))
.Scan((x, y) => x + y);
它使用与 Theodor 生成 1
和 -1
然后使用 .Scan
创建 运行 理货的答案相同的策略,但我认为它更清楚调用 .Materialize()
.
我有一个流。我想知道如何将其转换为当前正在运行的许多内部流的实时计数,即未完成或出错。
我将如何实施 CountLiveStreams?
var source = new Subject<IObservable<Unit>>();
IObservable<int> count = source.CountLiveStreams();
谢谢!
下面是 CountLiveStreams
运算符的实现:
public static IObservable<int> CountLiveStreams<T>(
this IObservable<IObservable<T>> streamOfStreams)
{
return streamOfStreams
.Select(x => x.IgnoreElements().Select(_ => 0).Catch(Observable.Empty<int>())
.Prepend(1).Append(-1))
.Merge()
.Scan(0, (accumulator, delta) => accumulator + delta)
.Prepend(0);
}
每个发出的流都转换为发出 2 个值的 IObservable<int>
,值 1 在开始,-1 在结束。然后将所有流生成的所有值对合并为一个 IObservable<int>
,最后使用 Scan
运算符累加所有这些数字。
这对我有用:
public static IObservable<int> CountLiveStreams<T>(this IObservable<IObservable<T>> source) =>
source
.SelectMany(xs =>
xs
.Materialize()
.Where(x => x.Kind != NotificationKind.OnNext)
.Select(x => -1)
.StartWith(1))
.Scan((x, y) => x + y);
它使用与 Theodor 生成 1
和 -1
然后使用 .Scan
创建 运行 理货的答案相同的策略,但我认为它更清楚调用 .Materialize()
.