如何使用 Spring WebFlux 构建反应式内存存储库?

How do you build a reactive in-memory repository with Spring WebFlux?

我正在尝试实现一个响应式内存存储库。这应该如何实现?

这是我正在尝试做的事情的阻塞版本

@Repository
@AllArgsConstructor
public class InMemEventRepository implements EventRepository {

    private final List<Event> events;

    @Override
    public void save(final Mono<Event> event) {
        events.add(event.block());
        // event.subscribe(events::add); <- does not do anything
    }

    @Override
    public Flux<Event> findAll() {
        return Flux.fromIterable(events);
    }

}

我尝试使用 event.subscribe(events::add);,但事件没有添加到列表中(也许我遗漏了什么?)

也许 events 应该是 Flux<Event> 类型并且有一些方法可以将 Mono<Event> 添加到 Flux<Event>?

如果您参加 Flux.fromIterable,您将只能订阅 之前的 个活动,但您将失去以后的活动

我之前做过一个PoC,想得到类似的效果,你可以去看看https://github.com/AlbertoSH/KeepMeUpdated

主要思想是有一个中心点,事件发生在这个中心点,存储库被订阅。每当您订阅 findAll,您将获得 List<Item> 的无限流。任何保存的项目都会触发一个新事件,任何订阅 findAll 的人都会收到它

请注意,此存储库使用的是 RxJava,因此可能需要一些反应器端口

我建议为此目的使用 Sink

  public static class InMemEventRepository {
    private final Scheduler serializerScheduler = Schedulers.single();

    private final Sinks.Many<Event> events = Sinks.many().replay().all();

    public void save(Mono<Event> event) {
      event
          .publishOn(serializerScheduler) // If event will be published on multiple threads you need to serialize them
          .subscribe(x -> events.emitNext(x, EmitFailureHandler.FAIL_FAST)); 
    }

    public Flux<Event> findAll() {
      return events.asFlux();
    }
  }

这是 reactor 3.4。对于旧版本,您可以使用处理器,但现在已弃用它们。接收器通常更易于使用,但它们不会序列化来自多个线程的发射。这就是我使用调度程序的原因。

另请参阅this answer,了解从接收器序列化发射的替代方法