按并发任务数(不是基于时间的)限制异步 Mono
Throttle asynchronous Mono by number of concurrent tasks (not time-based)
假设我有一个方法,它接受一个参数和 returns 一个异步完成的 Mono<Integer>
。例如:
Random random = new Random();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);
Mono<Integer> fetch(String a) {
return Mono.create(em -> {
scheduledExecutorService.schedule(() -> em.next(a + " result"),
10 + random.nextInt(50), TimeUnit.MILLISECONDS);
});
}
假设我有一个 Flux<String>
可以输入上面的 fetch
方法并且它可以有很多元素。
有没有一种方法可以确保并行调用该方法,但将并发调用的数量限制为预定义的数量?
例如4 在上面的例子中,虽然我有 16 个可用线程 - 所以从这个角度来看我总是保留 12 个备用。
假设 "feed into",你的意思是你正在使用 flux.flatMap(this::fetch)
,
那么您可以通过调用 flux.flatMap(this::fetch, 4)
来设置 flatMap 并发性。
此外,您的代码有两个编译错误:
- return 类型的提取
Mono<Integer>
与您提供给接收器的项目类型不匹配 (a + " result"
)。我假设你的意思是 Mono<String>
- MonoSink 没有
.next
方法。我假设你的意思是 .success
考虑到所有这些,这里有一个例子:
private Flux<String> fetchAll() {
return Flux.range(0, 50)
.map(i -> Integer.toString(i))
.flatMap(this::fetch, 4);
}
private Mono<String> fetch(String a) {
return Mono.create(em ->
scheduledExecutorService.schedule(() -> em.success(a + " result"),
10 + random.nextInt(50), TimeUnit.MILLISECONDS)
);
}
假设我有一个方法,它接受一个参数和 returns 一个异步完成的 Mono<Integer>
。例如:
Random random = new Random();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);
Mono<Integer> fetch(String a) {
return Mono.create(em -> {
scheduledExecutorService.schedule(() -> em.next(a + " result"),
10 + random.nextInt(50), TimeUnit.MILLISECONDS);
});
}
假设我有一个 Flux<String>
可以输入上面的 fetch
方法并且它可以有很多元素。
有没有一种方法可以确保并行调用该方法,但将并发调用的数量限制为预定义的数量?
例如4 在上面的例子中,虽然我有 16 个可用线程 - 所以从这个角度来看我总是保留 12 个备用。
假设 "feed into",你的意思是你正在使用 flux.flatMap(this::fetch)
,
那么您可以通过调用 flux.flatMap(this::fetch, 4)
来设置 flatMap 并发性。
此外,您的代码有两个编译错误:
- return 类型的提取
Mono<Integer>
与您提供给接收器的项目类型不匹配 (a + " result"
)。我假设你的意思是Mono<String>
- MonoSink 没有
.next
方法。我假设你的意思是.success
考虑到所有这些,这里有一个例子:
private Flux<String> fetchAll() {
return Flux.range(0, 50)
.map(i -> Integer.toString(i))
.flatMap(this::fetch, 4);
}
private Mono<String> fetch(String a) {
return Mono.create(em ->
scheduledExecutorService.schedule(() -> em.success(a + " result"),
10 + random.nextInt(50), TimeUnit.MILLISECONDS)
);
}