我如何在 Reactor 3 中跨多个发布者排队工作?
How do I queue work across multiple publishers in Reactor 3?
我正在创建一个库,用于使用 Reactor 3 创建数据处理工作流。每个任务都有一个输入通量和一个输出通量。输入通量由用户提供。输出通量由库创建。任务可以链接起来形成 DAG。像这样:(在 Kotlin 中)
val base64 = task<String, String>("base64") {
input { Flux.just("a", "b", "c", "d", "e") }
outputFn { ... get the output values ... }
scriptFn { ... do some stuff ... }
}
val step2 = task<List<String>, String>("step2") {
input { base64.output.buffer(3) }
outputFn { ... }
scriptFn { ... }
}
我有限制整个工作流程并发的需求。一次只能处理配置数量的输入。在上面的示例中,对于 3 的限制,这意味着任务 base64 将 运行 首先输入 "a"、"b" 和 "c",然后在处理之前等待每个完成"d"、"e" 和 "step2" 任务。
从输入通量创建输出通量时如何应用此类限制?可以以某种方式应用 TopicProcessor 吗?也许某种自定义调度程序或处理器?背压如何工作?我需要担心创建缓冲区吗?
背压从最终的订阅者向上传播,遍及整个链。但是链中的操作者可以提前(预取)甚至 "rewrite" 请求数据。例如,在 buffer(3)
的情况下,如果该运算符收到 request(1)
它将执行 request(3)
上游(“1 个缓冲区 == 最多 3 个元素,以便我可以请求我的源足以填充我被请求的 1 个缓冲区").
如果输入总是由用户提供,这将很难抽象掉...
没有简单的方法可以跨多个管道甚至对给定管道的多个订阅(Flux
)对限制源进行评级。
在多个 publishOn
中使用共享 Scheduler
将不起作用,因为 publishOn
选择了一个 Worker
线程并坚持下去。
但是,如果您的问题更具体地是关于 base64
任务被限制,也许可以从 flatMap
的并发参数中获得效果?
input.flatMap(someString -> asyncProcess(someString), 3, 1);
这将使 asyncProcess
运行 最多出现 3 次,每次终止时,它都会从 input
的下一个值开始一个新值。
我正在创建一个库,用于使用 Reactor 3 创建数据处理工作流。每个任务都有一个输入通量和一个输出通量。输入通量由用户提供。输出通量由库创建。任务可以链接起来形成 DAG。像这样:(在 Kotlin 中)
val base64 = task<String, String>("base64") {
input { Flux.just("a", "b", "c", "d", "e") }
outputFn { ... get the output values ... }
scriptFn { ... do some stuff ... }
}
val step2 = task<List<String>, String>("step2") {
input { base64.output.buffer(3) }
outputFn { ... }
scriptFn { ... }
}
我有限制整个工作流程并发的需求。一次只能处理配置数量的输入。在上面的示例中,对于 3 的限制,这意味着任务 base64 将 运行 首先输入 "a"、"b" 和 "c",然后在处理之前等待每个完成"d"、"e" 和 "step2" 任务。
从输入通量创建输出通量时如何应用此类限制?可以以某种方式应用 TopicProcessor 吗?也许某种自定义调度程序或处理器?背压如何工作?我需要担心创建缓冲区吗?
背压从最终的订阅者向上传播,遍及整个链。但是链中的操作者可以提前(预取)甚至 "rewrite" 请求数据。例如,在 buffer(3)
的情况下,如果该运算符收到 request(1)
它将执行 request(3)
上游(“1 个缓冲区 == 最多 3 个元素,以便我可以请求我的源足以填充我被请求的 1 个缓冲区").
如果输入总是由用户提供,这将很难抽象掉...
没有简单的方法可以跨多个管道甚至对给定管道的多个订阅(Flux
)对限制源进行评级。
在多个 publishOn
中使用共享 Scheduler
将不起作用,因为 publishOn
选择了一个 Worker
线程并坚持下去。
但是,如果您的问题更具体地是关于 base64
任务被限制,也许可以从 flatMap
的并发参数中获得效果?
input.flatMap(someString -> asyncProcess(someString), 3, 1);
这将使 asyncProcess
运行 最多出现 3 次,每次终止时,它都会从 input
的下一个值开始一个新值。