RxJava 调度器——线程行为和饥饿?
RxJava Schedulers- Thread Behavior and Starvation?
我对 RxJava 的 observeOn()
和 subscribeOn()
有双重看法。我知道他们不会在单个流上并行排放。换句话说,单个排放流只会放在一个线程上,对吗?我下面的测试似乎表明了这一点。我的理解也是您必须 flatMap()
一个调度程序,如 .flatMap(v -> Observable.just(v).subscribeOn(Schedulers.computation()))
中那样,以并行化单个流上的排放。
此外,如果是这种情况,那么调度程序是否会发生线程饥饿?如果我的计算调度程序有 5 个线程,但我有超过 5 个 long-运行 异步流正在处理,是否有可能发生饥饿?或者这不太可能只是因为 RxJava 的性质?
public class Test {
public static void main(String[] args) {
Observable<String> airports = Observable.just("ABQ", "HOU",
"PHX", "DAL", "DFW", "AUS","SAN","LAX","JFK");
airports.subscribeOn(Schedulers.io()).map(Test::stall)
.subscribe(s -> System.out.println("Sub1 " + s +
" " + Thread.currentThread().getName()));
airports.subscribeOn(Schedulers.io()).map(Test::stall)
.subscribe(s -> System.out.println("Sub2 " + s +
" " + Thread.currentThread().getName()));
sleep();
}
private static String stall(String str) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return str;
}
private static void sleep() {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
对于 flatMap
,一个异步源可能被其他异步源淹没,它无法在自己的源上取得进展。然而,在实践中,由于 OS 和 JVM 打嗝提供了足够的喘息空间,以及 flatMap
本身的背压和仲裁,我还没有看到这种情况发生。如果您担心这种压力过大,可以使用带有 flatMap
重载的 maxConcurrent
参数并限制并发订阅数。
RxJava 主要是以非阻塞方式编写的,因此当需要合并或组合源时,它们实际上并不相互等待。
计算调度器是一个单线程执行器池,并以循环方式分配给调用者。我不知道标准执行者的公平性。
我对 RxJava 的 observeOn()
和 subscribeOn()
有双重看法。我知道他们不会在单个流上并行排放。换句话说,单个排放流只会放在一个线程上,对吗?我下面的测试似乎表明了这一点。我的理解也是您必须 flatMap()
一个调度程序,如 .flatMap(v -> Observable.just(v).subscribeOn(Schedulers.computation()))
中那样,以并行化单个流上的排放。
此外,如果是这种情况,那么调度程序是否会发生线程饥饿?如果我的计算调度程序有 5 个线程,但我有超过 5 个 long-运行 异步流正在处理,是否有可能发生饥饿?或者这不太可能只是因为 RxJava 的性质?
public class Test {
public static void main(String[] args) {
Observable<String> airports = Observable.just("ABQ", "HOU",
"PHX", "DAL", "DFW", "AUS","SAN","LAX","JFK");
airports.subscribeOn(Schedulers.io()).map(Test::stall)
.subscribe(s -> System.out.println("Sub1 " + s +
" " + Thread.currentThread().getName()));
airports.subscribeOn(Schedulers.io()).map(Test::stall)
.subscribe(s -> System.out.println("Sub2 " + s +
" " + Thread.currentThread().getName()));
sleep();
}
private static String stall(String str) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return str;
}
private static void sleep() {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
对于 flatMap
,一个异步源可能被其他异步源淹没,它无法在自己的源上取得进展。然而,在实践中,由于 OS 和 JVM 打嗝提供了足够的喘息空间,以及 flatMap
本身的背压和仲裁,我还没有看到这种情况发生。如果您担心这种压力过大,可以使用带有 flatMap
重载的 maxConcurrent
参数并限制并发订阅数。
RxJava 主要是以非阻塞方式编写的,因此当需要合并或组合源时,它们实际上并不相互等待。
计算调度器是一个单线程执行器池,并以循环方式分配给调用者。我不知道标准执行者的公平性。