保持 X 个请求 运行 并行,直到 N 个请求完成
Keep X requests running in parallel until N requests completed
我无法找到适合给定用例的结构。
我有一个函数,可以为给定的输入字符串生成 Single<String>
。这个 Single
是多个任务的组合,预计最多需要半分钟,但我认为这与问题无关。
我试图实现的行为是让其中 5 个任务同时处于活动状态,同时完成 N 个任务。由于这些任务不仅在时间上很昂贵,因此不应启动会在以后丢弃的多余任务。以某种方式合并这些任务的结果并不重要,我只是将每个结果保存在 onSuccess
中。
所以基本上我的计划是有一个从 0 开始的计数器变量。然后,在一个循环中,生成 5 Single
,将计数器增加 5。订阅它们,onComplete
增加计数器和 onTerminate
如果 counter
< N,开始另一个 Single
。
然而,最后,我缺乏一个很好的方法来跟踪递归生成的Single
个对象,但我需要等到N个完成。
是否有可用的构造可以使这变得容易,或者我应该尝试使用计数器或 Disposable
的列表来实现这一点?
flatMap()
运算符有一个附加参数,即它维护的同时订阅数。这是一个测试样本:
@Test
public void testStuff() throws Exception {
Single<String> source;
PublishSubject<String> src = PublishSubject.create();
src
.flatMap( s -> doStuff( s ).toObservable(), 5 )
.subscribe( v -> { // process v
} );
}
Single<String> doStuff( String subject ) {
return Single.just( subject );
}
如果 doStuff()
returns 一个 Single
,你必须将它转换成一个 Observable
,因为那是 flatMap()
的工作方式。
我无法找到适合给定用例的结构。
我有一个函数,可以为给定的输入字符串生成 Single<String>
。这个 Single
是多个任务的组合,预计最多需要半分钟,但我认为这与问题无关。
我试图实现的行为是让其中 5 个任务同时处于活动状态,同时完成 N 个任务。由于这些任务不仅在时间上很昂贵,因此不应启动会在以后丢弃的多余任务。以某种方式合并这些任务的结果并不重要,我只是将每个结果保存在 onSuccess
中。
所以基本上我的计划是有一个从 0 开始的计数器变量。然后,在一个循环中,生成 5 Single
,将计数器增加 5。订阅它们,onComplete
增加计数器和 onTerminate
如果 counter
< N,开始另一个 Single
。
然而,最后,我缺乏一个很好的方法来跟踪递归生成的Single
个对象,但我需要等到N个完成。
是否有可用的构造可以使这变得容易,或者我应该尝试使用计数器或 Disposable
的列表来实现这一点?
flatMap()
运算符有一个附加参数,即它维护的同时订阅数。这是一个测试样本:
@Test
public void testStuff() throws Exception {
Single<String> source;
PublishSubject<String> src = PublishSubject.create();
src
.flatMap( s -> doStuff( s ).toObservable(), 5 )
.subscribe( v -> { // process v
} );
}
Single<String> doStuff( String subject ) {
return Single.just( subject );
}
如果 doStuff()
returns 一个 Single
,你必须将它转换成一个 Observable
,因为那是 flatMap()
的工作方式。