如何停止 rx-java runnable 发射?
How can I stop rx-java runnable from emitting?
我有以下观察结果:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool( 1 );
Observable<List<Widget>> findWidgetsObservable = Observable.create( emitter -> {
executorService.scheduleWithFixedDelay( emitFindWidgets( emitter, 0, 30, TimeUnit.SECONDS );
} );
private Runnable emitFindWidgets( ObservableEmitter<List<Widgets>> emitter ) {
return () -> {
emitter.onNext( Collections.emptyList() ); // dummy empty array
};
}
我在 graphql-java 订阅解析器中返回它,如下所示:
ConnectableObservable<List<Widget>> connectableObservable = findWidgetsObservable.share().publish();
Disposable connectionDisposable = connectableObservable.connect();
return connectableObservable.toFlowable( BackpressureStrategy.LATEST )
graphql 订阅按预期工作并向 JavaScript graphql 客户端发送数据,但是当客户端取消订阅时,我的 Runnable 似乎无限继续。也就是说,flowable 的 doOnCancel() 事件处理程序是 运行.
为了解决这个问题,我尝试在 flowable 的 doOnCancel() 中执行以下操作:
Disposable connectionDisposable = connectableObservable.connect();
return connectableObservable.toFlowable( BackpressureStrategy.LATEST ).doOnCancel( () -> {
findWidgetsObservable.toFuture().cancel( true );
connectionDisposable.dispose();
})
但是,Runnable 继续无限期地省略。有什么办法可以解决这个问题并完全停止发射?
我确实有一个想法:scheduleWithFixedDelay returns 一个 ScheduledFuture,它有一个 cancel() 方法,但我不确定当调度本身在一个可观察范围内时我是否可以这样做!感谢任何帮助。
运行nable 继续发射,因为您正在调度不是 known/bound 可观察流的调度程序上的发射。
处理连接时,您将停止从上游接收项目,因为与上游 observable 的连接被切断。但是由于您在单独的调度程序上重复将发射器调度到 运行,因此 运行nable 保持 运行ning.
您可以使用自定义调度程序描述自定义调度行为并将其传递给 subscribeOn(Your-Custom-Scheduler)
此外,您提到您可以在 ScheduledFuture
上调用 cancel()
doOnDispose()
.
但是您应该在可观察链中显式切换调度程序。否则,将变得更难调试。
我有以下观察结果:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool( 1 );
Observable<List<Widget>> findWidgetsObservable = Observable.create( emitter -> {
executorService.scheduleWithFixedDelay( emitFindWidgets( emitter, 0, 30, TimeUnit.SECONDS );
} );
private Runnable emitFindWidgets( ObservableEmitter<List<Widgets>> emitter ) {
return () -> {
emitter.onNext( Collections.emptyList() ); // dummy empty array
};
}
我在 graphql-java 订阅解析器中返回它,如下所示:
ConnectableObservable<List<Widget>> connectableObservable = findWidgetsObservable.share().publish();
Disposable connectionDisposable = connectableObservable.connect();
return connectableObservable.toFlowable( BackpressureStrategy.LATEST )
graphql 订阅按预期工作并向 JavaScript graphql 客户端发送数据,但是当客户端取消订阅时,我的 Runnable 似乎无限继续。也就是说,flowable 的 doOnCancel() 事件处理程序是 运行.
为了解决这个问题,我尝试在 flowable 的 doOnCancel() 中执行以下操作:
Disposable connectionDisposable = connectableObservable.connect();
return connectableObservable.toFlowable( BackpressureStrategy.LATEST ).doOnCancel( () -> {
findWidgetsObservable.toFuture().cancel( true );
connectionDisposable.dispose();
})
但是,Runnable 继续无限期地省略。有什么办法可以解决这个问题并完全停止发射?
我确实有一个想法:scheduleWithFixedDelay returns 一个 ScheduledFuture,它有一个 cancel() 方法,但我不确定当调度本身在一个可观察范围内时我是否可以这样做!感谢任何帮助。
运行nable 继续发射,因为您正在调度不是 known/bound 可观察流的调度程序上的发射。
处理连接时,您将停止从上游接收项目,因为与上游 observable 的连接被切断。但是由于您在单独的调度程序上重复将发射器调度到 运行,因此 运行nable 保持 运行ning.
您可以使用自定义调度程序描述自定义调度行为并将其传递给 subscribeOn(Your-Custom-Scheduler)
此外,您提到您可以在 ScheduledFuture
上调用 cancel()
doOnDispose()
.
但是您应该在可观察链中显式切换调度程序。否则,将变得更难调试。