是否有一个 Rx.Net 运算符告诉订阅观察者的数量?

Is there a Rx.Net Operator that tells the number of subscribed observers?

我正在尝试调试我是否正确地将观察者部署到长 运行 服务(暴露 IObservable 的服务)。我想知道是否有一个操作员或我们可以创建的东西来记录活跃观察者的数量,比如说。

public class NewsService
{
    IObservable<Article> GetArticles();
} 


NewsService.Instance
    .GetArticles()
    .DoCount(x=> Trace.Writeline("The current count is {x}"))
    .Subscribe();

提出了一个解决方案 here,它适用于主题。如果我们无法访问 Subject 并且库正在公开 IObservable.

怎么办

一般来说,没有定义任意可观察序列的订阅者计数的概念。

对于诸如 Observable.Interval 之类的冷可观察对象,每次您订阅可观察对象时,都会创建一个新的管道实例,从它的角度来看,它一次只能看到一个观察者。

尽管如此,我们可以预热冷的 Observable,观察订阅的来来去去。

    public static IObservable<T> RefCount<T>(this IObservable<T> source, Action<int> onChange)
    {
        var subscribers = 0;
        var shared = source.Publish().RefCount();
        void callback(int count) => onChange(Interlocked.Add(ref subscribers, count));

        return Observable.Create<T>(observer =>
        {
            callback(+1);
            var subscription = shared.Subscribe(observer);
            var dispose = Disposable.Create(() => callback(-1));

            return new CompositeDisposable(subscription, dispose);
        });
    }

演示

        var values = 
            Observable
            .Interval(TimeSpan.FromSeconds(0.1))
            .RefCount(count => Console.WriteLine($"Subscribers: {count}"));

        values.Take(5).Subscribe();
        values.Take(10).Subscribe();
        values.Take(15).Subscribe();

输出

Subscribers: 1
Subscribers: 2
Subscribers: 3
Subscribers: 2
Subscribers: 1
Subscribers: 0

现在,这行得通了,因为我们有一个父 observable 的共享视图。 所以尽量让所有的订阅都指向同一个实例。

_articles = GetArticles().RefCount(count => Console.WriteLine($"Subscribers: {count}")));
...
_articles.Subscribe();