Reactor groupBy:取消 GroupedFlux 后剩余的项目会怎样?

Reactor groupBy: What happens with remaining items after GroupedFlux is canceled?

我需要用高基数键对无限 Flux 进行分组。

例如:

  1. 组密钥是域 url
  2. 对一个域的调用应该严格按顺序进行(下一个调用发生在上一个完成之后)
  3. 对不同域的调用应该是并发的
  4. 具有相同键 (url) 的项目之间的时间间隔未知,但预计具有突发性。几个项目在短时间内发出然后长时间停顿直到下一组。
queue
    .groupBy(keyMapper, groupPrefetch)
    .flatMap(
       { group ->
           group.concatMap(
               { task -> makeSlowRemoteCall(task) },
               0
           )
           .takeUntil { remoteCallResult -> remoteCallResult == DONE }
           .timeout(groupTimeout, Mono.empty())
           .then()
       }
      , concurrency
    )

我分两种情况退团:

  1. makeSlowRemoteCall() 结果表明该组近期内很可能没有新项目。

  2. groupTimeout 期间未发出下一项。我使用 timeout(timeout, fallback) 变体来抑制 TimeoutException 并允许 flatMap 的内部发布者成功完成。

我希望未来可能的项目具有相同的键来创建新的 GroupedFlux 并使用相同的 flatMap 内部管道进行处理。

但是,如果在我 取消 GroupedFlux 时仍有未请求的项目,会发生什么情况?

groupBy 运算符是否将它们重新排队到具有相同密钥的新组中,否则它们将永远丢失。如果以后解决我的问题的正确方法是什么。我也不确定在这种情况下是否需要将 concatMap() prefetch 设置为 0。

我认为 groupBy() 运算符不适合我的无限源和大量组的任务。它会产生无限组,因此有必要以某种方式取消下游的空闲组。但是不可能取消 GroupedFlux 并保证它没有未消耗的元素。

我认为拥有发出有限组的 groupBy 变体会很棒。 类似于 groupBy(keyMapper, boundryPredicate)。当 boundryPredicate returns true 当前组完成时,具有相同键的下一个元素将开始新组。