我如何在 Reactor 3 中跨多个发布者排队工作?

How do I queue work across multiple publishers in Reactor 3?

我正在创建一个库,用于使用 Reactor 3 创建数据处理工作流。每个任务都有一个输入通量和一个输出通量。输入通量由用户提供。输出通量由库创建。任务可以链接起来形成 DAG。像这样:(在 Kotlin 中)

val base64 = task<String, String>("base64") {
    input { Flux.just("a", "b", "c", "d", "e") }
    outputFn { ... get the output values ... }
    scriptFn { ... do some stuff ... }
}

val step2 = task<List<String>, String>("step2") {
    input { base64.output.buffer(3) }
    outputFn { ... }
    scriptFn { ... }
}

我有限制整个工作流程并发的需求。一次只能处理配置数量的输入。在上面的示例中,对于 3 的限制,这意味着任务 base64 将 运行 首先输入 "a"、"b" 和 "c",然后在处理之前等待每个完成"d"、"e" 和 "step2" 任务。

从输入通量创建输出通量时如何应用此类限制?可以以某种方式应用 TopicProcessor 吗?也许某种自定义调度程序或处理器?背压如何工作?我需要担心创建缓冲区吗?

背压从最终的订阅者向上传播,遍及整个链。但是链中的操作者可以提前(预取)甚至 "rewrite" 请求数据。例如,在 buffer(3) 的情况下,如果该运算符收到 request(1) 它将执行 request(3) 上游(“1 个缓冲区 == 最多 3 个元素,以便我可以请求我的源足以填充我被请求的 1 个缓冲区").

如果输入总是由用户提供,这将很难抽象掉...

没有简单的方法可以跨多个管道甚至对给定管道的多个订阅(Flux)对限制源进行评级。

在多个 publishOn 中使用共享 Scheduler 将不起作用,因为 publishOn 选择了一个 Worker 线程并坚持下去。

但是,如果您的问题更具体地是关于 base64 任务被限制,也许可以从 flatMap 的并发参数中获得效果?

input.flatMap(someString -> asyncProcess(someString), 3, 1);

这将使 asyncProcess 运行 最多出现 3 次,每次终止时,它都会从 input 的下一个值开始一个新值。