observeOn 的放置给了我不同的行为

Placement of observeOn gives me different behavior

当我 运行 以下片段时,我没有看到背压。

public static void main(String[] args) throws InterruptedException {

    MyFileProcessor pro = new MyFileProcessor();
    Timer t = new Timer();
    t.start();
    Disposable x = pro
            .generateFlowable(
                    new File("path\to\file.raw"))
            .subscribeOn(Schedulers.io(), false).observeOn(Schedulers.io()).map(y -> {
                System.out.println(Thread.currentThread().getName() + " xxx");

                return y;
            })

            .subscribe(onNext -> {
                System.out.println(Thread.currentThread().getName() + " " + new String(onNext));
                Thread.sleep(100);
            }, Throwable::printStackTrace, () -> {
                System.out.println("Done");
                t.end();
                System.out.println(t.getTotalTime());
            });

    Thread.sleep(1000000000);

}  

当我 运行 上面的 class 时,我得到

的交替行

RxCachedThreadScheduler-1 xxx
RxCachedThreadScheduler-1 Line1
....

它使用相同的线程。

现在,当我将 observeOn 移动到订阅之前时,我看到了一堆 RxCachedThreadScheduler-1 xxx
后面跟着一堆 RxCachedThreadScheduler-1 Line1

我假设这是背压,但使用的线程仍然相同。

为什么我会看到这种行为?
为什么只使用了一个线程?
没有可供 observeOn 运行的运算符,那么为什么我会看到这种行为?

[编辑]

public Flowable<byte[]> generateFlowable(File file) {
    return Flowable.generate(() -> new BufferedInputStream(new FileInputStream(file)), (bufferedIs, output) -> {
        try {
            byte[] data = getMessageRawData(bufferedIs);
            if (data != null)
                output.onNext(data);
            else
                output.onComplete();

        }
        catch (Exception e) {
            output.onError(e);
        }
        return bufferedIs;
    }, bufferedIs -> {
        try {
            bufferedIs.close();
        }
        catch (IOException ex) {
            RxJavaPlugins.onError(ex);
        }
    });

}  

Why is only one thread being utilized?

工作正常,因为您检查了 运行 线程 after observeOn,因此您应该在此处和下方看到相同的线程,无论如何发生在它上面。 subscribeOn 影响 generateFlowable,我想,您不会打印当前线程,因此您看不到它在不同的 IO 线程上运行。

Now when I move the observeOn to just before the subscribe

除非 generateFlowable.

中发生奇怪的事情,否则应该没有任何区别