Spring 反应堆 |批处理输入而不改变
Spring Reactor | Batching the input without mutating
我正在尝试对流式源 (Kafka) 不断发出的记录进行批处理,并以 100 个为一组调用我的服务。
我得到的输入是一条记录。我正在尝试使用 Spring Reactor 以 Reactive 方式实现它的最佳方法,而无需在管道外进行突变和锁定。
这是我天真的尝试,它只是反映了我的顺序思维方式:
Mono.just(input)
.subscribe(i -> {
batches.add(input);
if(batches.size() >= 100) {
// Invoke another reactive pipeline.
// Clear the batch (requires locking in order to be thread safe).
}
});
使用 reactor 在流式源上实现批处理的最佳方法是什么。
.buffer(100)
或 bufferTimeout(100, Duration.ofSeconds(xxx)
来救援
使用Flux.buffer
或Flux.bufferTimeout
您将能够收集固定数量的元素到List
StepVerifier.create(
Flux.range(0, 1000)
.buffer(100)
)
.expectNextCount(10)
.expectComplete()
.verify()
用例更新
如果输入是单个值,假设调用带有参数的方法:
public void invokeMe(String element);
您可以采用 UnicastProcessor
技术并将所有数据传输到该处理器,然后它会处理批处理
class Batcher {
final UnicastProcessor processor = UnicastProcessor.create();
public void invokeMe(String element) {
processor.sink().next(element);
// or Mono.just(element).subscribe(processor);
}
public Flux<List<String>> listen() {
return processor.bufferTimeout(100, Duration.ofSeconds(5));
}
}
Batcher batcher = new Batcher();
StepVerifier.create(
batcher.listen()
)
.then(() -> Flux.range(0, 1000)
.subscribe(i -> batcher.invokeMe("" + i)))
.expectNextCount(10)
.thenCancel()
.verify()
从该示例中,我们可能会了解如何提供单点接收事件,然后收听批处理过程的结果。
Please note that UnicastPorcessor
allows only one subscriber, so it will be useful for the model when there is one interested party in batching results and many data producers. In a case when you have subscribers as many as producers you may want to use one of the next processors -> DirectProcessor
, TopicProcessor
, WorkerQueueProcessor
. To learn more about Reactor Processors follow the link
我正在尝试对流式源 (Kafka) 不断发出的记录进行批处理,并以 100 个为一组调用我的服务。
我得到的输入是一条记录。我正在尝试使用 Spring Reactor 以 Reactive 方式实现它的最佳方法,而无需在管道外进行突变和锁定。
这是我天真的尝试,它只是反映了我的顺序思维方式:
Mono.just(input)
.subscribe(i -> {
batches.add(input);
if(batches.size() >= 100) {
// Invoke another reactive pipeline.
// Clear the batch (requires locking in order to be thread safe).
}
});
使用 reactor 在流式源上实现批处理的最佳方法是什么。
.buffer(100)
或 bufferTimeout(100, Duration.ofSeconds(xxx)
来救援
使用Flux.buffer
或Flux.bufferTimeout
您将能够收集固定数量的元素到List
StepVerifier.create(
Flux.range(0, 1000)
.buffer(100)
)
.expectNextCount(10)
.expectComplete()
.verify()
用例更新
如果输入是单个值,假设调用带有参数的方法:
public void invokeMe(String element);
您可以采用 UnicastProcessor
技术并将所有数据传输到该处理器,然后它会处理批处理
class Batcher {
final UnicastProcessor processor = UnicastProcessor.create();
public void invokeMe(String element) {
processor.sink().next(element);
// or Mono.just(element).subscribe(processor);
}
public Flux<List<String>> listen() {
return processor.bufferTimeout(100, Duration.ofSeconds(5));
}
}
Batcher batcher = new Batcher();
StepVerifier.create(
batcher.listen()
)
.then(() -> Flux.range(0, 1000)
.subscribe(i -> batcher.invokeMe("" + i)))
.expectNextCount(10)
.thenCancel()
.verify()
从该示例中,我们可能会了解如何提供单点接收事件,然后收听批处理过程的结果。
Please note that
UnicastPorcessor
allows only one subscriber, so it will be useful for the model when there is one interested party in batching results and many data producers. In a case when you have subscribers as many as producers you may want to use one of the next processors ->DirectProcessor
,TopicProcessor
,WorkerQueueProcessor
. To learn more about Reactor Processors follow the link