如何使多线程观察者计算率适应冷 Observable<List<T>>

How to adapt multithreaded observer-computation-rate to a cold Observable<List<T>>

我有一个冷源 Observable<List<T>> 以块(列表)的形式发射元素,我想在单独的线程中处理块中的每个项目,而发射器(源)正在等待终止处理发出的块中的所有项目以继续处理下一个,依此类推。

此代码 (rxjava 2.0.6) 执行这些操作但仅在一个线程中执行。如果我想用 observeOn(Schedulers.io) 在许多线程中分叉观察者计算,源线程会继续发出所有内容直到完成并且不会被每个块阻塞。

Observable<List<T>> lazy_source = Observable.create((ObservableEmitter<List<T>> e)
        -> {
    for (int i = 0; i < 1000; i++) {
        List<T> chunk = produceChunkOf(10);
        e.onNext(chunk);
    }

    e.onComplete();
});    
lazy_source
        .subscribeOn(Schedulers.io())
        .flatMap(chunk -> 
                Observable.fromIterable(chunk)
                    // .observeOn(Schedulers.io()) // Uncommenting this will flat all 1000 chunks at once.
                    .doOnNext(item -> consume(item))
                , 10) // Number of concurent Threads
        .subscribe();

如有任何帮助,我将不胜感激。

这样的事情怎么样:

 Observable.range(0, 1000)
            .concatMap(new Func1<Integer, Observable<?>>() {
                @Override
                public Observable<?> call(Integer integer) {
                    return produceChunkOf(10)
                            .flatMap(new Func1<Object, Observable<?>>() {
                                @Override
                                public Observable<?> call(Object item) {
                                    return consume(item)
                                            .observeOn(Schedulers.io());
                                }
                            }, 10)
                            .toList();
                }
            });

首先,您创建一个 Observable 将输入发送到 produceChunkOf,然后对于每个输入项,您 concatMap 根据您对每个块的顺序执行的要求,对于您生成块的每个输入,并与 flatMap 并行处理它,然后在使用 toList()

处理所有项目后收集它

这是最终版本(没有开销):

Observable.range(0, 1_000_000)
        .subscribeOn(Schedulers.io())
        .concatMap(i -> produceChunkOf(100) // this returs an Observable of 100 items
                .flatMap(item -> Observable
                        .just(item)
                        .observeOn(Schedulers.io())
                        .doOnNext(element -> consume(element)), 
                        50)) // Number of concurent Threads
        .subscribe();