如何在 RxJava merge() 中限制活动流的数量?
How to limit the number of active streams in RxJava merge()?
我正在寻找一种将多个事件流与 merge() 运算符组合在一起的方法,并且数量有限 'active streams'。例如,当我合并 6 个流(它们都在不同的线程上工作)时,merge() 将订阅所有这些流,但我需要它首先订阅 3 个,当其中一个完成时应该订阅另一个直到所有 6 个流都完成。是否可以通过 merge 或任何其他 RxJava 运算符实现此目的,还是我应该自己编写?
您可以在合并运算符中使用值 maxConcurrency,但您必须将所有 Observable 包装在一个中。
@Test
public void testMergeMaxConcurrency() {
Observable.merge(Observable.just(
Observable.just(3),
Observable.just(5),
Observable.just(1),
Observable.just(4),
Observable.just(2)), 2)
.collect(ArrayList<Integer>::new, ArrayList::add)
.doOnNext(Collections::sort)
.subscribe(System.out::println);
}
我正在寻找一种将多个事件流与 merge() 运算符组合在一起的方法,并且数量有限 'active streams'。例如,当我合并 6 个流(它们都在不同的线程上工作)时,merge() 将订阅所有这些流,但我需要它首先订阅 3 个,当其中一个完成时应该订阅另一个直到所有 6 个流都完成。是否可以通过 merge 或任何其他 RxJava 运算符实现此目的,还是我应该自己编写?
您可以在合并运算符中使用值 maxConcurrency,但您必须将所有 Observable 包装在一个中。
@Test
public void testMergeMaxConcurrency() {
Observable.merge(Observable.just(
Observable.just(3),
Observable.just(5),
Observable.just(1),
Observable.just(4),
Observable.just(2)), 2)
.collect(ArrayList<Integer>::new, ArrayList::add)
.doOnNext(Collections::sort)
.subscribe(System.out::println);
}