如何对第一个订阅创建和最后一个订阅处理引入副作用?

How do I introduce a side effect on the first subscription creation and last subscription disposal?

我有一个 class 必须执行 setup/teardown 才能生成包装为可观察对象的事件。为简洁起见,我在下面的代码中使用 Observable.Interval 生成一个可观察流。我需要在此代码中进行哪些更改才能使 Start 和 Stop 被调用两次?实际上,每次引用计数递增到 1 时都需要调用 Start,而每次引用计数递减到 0 时都需要调用 Stop。

    static async Task Main(string[] args)
    {
        var provider = new SomeProvider();
        var s1       = provider.Subscribe(on1);
        var s2       = provider.Subscribe(on2);

        await Task.Delay(5000);

        s1.Dispose();
        s2.Dispose();

        var s3 = provider.Subscribe(on1);
        var s4 = provider.Subscribe(on2);

        await Task.Delay(5000);

        s3.Dispose();
        s4.Dispose();
    }

    private static void on1(long obj) => Console.WriteLine("on1");
    private static void on2(long obj) => Console.WriteLine("on2");

    public class SomeProvider : IObservable<long>
    {
        //Example observable stream
        readonly IObservable<long> timer = Observable.Interval(TimeSpan.FromSeconds(1)).Do(Start).Publish().RefCount();

        public IDisposable Subscribe(IObserver<long> observer)
        {
            var subscription = timer.Subscribe(observer);

            return new CompositeDisposable(subscription, Disposable.Create(() => { Stop(); }));
        }

        private static void Start(long obj) => Console.WriteLine("STARTED");
        private void Stop() => Console.WriteLine("STOPPED");
    }

任何时候你做source.Publish().RefCount(),当引用计数变为1时将调用源的Subscribe方法,当引用计数变为0时将处理订阅。你可以使用这个事实来实现你想要的行为。像这样的东西应该可以工作(下面未经测试的代码):

IObservable<T> MonitorSubscription<T>(IObservable<T> source, Action onSubscribe, Action onDispose)
{
    //TODO: argument error checking/handling
    return Observable.Create<T>(obs =>
        {
            onSubscribe();
            var subscription = source.Subscribe(obs);
            return Disposable.Create(() =>
                                     {
                                         subscription.Dispose();
                                         onDispose();
                                     });
        });
}

您可以在具有“开始”和“停止”功能的地方调用此功能。

readonly IObservable<long> timer = Observable.Interval(TimeSpan.FromSeconds(1)).MonitorSubscription(Start, Stop).Publish().RefCount();

现在您将在计时器的引用计数变为 1 时调用 Start,在引用计数变为 0 时调用 Stop。

您可以像这样使用 EmptyFinallyConcat 运算符:

readonly IObservable<long> timer = Observable
    .Empty<long>()
    .Finally(() => Console.WriteLine($"Started"))
    .Concat(Observable.Interval(TimeSpan.FromSeconds(1)))
    .Finally(() => Console.WriteLine($"Stopped"))
    .Publish()
    .RefCount();

首次订阅时打印"Started",最后退订时打印"Stopped"

或者您可以使用下面的扩展方法 OnSubscription。它应该稍微更有效率,因为它避免了链接。

/// <summary>Invokes a specific action before every subscription.</summary>
public static IObservable<T> OnSubscription<T>(this IObservable<T> source, Action action)
    => Observable.Create<T>(o => { action(); return source.Subscribe(o); });

用法示例:

readonly IObservable<long> timer = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .OnSubscription(() => Console.WriteLine($"Started"))
    .Finally(() => Console.WriteLine($"Stopped"))
    .Publish()
    .RefCount();

借助前面的答案,我创建了这个扩展方法:

public static IObservable<T> OnSubscriptionDo<T>(this IObservable<T> source, Action subscribeAction, Action unsubscribeAction = null) =>
    (null == unsubscribeAction) ?
        source.CombineLatest(Observable.Return(Unit.Default).Finally(subscribeAction), (evt, _) => evt).Publish().RefCount() :
        source.CombineLatest(Observable.Return(Unit.Default).Finally(subscribeAction), (evt, _) => evt).Finally(unsubscribeAction).Publish().RefCount();

“执行”是为了提供一些指示,表明此方法会引入副作用。然后可观察流定义如下:

readonly IObservable<long> timer = Observable.Interval(TimeSpan.FromSeconds(1)).OnSubscriptionDo(Start, Stop);