如何清理使用 .create(OnSubscribe) 方法创建的 Observable
How to cleanup on Observable created with the .create(OnSubscribe) method
我有以下代码,它使用 Observable.create(OnSubscribe)
方法创建自定义 Observable
:
public class Main {
public static void main(String[] args) {
Subscription subscription = Observable
.create(subscriber -> {
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
subscriber.onNext("tick! tack!");
}
};
timer.scheduleAtFixedRate(task, 0L, 1000L);
})
.subscribe(System.out::println);
new Scanner(System.in).nextLine();
System.err.println("finishing");
subscription.unsubscribe();
}
}
Observable 使用计时器每秒发出一个字符串。当用户按下回车键时,订阅将被取消。
但是,定时器还是会执行。我怎样才能取消定时器?我想一定有什么地方有钩子,但我找不到。
在 .NET 上,create
方法会 return 一个 IDisposable
,我可以用它来停止计时器。我不确定如何将它映射到 RxJava,因为它的 subscribe
方法是 void
.
您可以添加在您取消订阅流时调用的代码。为此,您必须在 create
方法中向 subscriber
添加一个新的 Subscription
:
subscriber.add(new Subscription() {
@Override
public void unsubscribe() {
// stop the timer here
}
@Override
public boolean isUnsubscribed() {
// is the stream unsubscribed ?
return false;
}
});
您可以在向订阅者发送新值之前检查订阅者是否已取消订阅。如果已经退订,则停止计时:
Subscription subscription = Observable
.create(subscriber -> {
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
if (subscriber.isUnsubscribed()) {
// stop timer
} else {
subscriber.onNext("tick! tack!");
}
}
};
timer.scheduleAtFixedRate(task, 0L, 1000L);
})
.subscribe(System.out::println);
以上组合即可完成工作:
public class Main {
public static void main(String[] args) throws InterruptedException {
Subscription subscription = Observable.create(subscriber -> {
Timer timer = new Timer();
subscriber.add(Subscriptions.create(() -> {
timer.cancel();
}));
TimerTask task = new TimerTask() {
@Override
public void run() {
if (!subscriber.isUnsubscribed())
subscriber.onNext("tick! tack!");
}
};
timer.scheduleAtFixedRate(task, 0L, 1000L);
}).subscribe(System.out::println);
System.err.println("finishing");
subscription.unsubscribe();
Thread.sleep(10000);
}
这是一个更惯用的 RxJava 示例,它用更少的代码完成同样的事情:
Subscription subscription = Observable
.interval(1000, TimeUnit.MILLISECONDS)
.map(n -> "tick! tack!")
.subscribe(System.out::println);
Thread.sleep(3000);
System.err.println("finishing");
subscription.unsubscribe();
一个更具声明性(恕我直言,更易于阅读)的解决方案是使用 Observable.using
方法:
Observable<String> obs = Observable.using(
// resource factory:
() -> new Timer(),
// observable factory:
timer -> Observable.create(subscriber -> {
TimerTask task = new TimerTask() {
public void run() {
subscriber.onNext("tick! tack!");
}
};
timer.scheduleAtFixedRate(task, 0L, 1000L);
}),
// dispose action:
timer -> timer.cancel()
);
您声明如何创建依赖资源(Timer
),如何使用它创建 Observable,以及如何销毁它,RxJava 将负责在订阅时创建计时器并销毁取消订阅。
我正在寻找类似的解决方案,我在 RxAndroid 中找到了这段代码:
您似乎不需要使用 Observable.using
来完成清理步骤。相反,您似乎可以添加另一个订阅者来进行清理。
Observable .create(subscriber -> {
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
subscriber.onNext("tick! tack!");
}
};
subscriber.add(Subscriptions.create(() -> {
timer.cancel();
}));
timer.scheduleAtFixedRate(task, 0L, 1000L);
})
我有以下代码,它使用 Observable.create(OnSubscribe)
方法创建自定义 Observable
:
public class Main {
public static void main(String[] args) {
Subscription subscription = Observable
.create(subscriber -> {
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
subscriber.onNext("tick! tack!");
}
};
timer.scheduleAtFixedRate(task, 0L, 1000L);
})
.subscribe(System.out::println);
new Scanner(System.in).nextLine();
System.err.println("finishing");
subscription.unsubscribe();
}
}
Observable 使用计时器每秒发出一个字符串。当用户按下回车键时,订阅将被取消。
但是,定时器还是会执行。我怎样才能取消定时器?我想一定有什么地方有钩子,但我找不到。
在 .NET 上,create
方法会 return 一个 IDisposable
,我可以用它来停止计时器。我不确定如何将它映射到 RxJava,因为它的 subscribe
方法是 void
.
您可以添加在您取消订阅流时调用的代码。为此,您必须在 create
方法中向 subscriber
添加一个新的 Subscription
:
subscriber.add(new Subscription() {
@Override
public void unsubscribe() {
// stop the timer here
}
@Override
public boolean isUnsubscribed() {
// is the stream unsubscribed ?
return false;
}
});
您可以在向订阅者发送新值之前检查订阅者是否已取消订阅。如果已经退订,则停止计时:
Subscription subscription = Observable
.create(subscriber -> {
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
if (subscriber.isUnsubscribed()) {
// stop timer
} else {
subscriber.onNext("tick! tack!");
}
}
};
timer.scheduleAtFixedRate(task, 0L, 1000L);
})
.subscribe(System.out::println);
以上组合即可完成工作:
public class Main {
public static void main(String[] args) throws InterruptedException {
Subscription subscription = Observable.create(subscriber -> {
Timer timer = new Timer();
subscriber.add(Subscriptions.create(() -> {
timer.cancel();
}));
TimerTask task = new TimerTask() {
@Override
public void run() {
if (!subscriber.isUnsubscribed())
subscriber.onNext("tick! tack!");
}
};
timer.scheduleAtFixedRate(task, 0L, 1000L);
}).subscribe(System.out::println);
System.err.println("finishing");
subscription.unsubscribe();
Thread.sleep(10000);
}
这是一个更惯用的 RxJava 示例,它用更少的代码完成同样的事情:
Subscription subscription = Observable
.interval(1000, TimeUnit.MILLISECONDS)
.map(n -> "tick! tack!")
.subscribe(System.out::println);
Thread.sleep(3000);
System.err.println("finishing");
subscription.unsubscribe();
一个更具声明性(恕我直言,更易于阅读)的解决方案是使用 Observable.using
方法:
Observable<String> obs = Observable.using(
// resource factory:
() -> new Timer(),
// observable factory:
timer -> Observable.create(subscriber -> {
TimerTask task = new TimerTask() {
public void run() {
subscriber.onNext("tick! tack!");
}
};
timer.scheduleAtFixedRate(task, 0L, 1000L);
}),
// dispose action:
timer -> timer.cancel()
);
您声明如何创建依赖资源(Timer
),如何使用它创建 Observable,以及如何销毁它,RxJava 将负责在订阅时创建计时器并销毁取消订阅。
我正在寻找类似的解决方案,我在 RxAndroid 中找到了这段代码:
您似乎不需要使用 Observable.using
来完成清理步骤。相反,您似乎可以添加另一个订阅者来进行清理。
Observable .create(subscriber -> {
Timer timer = new Timer();
TimerTask task = new TimerTask() {
@Override
public void run() {
subscriber.onNext("tick! tack!");
}
};
subscriber.add(Subscriptions.create(() -> {
timer.cancel();
}));
timer.scheduleAtFixedRate(task, 0L, 1000L);
})