保持 X 个请求 运行 并行,直到 N 个请求完成

Keep X requests running in parallel until N requests completed

我无法找到适合给定用例的结构。

我有一个函数,可以为给定的输入字符串生成 Single<String>。这个 Single 是多个任务的组合,预计最多需要半分钟,但我认为这与问题无关。

我试图实现的行为是让其中 5 个任务同时处于活动状态,同时完成 N 个任务。由于这些任务不仅在时间上很昂贵,因此不应启动会在以后丢弃的多余任务。以某种方式合并这些任务的结果并不重要,我只是将每个结果保存在 onSuccess 中。

所以基本上我的计划是有一个从 0 开始的计数器变量。然后,在一个循环中,生成 5 Single,将计数器增加 5。订阅它们,onComplete 增加计数器和 onTerminate 如果 counter < N,开始另一个 Single

然而,最后,我缺乏一个很好的方法来跟踪递归生成的Single个对象,但我需要等到N个完成。

是否有可用的构造可以使这变得容易,或者我应该尝试使用计数器或 Disposable 的列表来实现这一点?

flatMap() 运算符有一个附加参数,即它维护的同时订阅数。这是一个测试样本:

@Test
public void testStuff() throws Exception {
    Single<String> source;
    PublishSubject<String> src = PublishSubject.create();
    src
            .flatMap( s -> doStuff( s ).toObservable(), 5 )
            .subscribe( v -> { // process v
            } );
}

Single<String> doStuff( String subject ) {
    return Single.just( subject );
}

如果 doStuff() returns 一个 Single,你必须将它转换成一个 Observable,因为那是 flatMap() 的工作方式。