我们可以在调度程序上 运行 的 Observable 数量是否有限制?
Is there a limit on number of Observables we can run on Schedulers?
我正在尝试查看是否可以在 io()
和 computation()
调度器上生成 100 万个 Observables
。
public static void observableLimit() {
sum = 0;
long lowerBound = 0;
long higherBound = 1000;
Flowable.fromCallable(() -> {
Flowable.rangeLong(lowerBound, higherBound + 1)
.subscribe(integer -> Observable.just(integer)
.subscribeOn(Schedulers.io())
.subscribe(j -> {
printNum(j);
sum = sum + j;
}));
return true;
}).blockingSubscribe(aBoolean -> {
long actualSum = (higherBound * (higherBound + 1)) / 2;
System.out.println("");
System.out.println("SUM: " + sum);
Assert.assertEquals(actualSum, sum);
});
}
对于higherBound = 100
,它大部分时间都有效,对于1000,它有时有效但大部分时间都失败,对于10000 它几乎每次都会失败,如果我在 newThread()
上告诉它 运行 它并且如果我根本不使用 subscribeOn()
它就会起作用。
我该如何解决这个问题?
How can I fix this behaviour?
不要使用那个模式。你为什么要这样做?
io
和 newThread
创建 OS 个线程,并且从根本上受到您的 OS 能力和可用内存的限制。
computation
有一组固定的线程,可以处理更多的 Flowable
,因为它们被分配给一个现有的工作线程。
您面临的问题不是 Observables 的某些限制,而是您的代码问题。你 blockingSubscribe
到一个与跨越所有其他线程的 Flowable 没有关系的 Flowable。对于 higherBound
的小值,您会看到代码有效,而对于大值则不起作用,这是因为对于小的 higherBound
,外部 Flowable 可能与内部 Flowable 一样快,但对于高值则崩溃得更快higherBound
.
的值
我想说的是,为了看到正确的结果,您需要与跨越所有其他线程而不是外部线程的 Flowable 同步。我还将用线程安全实现 LongAdder sum
替换 long sum
,您可以使用 flatMap
运算符实现此目的。
Flowable.rangeLong(lowerBound, higherBound + 1)
.flatMap(t -> Flowable.just(t)
.subscribeOn(Schedulers.io())
)
.doOnNext(sum::add)
.doOnComplete(() -> {
long actualSum = (higherBound * (higherBound + 1)) / 2;
log("SUM: " + sum.longValue() + ", ACTUAL: " + actualSum);
log("Equals: " + (actualSum == sum.longValue()));
})
.blockingSubscribe();
我正在尝试查看是否可以在 io()
和 computation()
调度器上生成 100 万个 Observables
。
public static void observableLimit() {
sum = 0;
long lowerBound = 0;
long higherBound = 1000;
Flowable.fromCallable(() -> {
Flowable.rangeLong(lowerBound, higherBound + 1)
.subscribe(integer -> Observable.just(integer)
.subscribeOn(Schedulers.io())
.subscribe(j -> {
printNum(j);
sum = sum + j;
}));
return true;
}).blockingSubscribe(aBoolean -> {
long actualSum = (higherBound * (higherBound + 1)) / 2;
System.out.println("");
System.out.println("SUM: " + sum);
Assert.assertEquals(actualSum, sum);
});
}
对于higherBound = 100
,它大部分时间都有效,对于1000,它有时有效但大部分时间都失败,对于10000 它几乎每次都会失败,如果我在 newThread()
上告诉它 运行 它并且如果我根本不使用 subscribeOn()
它就会起作用。
我该如何解决这个问题?
How can I fix this behaviour?
不要使用那个模式。你为什么要这样做?
io
和 newThread
创建 OS 个线程,并且从根本上受到您的 OS 能力和可用内存的限制。
computation
有一组固定的线程,可以处理更多的 Flowable
,因为它们被分配给一个现有的工作线程。
您面临的问题不是 Observables 的某些限制,而是您的代码问题。你 blockingSubscribe
到一个与跨越所有其他线程的 Flowable 没有关系的 Flowable。对于 higherBound
的小值,您会看到代码有效,而对于大值则不起作用,这是因为对于小的 higherBound
,外部 Flowable 可能与内部 Flowable 一样快,但对于高值则崩溃得更快higherBound
.
我想说的是,为了看到正确的结果,您需要与跨越所有其他线程而不是外部线程的 Flowable 同步。我还将用线程安全实现 LongAdder sum
替换 long sum
,您可以使用 flatMap
运算符实现此目的。
Flowable.rangeLong(lowerBound, higherBound + 1)
.flatMap(t -> Flowable.just(t)
.subscribeOn(Schedulers.io())
)
.doOnNext(sum::add)
.doOnComplete(() -> {
long actualSum = (higherBound * (higherBound + 1)) / 2;
log("SUM: " + sum.longValue() + ", ACTUAL: " + actualSum);
log("Equals: " + (actualSum == sum.longValue()));
})
.blockingSubscribe();