Flowable.map() 中的长 运行 操作会阻止所有订阅者

Long running operation in Flowable.map() blocks all subscribers

在我的 android 代码中,两个视图需要来自数据库的一些数据同时作为 io.reactivex.Flowable 提供。一个视图需要纯数据,但另一个视图应用涉及一些 IO 并需要一些时间的映射函数。

问题是两个订阅者只有在映射函数终止后才能获取数据。这会导致其中一个视图在加载另一个视图时为空白。

Flowable<Integer> rxRoomSourceFlowable = Flowable.fromArray(1, 2, 3).share();

rxRoomSourceFlowable
    .map(integer -> {
        // Some long running operation
        Thread.sleep(5000);
        return integer + "!";
    })
    .subscribeOn(Schedulers.io())
    .subscribe(str -> System.out.println("Mapped subscriber: " + str));

rxRoomSourceFlowable
    .subscribeOn(Schedulers.io())
    .subscribe(integer -> System.out.println("Plain subscriber: " + integer));

输出:

Mapped subscriber: 1!
Plain subscriber: 1
Mapped subscriber: 2!
Plain subscriber: 2
Mapped subscriber: 3!
Plain subscriber: 3

如您所见,只有在长 运行 映射函数终止后,普通订阅者才会收到通知,即使结果完全不相关。调用 subscribe() 的顺序无关紧要,因为无论如何订阅都是异步发生的。

如何确保普通订阅者在数据可用时立即得到通知,而无需等待映射功能?与使用 map() 相比,更改 Flowable 中的数据是否有不同的选项?

share() 以锁步方式调度事件,因此,如果一个订阅者延迟,另一个订阅者将无法及时获得项目。您必须将有问题的订阅者移到另一个线程并让 share progress:

Flowable<Integer> rxRoomSourceFlowable = 
    Flowable.fromArray(1, 2, 3)
    .subscribeOn(Schedulers.io())
    .share();

rxRoomSourceFlowable
.observeOn(Schedulers.io())
.map(integer -> {
    // Some long running operation
    Thread.sleep(5000);
    return integer + "!";
})
.subscribe(str -> System.out.println("Mapped subscriber: " + str));

rxRoomSourceFlowable
.subscribe(integer -> System.out.println("Plain subscriber: " + integer));