如何使多线程观察者计算率适应冷 Observable<List<T>>
How to adapt multithreaded observer-computation-rate to a cold Observable<List<T>>
我有一个冷源 Observable<List<T>>
以块(列表)的形式发射元素,我想在单独的线程中处理块中的每个项目,而发射器(源)正在等待终止处理发出的块中的所有项目以继续处理下一个,依此类推。
此代码 (rxjava 2.0.6) 执行这些操作但仅在一个线程中执行。如果我想用 observeOn(Schedulers.io)
在许多线程中分叉观察者计算,源线程会继续发出所有内容直到完成并且不会被每个块阻塞。
Observable<List<T>> lazy_source = Observable.create((ObservableEmitter<List<T>> e)
-> {
for (int i = 0; i < 1000; i++) {
List<T> chunk = produceChunkOf(10);
e.onNext(chunk);
}
e.onComplete();
});
lazy_source
.subscribeOn(Schedulers.io())
.flatMap(chunk ->
Observable.fromIterable(chunk)
// .observeOn(Schedulers.io()) // Uncommenting this will flat all 1000 chunks at once.
.doOnNext(item -> consume(item))
, 10) // Number of concurent Threads
.subscribe();
如有任何帮助,我将不胜感激。
这样的事情怎么样:
Observable.range(0, 1000)
.concatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
return produceChunkOf(10)
.flatMap(new Func1<Object, Observable<?>>() {
@Override
public Observable<?> call(Object item) {
return consume(item)
.observeOn(Schedulers.io());
}
}, 10)
.toList();
}
});
首先,您创建一个 Observable
将输入发送到 produceChunkOf
,然后对于每个输入项,您 concatMap
根据您对每个块的顺序执行的要求,对于您生成块的每个输入,并与 flatMap
并行处理它,然后在使用 toList()
处理所有项目后收集它
这是最终版本(没有开销):
Observable.range(0, 1_000_000)
.subscribeOn(Schedulers.io())
.concatMap(i -> produceChunkOf(100) // this returs an Observable of 100 items
.flatMap(item -> Observable
.just(item)
.observeOn(Schedulers.io())
.doOnNext(element -> consume(element)),
50)) // Number of concurent Threads
.subscribe();
我有一个冷源 Observable<List<T>>
以块(列表)的形式发射元素,我想在单独的线程中处理块中的每个项目,而发射器(源)正在等待终止处理发出的块中的所有项目以继续处理下一个,依此类推。
此代码 (rxjava 2.0.6) 执行这些操作但仅在一个线程中执行。如果我想用 observeOn(Schedulers.io)
在许多线程中分叉观察者计算,源线程会继续发出所有内容直到完成并且不会被每个块阻塞。
Observable<List<T>> lazy_source = Observable.create((ObservableEmitter<List<T>> e)
-> {
for (int i = 0; i < 1000; i++) {
List<T> chunk = produceChunkOf(10);
e.onNext(chunk);
}
e.onComplete();
});
lazy_source
.subscribeOn(Schedulers.io())
.flatMap(chunk ->
Observable.fromIterable(chunk)
// .observeOn(Schedulers.io()) // Uncommenting this will flat all 1000 chunks at once.
.doOnNext(item -> consume(item))
, 10) // Number of concurent Threads
.subscribe();
如有任何帮助,我将不胜感激。
这样的事情怎么样:
Observable.range(0, 1000)
.concatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
return produceChunkOf(10)
.flatMap(new Func1<Object, Observable<?>>() {
@Override
public Observable<?> call(Object item) {
return consume(item)
.observeOn(Schedulers.io());
}
}, 10)
.toList();
}
});
首先,您创建一个 Observable
将输入发送到 produceChunkOf
,然后对于每个输入项,您 concatMap
根据您对每个块的顺序执行的要求,对于您生成块的每个输入,并与 flatMap
并行处理它,然后在使用 toList()
这是最终版本(没有开销):
Observable.range(0, 1_000_000)
.subscribeOn(Schedulers.io())
.concatMap(i -> produceChunkOf(100) // this returs an Observable of 100 items
.flatMap(item -> Observable
.just(item)
.observeOn(Schedulers.io())
.doOnNext(element -> consume(element)),
50)) // Number of concurent Threads
.subscribe();