为什么并行执行不会发生在多个 RXJava Observable 上?

Why Parallel execution not happening for multiple RXJava Observables?

我是 RxJava 的新手,正在尝试为来自 link 的多个 Observable 执行一个并行执行示例: RxJava Fetching Observables In Parallel

虽然上面提供的例子link是并行执行observables,但是当我在forEach方法中添加一个Thread.sleep(TIME_IN_MILLISECONDS)然后系统开始执行一个一次可观。请帮助我理解为什么 Thread.sleep 正在停止 Observables 的并行执行。

下面是导致多个可观察对象同步执行的修改示例:

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecution {

    public static void main(String[] args) {
        System.out.println("------------ mergingAsync");
        mergingAsync();
    }

    private static void mergingAsync() {
        Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking()
        .forEach(x -> { try{Thread.sleep(4000);}catch(Exception ex){}; 
        System.out.println(x + " " + Thread.currentThread().getId());});
    }

    // artificial representations of IO work
    static Observable<Integer> getDataAsync(int i) {
        return getDataSync(i).subscribeOn(Schedulers.io());
    }

    static Observable<Integer> getDataSync(int i) {
        return Observable.create((Subscriber<? super Integer> s) -> {
            // simulate latency
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            s.onNext(i);
            s.onCompleted();
        });
    }
}

在上面的例子中,我们使用了 observable 的 subscribeOn 方法并提供了一个 ThreadPool(Schedules.io) 来执行,所以每个 Observable 的订阅将发生在单独的线程上。

有可能 Thread.sleep 正在锁定线程之间的任何共享对象,但我仍然不清楚。请帮忙

实际上,您的示例并行执行正在发生,您只是看错了,执行工作的位置和发出通知的位置之间存在差异。

如果你将线程 id 放在 Observable.create 的日志中,你会注意到每个 Observable 在不同的线程同时执行。但通知是连续发生的。作为 Observable 契约的一部分,这种行为是预期的,因为 Observable 必须串行(而非并行)向观察者发出通知。