按并发任务数(不是基于时间的)限制异步 Mono

Throttle asynchronous Mono by number of concurrent tasks (not time-based)

假设我有一个方法,它接受一个参数和 returns 一个异步完成的 Mono<Integer>。例如:

Random random = new Random();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);

Mono<Integer> fetch(String a) {
  return Mono.create(em -> {
    scheduledExecutorService.schedule(() -> em.next(a + " result"), 
      10 + random.nextInt(50), TimeUnit.MILLISECONDS);
  });
}

假设我有一个 Flux<String> 可以输入上面的 fetch 方法并且它可以有很多元素。

有没有一种方法可以确保并行调用该方法,但将并发调用的数量限制为预定义的数量?

例如4 在上面的例子中,虽然我有 16 个可用线程 - 所以从这个角度来看我总是保留 12 个备用。

假设 "feed into",你的意思是你正在使用 flux.flatMap(this::fetch), 那么您可以通过调用 flux.flatMap(this::fetch, 4) 来设置 flatMap 并发性。

此外,您的代码有两个编译错误:

  1. return 类型的提取 Mono<Integer> 与您提供给接收器的项目类型不匹配 (a + " result")。我假设你的意思是 Mono<String>
  2. MonoSink 没有 .next 方法。我假设你的意思是 .success

考虑到所有这些,这里有一个例子:

    private Flux<String> fetchAll() {
        return Flux.range(0, 50)
                .map(i -> Integer.toString(i))
                .flatMap(this::fetch, 4);

    }

    private Mono<String> fetch(String a) {
        return Mono.create(em ->
                scheduledExecutorService.schedule(() -> em.success(a + " result"),
                        10 + random.nextInt(50), TimeUnit.MILLISECONDS)
        );
    }