Spring 反应堆 |批处理输入而不改变

Spring Reactor | Batching the input without mutating

我正在尝试对流式源 (Kafka) 不断发出的记录进行批处理,并以 100 个为一组调用我的服务。

我得到的输入是一条记录。我正在尝试使用 Spring Reactor 以 Reactive 方式实现它的最佳方法,而无需在管道外进行突变和锁定。

这是我天真的尝试,它只是反映了我的顺序思维方式:

 Mono.just(input)
                .subscribe(i -> {
                     batches.add(input);
                     if(batches.size() >= 100) {
                         // Invoke another reactive pipeline.
                         // Clear the batch (requires locking in order to be thread safe).
                     }
                });

使用 reactor 在流式源上实现批处理的最佳方法是什么。

.buffer(100)bufferTimeout(100, Duration.ofSeconds(xxx) 来救援

使用Flux.bufferFlux.bufferTimeout您将能够收集固定数量的元素到List

StepVerifier.create(
      Flux.range(0, 1000)
          .buffer(100)
   )
   .expectNextCount(10)
   .expectComplete()
   .verify()

用例更新

如果输入是单个值,假设调用带有参数的方法:

public void invokeMe(String element);

您可以采用 UnicastProcessor 技术并将所有数据传输到该处理器,然后它会处理批处理

class Batcher {

   final UnicastProcessor processor = UnicastProcessor.create();

   public void invokeMe(String element) {
       processor.sink().next(element);
       // or Mono.just(element).subscribe(processor);
   }


   public Flux<List<String>> listen() {
       return processor.bufferTimeout(100, Duration.ofSeconds(5));
   }
}

Batcher batcher = new Batcher();

StepVerifier.create(
      batcher.listen()
   )
   .then(() -> Flux.range(0, 1000)
                   .subscribe(i -> batcher.invokeMe("" + i)))
   .expectNextCount(10)
   .thenCancel()
   .verify()

从该示例中,我们可能会了解如何提供单点接收事件,然后收听批处理过程的结果。

Please note that UnicastPorcessor allows only one subscriber, so it will be useful for the model when there is one interested party in batching results and many data producers. In a case when you have subscribers as many as producers you may want to use one of the next processors -> DirectProcessor, TopicProcessor, WorkerQueueProcessor. To learn more about Reactor Processors follow the link