RxJava 调度器——线程行为和饥饿?

RxJava Schedulers- Thread Behavior and Starvation?

我对 RxJava 的 observeOn()subscribeOn() 有双重看法。我知道他们不会在单个流上并行排放。换句话说,单个排放流只会放在一个线程上,对吗?我下面的测试似乎表明了这一点。我的理解也是您必须 flatMap() 一个调度程序,如 .flatMap(v -> Observable.just(v).subscribeOn(Schedulers.computation())) 中那样,以并行化单个流上的排放。

此外,如果是这种情况,那么调度程序是否会发生线程饥饿?如果我的计算调度程序有 5 个线程,但我有超过 5 个 long-运行 异步流正在处理,是否有可能发生饥饿?或者这不太可能只是因为 RxJava 的性质?

public class Test {
    public static void main(String[] args) {


        Observable<String> airports = Observable.just("ABQ", "HOU", 
            "PHX", "DAL", "DFW", "AUS","SAN","LAX","JFK");


        airports.subscribeOn(Schedulers.io()).map(Test::stall)
        .subscribe(s -> System.out.println("Sub1 " + s +
                " " + Thread.currentThread().getName()));

        airports.subscribeOn(Schedulers.io()).map(Test::stall)
        .subscribe(s -> System.out.println("Sub2 " + s +
                " " + Thread.currentThread().getName()));

        sleep();
    }

    private static String stall(String str) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return str;
    }

    private static void sleep() {
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

对于 flatMap,一个异步源可能被其他异步源淹没,它无法在自己的源上取得进展。然而,在实践中,由于 OS 和 JVM 打嗝提供了足够的喘息空间,以及 flatMap 本身的背压和仲裁,我还没有看到这种情况发生。如果您担心这种压力过大,可以使用带有 flatMap 重载的 maxConcurrent 参数并限制并发订阅数。

RxJava 主要是以非阻塞方式编写的,因此当需要合并或组合源时,它们实际上并不相互等待。

计算调度器是一个单线程执行器池,并以循环方式分配给调用者。我不知道标准执行者的公平性。