如何清理使用 .create(OnSubscribe) 方法创建的 Observable

How to cleanup on Observable created with the .create(OnSubscribe) method

我有以下代码,它使用 Observable.create(OnSubscribe) 方法创建自定义 Observable

public class Main {

    public static void main(String[] args) {
        Subscription subscription = Observable
                .create(subscriber -> {
                    Timer timer = new Timer();
                    TimerTask task = new TimerTask() {

                        @Override
                        public void run() {
                            subscriber.onNext("tick! tack!");
                        }
                    };
                    timer.scheduleAtFixedRate(task, 0L, 1000L);
                })
                .subscribe(System.out::println);

        new Scanner(System.in).nextLine();
        System.err.println("finishing");

        subscription.unsubscribe();
    }
}

Observable 使用计时器每秒发出一个字符串。当用户按下回车键时,订阅将被取消。

但是,定时器还是会执行。我怎样才能取消定时器?我想一定有什么地方有钩子,但我找不到。

在 .NET 上,create 方法会 return 一个 IDisposable,我可以用它来停止计时器。我不确定如何将它映射到 RxJava,因为它的 subscribe 方法是 void.

您可以添加在您取消订阅流时调用的代码。为此,您必须在 create 方法中向 subscriber 添加一个新的 Subscription

 subscriber.add(new Subscription() {
                @Override
                public void unsubscribe() {
                     // stop the timer here
                }

                @Override
                public boolean isUnsubscribed() {
                     // is the stream unsubscribed ?
                    return false; 
                }
            });

您可以在向订阅者发送新值之前检查订阅者是否已取消订阅。如果已经退订,则停止计时:

Subscription subscription = Observable
    .create(subscriber -> {
        Timer timer = new Timer();
        TimerTask task = new TimerTask() {

            @Override
            public void run() {
                if (subscriber.isUnsubscribed()) {
                    // stop timer
                } else {
                    subscriber.onNext("tick! tack!");
                }
            }
        };
        timer.scheduleAtFixedRate(task, 0L, 1000L);
    })
    .subscribe(System.out::println);

以上组合即可完成工作:

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Subscription subscription = Observable.create(subscriber -> {
            Timer timer = new Timer();
            subscriber.add(Subscriptions.create(() -> {
                timer.cancel();
            }));
            TimerTask task = new TimerTask() {

                @Override
                public void run() {
                    if (!subscriber.isUnsubscribed())
                        subscriber.onNext("tick! tack!");
                }
            };
            timer.scheduleAtFixedRate(task, 0L, 1000L);
        }).subscribe(System.out::println);

        System.err.println("finishing");

        subscription.unsubscribe();

        Thread.sleep(10000);
    }

这是一个更惯用的 RxJava 示例,它用更少的代码完成同样的事情:

Subscription subscription = Observable
    .interval(1000, TimeUnit.MILLISECONDS)
    .map(n -> "tick! tack!")
    .subscribe(System.out::println);
Thread.sleep(3000);
System.err.println("finishing");
subscription.unsubscribe();

一个更具声明性(恕我直言,更易于阅读)的解决方案是使用 Observable.using 方法:

Observable<String> obs = Observable.using(
    // resource factory:
    () -> new Timer(),
    // observable factory:
    timer -> Observable.create(subscriber -> {
        TimerTask task = new TimerTask() {
            public void run() {
                subscriber.onNext("tick! tack!");
            }
        };
        timer.scheduleAtFixedRate(task, 0L, 1000L);
    }),
    // dispose action:
    timer -> timer.cancel()
);

您声明如何创建依赖资源(Timer),如何使用它创建 Observable,以及如何销毁它,RxJava 将负责在订阅时创建计时器并销毁取消订阅。

我正在寻找类似的解决方案,我在 RxAndroid 中找到了这段代码:

https://github.com/ReactiveX/RxAndroid/blob/v0.24.0/rxandroid/src/main/java/rx/android/content/OnSubscribeBroadcastRegister.java

您似乎不需要使用 Observable.using 来完成清理步骤。相反,您似乎可以添加另一个订阅者来进行清理。

Observable .create(subscriber -> {
  Timer timer = new Timer();
  TimerTask task = new TimerTask() {
      @Override
      public void run() {
          subscriber.onNext("tick! tack!");
      }
  };

  subscriber.add(Subscriptions.create(() -> {
    timer.cancel();
  }));

  timer.scheduleAtFixedRate(task, 0L, 1000L);
})