Observables 较少自以为是?

Observables Are Less Opinionated?

ReactiveX Introduction - Observables Are Less Opinionated

中所述

ReactiveX is not biased toward some particular source of concurrency or asynchronicity. Observables can be implemented using thread-pools, event loops, non-blocking I/O, actors (such as from Akka), or whatever implementation suits your needs, your style, or your expertise. Client code treats all of its interactions with Observables as asynchronous, whether your underlying implementation is blocking or non-blocking and however you choose to implement it.

我不明白这部分 - “你的底层实现是阻塞还是非阻塞”。

你能详细解释一下吗?或者一些示例代码来解释这个?

Observable.fromCallable(() -> doSomeReallyLongNetworkRequest())
    .subscribe(data -> {
        showTheDataOnTheUI(data);
    });

您认为 doSomeReallyLongNetworkRequest() 会在哪里 运行(线程方面)?
好吧,如果你将 运行 这段代码放在主线程,网络调用将 运行 在主线程!

"Observables Are Less Opinionated" 表示多线程是从实际工作中抽象出来的。订阅者不知道(也不需要)Observable 将 运行,它可以在线程池、事件循环上 运行,甚至可以 运行 以阻塞方式。
这就是为什么所有 Observable 交互都使用异步 API 进行的原因。

虽然这样说似乎是一个缺点,但事实恰恰相反,这意味着您可以更好地控制代码的每个部分 运行,而无需暴露操作本身和做出反应的代码Observable 传播这些知识。

这是使用 RxJava 中的 Schedulers 机制以及 subscribeOn()/observeOn() 运算符完成的:

Observable.fromCallable(() -> doSomeReallyLongNetworkRequest())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(data -> {
        showTheDataOnTheUI(data);
    });

现在你告诉 Observable 在 IO Schdeuler 上执行订阅代码 (doSomeReallyLongNetworkRequest()) 到 运行,这将为网络请求创建一个专用线程,在另一个在这方面,您告诉 Observable 在主线程通知有关排放量 (onNext()) Subscriber (showTheDataOnTheUI(data))(对于 android 细节感到抱歉)。

通过这种方法,您将拥有非常强大的机制来确定这两个操作将在何处以及如何工作以及将在何处触发通知,并且非常容易在不同线程之间进行乒乓球,这种强大的功能来自于异步 API,加上线程抽象到专用运算符和调度程序机制。

更新:进一步说明:

Client code treats all of its interactions with Observables as asynchronous

此处的客户端代码是指与 Observable 交互的任何代码,简单的示例是 Subscriber,它是 Observable 的客户端,因为 Observable 的可组合性质=14=],你可以从一些 API 得到一个 Observable 而不知道它是如何运作的:

Observable.fromCallable(() -> doSomeReallyLongNetworkRequest())

可以用某些服务 API 封装为 Observable<Data>,并且当订阅者与其交互时,使用 Observable 事件 onNext、onError、onComplete 以异步方式发生。

whether your underlying implementation is blocking or non-blocking and however you choose to implement it.

"underlying implementation" 指的是 Observable 做的操作,它可以像我的网络调用示例那样阻塞工作,但它也可以是来自 UI 的通知(点击事件),或者更新发生在一些外部模块,所有这些都是非阻塞实现,并且再次因为它的异步 API 性质 Subscribe 不应该关心,它只需要对通知做出反应而不用担心在哪里以及实现(Observable body)将如何行动。