Reactor groupBy:取消 GroupedFlux 后剩余的项目会怎样?
Reactor groupBy: What happens with remaining items after GroupedFlux is canceled?
我需要用高基数键对无限 Flux 进行分组。
例如:
- 组密钥是域 url
- 对一个域的调用应该严格按顺序进行(下一个调用发生在上一个完成之后)
- 对不同域的调用应该是并发的
- 具有相同键 (url) 的项目之间的时间间隔未知,但预计具有突发性。几个项目在短时间内发出然后长时间停顿直到下一组。
queue
.groupBy(keyMapper, groupPrefetch)
.flatMap(
{ group ->
group.concatMap(
{ task -> makeSlowRemoteCall(task) },
0
)
.takeUntil { remoteCallResult -> remoteCallResult == DONE }
.timeout(groupTimeout, Mono.empty())
.then()
}
, concurrency
)
我分两种情况退团:
makeSlowRemoteCall()
结果表明该组近期内很可能没有新项目。
在 groupTimeout
期间未发出下一项。我使用 timeout(timeout, fallback)
变体来抑制 TimeoutException 并允许 flatMap 的内部发布者成功完成。
我希望未来可能的项目具有相同的键来创建新的 GroupedFlux 并使用相同的 flatMap 内部管道进行处理。
但是,如果在我 取消 GroupedFlux 时仍有未请求的项目,会发生什么情况?
groupBy 运算符是否将它们重新排队到具有相同密钥的新组中,否则它们将永远丢失。如果以后解决我的问题的正确方法是什么。我也不确定在这种情况下是否需要将 concatMap() prefetch 设置为 0。
我认为 groupBy()
运算符不适合我的无限源和大量组的任务。它会产生无限组,因此有必要以某种方式取消下游的空闲组。但是不可能取消 GroupedFlux 并保证它没有未消耗的元素。
我认为拥有发出有限组的 groupBy 变体会很棒。
类似于 groupBy(keyMapper, boundryPredicate)
。当 boundryPredicate returns true 当前组完成时,具有相同键的下一个元素将开始新组。
我需要用高基数键对无限 Flux 进行分组。
例如:
- 组密钥是域 url
- 对一个域的调用应该严格按顺序进行(下一个调用发生在上一个完成之后)
- 对不同域的调用应该是并发的
- 具有相同键 (url) 的项目之间的时间间隔未知,但预计具有突发性。几个项目在短时间内发出然后长时间停顿直到下一组。
queue
.groupBy(keyMapper, groupPrefetch)
.flatMap(
{ group ->
group.concatMap(
{ task -> makeSlowRemoteCall(task) },
0
)
.takeUntil { remoteCallResult -> remoteCallResult == DONE }
.timeout(groupTimeout, Mono.empty())
.then()
}
, concurrency
)
我分两种情况退团:
makeSlowRemoteCall()
结果表明该组近期内很可能没有新项目。在
groupTimeout
期间未发出下一项。我使用timeout(timeout, fallback)
变体来抑制 TimeoutException 并允许 flatMap 的内部发布者成功完成。
我希望未来可能的项目具有相同的键来创建新的 GroupedFlux 并使用相同的 flatMap 内部管道进行处理。
但是,如果在我 取消 GroupedFlux 时仍有未请求的项目,会发生什么情况?
groupBy 运算符是否将它们重新排队到具有相同密钥的新组中,否则它们将永远丢失。如果以后解决我的问题的正确方法是什么。我也不确定在这种情况下是否需要将 concatMap() prefetch 设置为 0。
我认为 groupBy()
运算符不适合我的无限源和大量组的任务。它会产生无限组,因此有必要以某种方式取消下游的空闲组。但是不可能取消 GroupedFlux 并保证它没有未消耗的元素。
我认为拥有发出有限组的 groupBy 变体会很棒。
类似于 groupBy(keyMapper, boundryPredicate)
。当 boundryPredicate returns true 当前组完成时,具有相同键的下一个元素将开始新组。