为什么并行执行不会发生在多个 RXJava Observable 上?
Why Parallel execution not happening for multiple RXJava Observables?
我是 RxJava 的新手,正在尝试为来自 link 的多个 Observable 执行一个并行执行示例:
RxJava Fetching Observables In Parallel
虽然上面提供的例子link是并行执行observables,但是当我在forEach方法中添加一个Thread.sleep(TIME_IN_MILLISECONDS)然后系统开始执行一个一次可观。请帮助我理解为什么 Thread.sleep 正在停止 Observables 的并行执行。
下面是导致多个可观察对象同步执行的修改示例:
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecution {
public static void main(String[] args) {
System.out.println("------------ mergingAsync");
mergingAsync();
}
private static void mergingAsync() {
Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking()
.forEach(x -> { try{Thread.sleep(4000);}catch(Exception ex){};
System.out.println(x + " " + Thread.currentThread().getId());});
}
// artificial representations of IO work
static Observable<Integer> getDataAsync(int i) {
return getDataSync(i).subscribeOn(Schedulers.io());
}
static Observable<Integer> getDataSync(int i) {
return Observable.create((Subscriber<? super Integer> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(i);
s.onCompleted();
});
}
}
在上面的例子中,我们使用了 observable 的 subscribeOn 方法并提供了一个 ThreadPool(Schedules.io) 来执行,所以每个 Observable 的订阅将发生在单独的线程上。
有可能 Thread.sleep 正在锁定线程之间的任何共享对象,但我仍然不清楚。请帮忙
实际上,您的示例并行执行正在发生,您只是看错了,执行工作的位置和发出通知的位置之间存在差异。
如果你将线程 id 放在 Observable.create
的日志中,你会注意到每个 Observable 在不同的线程同时执行。但通知是连续发生的。作为 Observable 契约的一部分,这种行为是预期的,因为 Observable 必须串行(而非并行)向观察者发出通知。
我是 RxJava 的新手,正在尝试为来自 link 的多个 Observable 执行一个并行执行示例: RxJava Fetching Observables In Parallel
虽然上面提供的例子link是并行执行observables,但是当我在forEach方法中添加一个Thread.sleep(TIME_IN_MILLISECONDS)然后系统开始执行一个一次可观。请帮助我理解为什么 Thread.sleep 正在停止 Observables 的并行执行。
下面是导致多个可观察对象同步执行的修改示例:
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecution {
public static void main(String[] args) {
System.out.println("------------ mergingAsync");
mergingAsync();
}
private static void mergingAsync() {
Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking()
.forEach(x -> { try{Thread.sleep(4000);}catch(Exception ex){};
System.out.println(x + " " + Thread.currentThread().getId());});
}
// artificial representations of IO work
static Observable<Integer> getDataAsync(int i) {
return getDataSync(i).subscribeOn(Schedulers.io());
}
static Observable<Integer> getDataSync(int i) {
return Observable.create((Subscriber<? super Integer> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(i);
s.onCompleted();
});
}
}
在上面的例子中,我们使用了 observable 的 subscribeOn 方法并提供了一个 ThreadPool(Schedules.io) 来执行,所以每个 Observable 的订阅将发生在单独的线程上。
有可能 Thread.sleep 正在锁定线程之间的任何共享对象,但我仍然不清楚。请帮忙
实际上,您的示例并行执行正在发生,您只是看错了,执行工作的位置和发出通知的位置之间存在差异。
如果你将线程 id 放在 Observable.create
的日志中,你会注意到每个 Observable 在不同的线程同时执行。但通知是连续发生的。作为 Observable 契约的一部分,这种行为是预期的,因为 Observable 必须串行(而非并行)向观察者发出通知。