如何在 RxJava2 中并行执行消费者?
How to parallel execute consumer in RxJava2?
我有一个关于 RxJava2 的问题。我想运行固定线程池的不同线程中的消费者并行执行列表结果。这是我的代码:
List<String> letters = Lists.newArrayList("a","b","c","d","e","f","g");
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(letters.size());
Observable.fromIterable(letters).observeOn(Schedulers.from(fixedThreadPool)).forEach(new Consumer<String>() {
@Override
public void accept(String data) throws Exception {
System.out.println(data + " forEach, thread is " + Thread.currentThread().getName());
}
});
我得到的结果是:
a forEach, thread is pool-1-thread-1
b forEach, thread is pool-1-thread-1
c forEach, thread is pool-1-thread-1
d forEach, thread is pool-1-thread-1
e forEach, thread is pool-1-thread-1
f forEach, thread is pool-1-thread-1
g forEach, thread is pool-1-thread-1
但实际上我想要的是这样的结果,每个consumor在不同的线程中并行执行:
a forEach, thread is pool-1-thread-1
b forEach, thread is pool-1-thread-2
c forEach, thread is pool-1-thread-3
d forEach, thread is pool-1-thread-4
e forEach, thread is pool-1-thread-5
f forEach, thread is pool-1-thread-6
g forEach, thread is pool-1-thread-7
有人可以告诉我如何实现吗?
为了读取并行线程中的项目,请使用 Flowable<> 而不是 Observable,因为它提供了 parallel 运算符。
例如:
Flowable.fromIterable(letters)
.parallel(letters.size())
.runOn(Schedulers.from(fixedThreadPool))
.sequential()
.forEach(data -> System.out.println(data + " forEach, thread is " +
Thread.currentThread().getName()));
由于您无法预测每次调用将使用哪个线程,因此输出可能会有所不同。在我的测试用例中,我得到了
c forEach, thread is pool-1-thread-3
g forEach, thread is pool-1-thread-7
a forEach, thread is pool-1-thread-1
e forEach, thread is pool-1-thread-5
d forEach, thread is pool-1-thread-4
b forEach, thread is pool-1-thread-2
f forEach, thread is pool-1-thread-6
我有一个关于 RxJava2 的问题。我想运行固定线程池的不同线程中的消费者并行执行列表结果。这是我的代码:
List<String> letters = Lists.newArrayList("a","b","c","d","e","f","g");
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(letters.size());
Observable.fromIterable(letters).observeOn(Schedulers.from(fixedThreadPool)).forEach(new Consumer<String>() {
@Override
public void accept(String data) throws Exception {
System.out.println(data + " forEach, thread is " + Thread.currentThread().getName());
}
});
我得到的结果是:
a forEach, thread is pool-1-thread-1
b forEach, thread is pool-1-thread-1
c forEach, thread is pool-1-thread-1
d forEach, thread is pool-1-thread-1
e forEach, thread is pool-1-thread-1
f forEach, thread is pool-1-thread-1
g forEach, thread is pool-1-thread-1
但实际上我想要的是这样的结果,每个consumor在不同的线程中并行执行:
a forEach, thread is pool-1-thread-1
b forEach, thread is pool-1-thread-2
c forEach, thread is pool-1-thread-3
d forEach, thread is pool-1-thread-4
e forEach, thread is pool-1-thread-5
f forEach, thread is pool-1-thread-6
g forEach, thread is pool-1-thread-7
有人可以告诉我如何实现吗?
为了读取并行线程中的项目,请使用 Flowable<> 而不是 Observable,因为它提供了 parallel 运算符。 例如:
Flowable.fromIterable(letters)
.parallel(letters.size())
.runOn(Schedulers.from(fixedThreadPool))
.sequential()
.forEach(data -> System.out.println(data + " forEach, thread is " +
Thread.currentThread().getName()));
由于您无法预测每次调用将使用哪个线程,因此输出可能会有所不同。在我的测试用例中,我得到了
c forEach, thread is pool-1-thread-3
g forEach, thread is pool-1-thread-7
a forEach, thread is pool-1-thread-1
e forEach, thread is pool-1-thread-5
d forEach, thread is pool-1-thread-4
b forEach, thread is pool-1-thread-2
f forEach, thread is pool-1-thread-6