我们可以在调度程序上 运行 的 Observable 数量是否有限制?

Is there a limit on number of Observables we can run on Schedulers?

我正在尝试查看是否可以在 io()computation() 调度器上生成 100 万个 Observables

 public static void observableLimit() {
        sum = 0;
        long lowerBound = 0;
        long higherBound = 1000;
        Flowable.fromCallable(() -> {
            Flowable.rangeLong(lowerBound, higherBound + 1)
                    .subscribe(integer -> Observable.just(integer)
                            .subscribeOn(Schedulers.io())
                            .subscribe(j -> {
                                printNum(j);
                                sum = sum + j;
                            }));
            return true;
        }).blockingSubscribe(aBoolean -> {
            long actualSum = (higherBound * (higherBound + 1)) / 2;
            System.out.println("");
            System.out.println("SUM: " + sum);
            Assert.assertEquals(actualSum, sum);
        });
    }

对于higherBound = 100,它大部分时间都有效,对于1000,它有时有效但大部分时间都失败,对于10000 它几乎每次都会失败,如果我在 newThread() 上告诉它 运行 它并且如果我根本不使用 subscribeOn() 它就会起作用。

我该如何解决这个问题?

How can I fix this behaviour?

不要使用那个模式。你为什么要这样做?

ionewThread 创建 OS 个线程,并且从根本上受到您的 OS 能力和可用内存的限制。

computation 有一组固定的线程,可以处理更多的 Flowable,因为它们被分配给一个现有的工作线程。

您面临的问题不是 Observables 的某些限制,而是您的代码问题。你 blockingSubscribe 到一个与跨越所有其他线程的 Flowable 没有关系的 Flowable。对于 higherBound 的小值,您会看到代码有效,而对于大值则不起作用,这是因为对于小的 higherBound ,外部 Flowable 可能与内部 Flowable 一样快,但对于高值则崩溃得更快higherBound.

的值

我想说的是,为了看到正确的结果,您需要与跨越所有其他线程而不是外部线程的 Flowable 同步。我还将用线程安全实现 LongAdder sum 替换 long sum,您可以使用 flatMap 运算符实现此目的。

Flowable.rangeLong(lowerBound, higherBound + 1)
         .flatMap(t -> Flowable.just(t)
                 .subscribeOn(Schedulers.io())
         )
         .doOnNext(sum::add)
         .doOnComplete(() -> {
             long actualSum = (higherBound * (higherBound + 1)) / 2;
             log("SUM: " + sum.longValue() + ", ACTUAL: " + actualSum);
             log("Equals: " + (actualSum == sum.longValue()));
         })
         .blockingSubscribe();