Flowable.map() 中的长 运行 操作会阻止所有订阅者
Long running operation in Flowable.map() blocks all subscribers
在我的 android 代码中,两个视图需要来自数据库的一些数据同时作为 io.reactivex.Flowable
提供。一个视图需要纯数据,但另一个视图应用涉及一些 IO 并需要一些时间的映射函数。
问题是两个订阅者只有在映射函数终止后才能获取数据。这会导致其中一个视图在加载另一个视图时为空白。
Flowable<Integer> rxRoomSourceFlowable = Flowable.fromArray(1, 2, 3).share();
rxRoomSourceFlowable
.map(integer -> {
// Some long running operation
Thread.sleep(5000);
return integer + "!";
})
.subscribeOn(Schedulers.io())
.subscribe(str -> System.out.println("Mapped subscriber: " + str));
rxRoomSourceFlowable
.subscribeOn(Schedulers.io())
.subscribe(integer -> System.out.println("Plain subscriber: " + integer));
输出:
Mapped subscriber: 1!
Plain subscriber: 1
Mapped subscriber: 2!
Plain subscriber: 2
Mapped subscriber: 3!
Plain subscriber: 3
如您所见,只有在长 运行 映射函数终止后,普通订阅者才会收到通知,即使结果完全不相关。调用 subscribe()
的顺序无关紧要,因为无论如何订阅都是异步发生的。
如何确保普通订阅者在数据可用时立即得到通知,而无需等待映射功能?与使用 map()
相比,更改 Flowable
中的数据是否有不同的选项?
share()
以锁步方式调度事件,因此,如果一个订阅者延迟,另一个订阅者将无法及时获得项目。您必须将有问题的订阅者移到另一个线程并让 share
progress:
Flowable<Integer> rxRoomSourceFlowable =
Flowable.fromArray(1, 2, 3)
.subscribeOn(Schedulers.io())
.share();
rxRoomSourceFlowable
.observeOn(Schedulers.io())
.map(integer -> {
// Some long running operation
Thread.sleep(5000);
return integer + "!";
})
.subscribe(str -> System.out.println("Mapped subscriber: " + str));
rxRoomSourceFlowable
.subscribe(integer -> System.out.println("Plain subscriber: " + integer));
在我的 android 代码中,两个视图需要来自数据库的一些数据同时作为 io.reactivex.Flowable
提供。一个视图需要纯数据,但另一个视图应用涉及一些 IO 并需要一些时间的映射函数。
问题是两个订阅者只有在映射函数终止后才能获取数据。这会导致其中一个视图在加载另一个视图时为空白。
Flowable<Integer> rxRoomSourceFlowable = Flowable.fromArray(1, 2, 3).share();
rxRoomSourceFlowable
.map(integer -> {
// Some long running operation
Thread.sleep(5000);
return integer + "!";
})
.subscribeOn(Schedulers.io())
.subscribe(str -> System.out.println("Mapped subscriber: " + str));
rxRoomSourceFlowable
.subscribeOn(Schedulers.io())
.subscribe(integer -> System.out.println("Plain subscriber: " + integer));
输出:
Mapped subscriber: 1!
Plain subscriber: 1
Mapped subscriber: 2!
Plain subscriber: 2
Mapped subscriber: 3!
Plain subscriber: 3
如您所见,只有在长 运行 映射函数终止后,普通订阅者才会收到通知,即使结果完全不相关。调用 subscribe()
的顺序无关紧要,因为无论如何订阅都是异步发生的。
如何确保普通订阅者在数据可用时立即得到通知,而无需等待映射功能?与使用 map()
相比,更改 Flowable
中的数据是否有不同的选项?
share()
以锁步方式调度事件,因此,如果一个订阅者延迟,另一个订阅者将无法及时获得项目。您必须将有问题的订阅者移到另一个线程并让 share
progress:
Flowable<Integer> rxRoomSourceFlowable =
Flowable.fromArray(1, 2, 3)
.subscribeOn(Schedulers.io())
.share();
rxRoomSourceFlowable
.observeOn(Schedulers.io())
.map(integer -> {
// Some long running operation
Thread.sleep(5000);
return integer + "!";
})
.subscribe(str -> System.out.println("Mapped subscriber: " + str));
rxRoomSourceFlowable
.subscribe(integer -> System.out.println("Plain subscriber: " + integer));