RxJava和观察者代码的并行执行
RxJava and parallel execution of observer code
我正在使用 RxJava Observable 编写以下代码 api :
Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
observable
.buffer(10000)
.observeOn(Schedulers.computation())
.subscribe(recordInfo -> {
_logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
for(Info info : recordInfo) {
// some I/O operation logic
}
},
exception -> {
},
() -> {
});
我的期望是观察代码,即 subscribe() 方法中的代码将在我指定计算调度程序后并行执行。相反,代码仍在单线程上按顺序执行。如何使用 RxJava api.
并行编写代码 运行
RxJava 在谈到它的 asynchronous/multithreaded 方面时经常被误解。多线程操作的编码简单,理解抽象又是另一回事
关于 RxJava 的一个常见问题是如何实现并行化,或者从一个 Observable 并发发射多个项目。当然,这个定义打破了 Observable Contract,它规定 onNext() 必须按顺序调用,并且一次不能同时被多个线程调用。
要实现并行性,您需要多个 Observable。
这在单线程中运行:
Observable<Integer> vals = Observable.range(1,10);
vals.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.subscribe(val -> System.out.println("Subscriber received "
+ val + " on "
+ Thread.currentThread().getName()));
这在多个线程中运行:
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
为此,您必须指定 subscribeOn(Schedulers.computation())
而不是 observeOn(Schedulers.computation())
。
在 subscribeOn
中,您声明要在哪个线程中发出您的值。
在 observeOn
中声明要在哪个线程中处理并观察它们。
这仍然是相同的顺序。即使在新线程上
Observable<Integer> ob3 = Observable.range(1, 5);
ob3.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer pArg0) {
return Observable.just(pArg0);
}
}).subscribeOn(Schedulers.newThread()).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer pArg0) {
try {
Thread.sleep(1000 - (pArg0 * 100));
System.out.println(pArg0 + " ccc " + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
return pArg0;
}
}).subscribe();
输出
1 ccc RxNewThreadScheduler-1
2 ccc RxNewThreadScheduler-1
3 ccc RxNewThreadScheduler-1
4 ccc RxNewThreadScheduler-1
5 ccc RxNewThreadScheduler-1
使用flatMap
并指定订阅Schedulers.computation()
即可实现并发
这里有一个更实际的例子,使用Callable
,从输出中,我们可以看到大约需要2000毫秒才能完成所有任务。
static class MyCallable implements Callable<Integer> {
private static final Object CALLABLE_COUNT_LOCK = new Object();
private static int callableCount;
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
synchronized (CALLABLE_COUNT_LOCK) {
return callableCount++;
}
}
public static int getCallableCount() {
synchronized (CALLABLE_COUNT_LOCK) {
return callableCount;
}
}
}
private static void runMyCallableConcurrentlyWithRxJava() {
long startTimeMillis = System.currentTimeMillis();
final Semaphore semaphore = new Semaphore(1);
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable())
.flatMap(new Function<MyCallable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull MyCallable myCallable) throws Exception {
return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation());
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
System.out.println("onNext " + o);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
if (MyCallable.getCallableCount() >= 4) {
semaphore.release();
}
}
});
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
System.out.println("durationMillis " + (System.currentTimeMillis()-startTimeMillis));
}
RxJava 2.0.5 引入了parallel flows and ParallelFlowable,这使得并行执行更简单,更具声明性。
您不再需要在 flatMap
中创建 Observable
/Flowable
,您只需在 Flowable
上调用 parallel()
即可 returns ParallelFlowable
.
它不像常规 Flowable
那样功能丰富,因为并发会引发 Rx 契约的许多问题,但是您有基本的 map()
、filter()
等等,这应该是在大多数情况下足够了。
所以不是来自@LordRaydenMK 的这个流程回答:
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
现在你可以做:
Flowable<Integer> vals = Flowable.range(1, 10);
vals.parallel()
.runOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.sequential()
.subscribe(val -> System.out.println(val));
我正在使用 RxJava Observable 编写以下代码 api :
Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
observable
.buffer(10000)
.observeOn(Schedulers.computation())
.subscribe(recordInfo -> {
_logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
for(Info info : recordInfo) {
// some I/O operation logic
}
},
exception -> {
},
() -> {
});
我的期望是观察代码,即 subscribe() 方法中的代码将在我指定计算调度程序后并行执行。相反,代码仍在单线程上按顺序执行。如何使用 RxJava api.
并行编写代码 运行RxJava 在谈到它的 asynchronous/multithreaded 方面时经常被误解。多线程操作的编码简单,理解抽象又是另一回事
关于 RxJava 的一个常见问题是如何实现并行化,或者从一个 Observable 并发发射多个项目。当然,这个定义打破了 Observable Contract,它规定 onNext() 必须按顺序调用,并且一次不能同时被多个线程调用。
要实现并行性,您需要多个 Observable。
这在单线程中运行:
Observable<Integer> vals = Observable.range(1,10);
vals.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.subscribe(val -> System.out.println("Subscriber received "
+ val + " on "
+ Thread.currentThread().getName()));
这在多个线程中运行:
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
为此,您必须指定 subscribeOn(Schedulers.computation())
而不是 observeOn(Schedulers.computation())
。
在 subscribeOn
中,您声明要在哪个线程中发出您的值。
在 observeOn
中声明要在哪个线程中处理并观察它们。
这仍然是相同的顺序。即使在新线程上
Observable<Integer> ob3 = Observable.range(1, 5);
ob3.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer pArg0) {
return Observable.just(pArg0);
}
}).subscribeOn(Schedulers.newThread()).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer pArg0) {
try {
Thread.sleep(1000 - (pArg0 * 100));
System.out.println(pArg0 + " ccc " + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
return pArg0;
}
}).subscribe();
输出
1 ccc RxNewThreadScheduler-1
2 ccc RxNewThreadScheduler-1
3 ccc RxNewThreadScheduler-1
4 ccc RxNewThreadScheduler-1
5 ccc RxNewThreadScheduler-1
使用flatMap
并指定订阅Schedulers.computation()
即可实现并发
这里有一个更实际的例子,使用Callable
,从输出中,我们可以看到大约需要2000毫秒才能完成所有任务。
static class MyCallable implements Callable<Integer> {
private static final Object CALLABLE_COUNT_LOCK = new Object();
private static int callableCount;
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
synchronized (CALLABLE_COUNT_LOCK) {
return callableCount++;
}
}
public static int getCallableCount() {
synchronized (CALLABLE_COUNT_LOCK) {
return callableCount;
}
}
}
private static void runMyCallableConcurrentlyWithRxJava() {
long startTimeMillis = System.currentTimeMillis();
final Semaphore semaphore = new Semaphore(1);
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable())
.flatMap(new Function<MyCallable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull MyCallable myCallable) throws Exception {
return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation());
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
System.out.println("onNext " + o);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
if (MyCallable.getCallableCount() >= 4) {
semaphore.release();
}
}
});
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
System.out.println("durationMillis " + (System.currentTimeMillis()-startTimeMillis));
}
RxJava 2.0.5 引入了parallel flows and ParallelFlowable,这使得并行执行更简单,更具声明性。
您不再需要在 flatMap
中创建 Observable
/Flowable
,您只需在 Flowable
上调用 parallel()
即可 returns ParallelFlowable
.
它不像常规 Flowable
那样功能丰富,因为并发会引发 Rx 契约的许多问题,但是您有基本的 map()
、filter()
等等,这应该是在大多数情况下足够了。
所以不是来自@LordRaydenMK 的这个流程回答:
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
现在你可以做:
Flowable<Integer> vals = Flowable.range(1, 10);
vals.parallel()
.runOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.sequential()
.subscribe(val -> System.out.println(val));