如何使用 Spring WebFlux 构建反应式内存存储库?
How do you build a reactive in-memory repository with Spring WebFlux?
我正在尝试实现一个响应式内存存储库。这应该如何实现?
这是我正在尝试做的事情的阻塞版本
@Repository
@AllArgsConstructor
public class InMemEventRepository implements EventRepository {
private final List<Event> events;
@Override
public void save(final Mono<Event> event) {
events.add(event.block());
// event.subscribe(events::add); <- does not do anything
}
@Override
public Flux<Event> findAll() {
return Flux.fromIterable(events);
}
}
我尝试使用 event.subscribe(events::add);
,但事件没有添加到列表中(也许我遗漏了什么?)
也许 events
应该是 Flux<Event>
类型并且有一些方法可以将 Mono<Event>
添加到 Flux<Event>
?
如果您参加 Flux.fromIterable
,您将只能订阅 之前的 个活动,但您将失去以后的活动
我之前做过一个PoC,想得到类似的效果,你可以去看看https://github.com/AlbertoSH/KeepMeUpdated
主要思想是有一个中心点,事件发生在这个中心点,存储库被订阅。每当您订阅 findAll
,您将获得 List<Item>
的无限流。任何保存的项目都会触发一个新事件,任何订阅 findAll
的人都会收到它
请注意,此存储库使用的是 RxJava,因此可能需要一些反应器端口
我建议为此目的使用 Sink。
public static class InMemEventRepository {
private final Scheduler serializerScheduler = Schedulers.single();
private final Sinks.Many<Event> events = Sinks.many().replay().all();
public void save(Mono<Event> event) {
event
.publishOn(serializerScheduler) // If event will be published on multiple threads you need to serialize them
.subscribe(x -> events.emitNext(x, EmitFailureHandler.FAIL_FAST));
}
public Flux<Event> findAll() {
return events.asFlux();
}
}
这是 reactor 3.4。对于旧版本,您可以使用处理器,但现在已弃用它们。接收器通常更易于使用,但它们不会序列化来自多个线程的发射。这就是我使用调度程序的原因。
另请参阅this answer,了解从接收器序列化发射的替代方法
我正在尝试实现一个响应式内存存储库。这应该如何实现?
这是我正在尝试做的事情的阻塞版本
@Repository
@AllArgsConstructor
public class InMemEventRepository implements EventRepository {
private final List<Event> events;
@Override
public void save(final Mono<Event> event) {
events.add(event.block());
// event.subscribe(events::add); <- does not do anything
}
@Override
public Flux<Event> findAll() {
return Flux.fromIterable(events);
}
}
我尝试使用 event.subscribe(events::add);
,但事件没有添加到列表中(也许我遗漏了什么?)
也许 events
应该是 Flux<Event>
类型并且有一些方法可以将 Mono<Event>
添加到 Flux<Event>
?
如果您参加 Flux.fromIterable
,您将只能订阅 之前的 个活动,但您将失去以后的活动
我之前做过一个PoC,想得到类似的效果,你可以去看看https://github.com/AlbertoSH/KeepMeUpdated
主要思想是有一个中心点,事件发生在这个中心点,存储库被订阅。每当您订阅 findAll
,您将获得 List<Item>
的无限流。任何保存的项目都会触发一个新事件,任何订阅 findAll
的人都会收到它
请注意,此存储库使用的是 RxJava,因此可能需要一些反应器端口
我建议为此目的使用 Sink。
public static class InMemEventRepository {
private final Scheduler serializerScheduler = Schedulers.single();
private final Sinks.Many<Event> events = Sinks.many().replay().all();
public void save(Mono<Event> event) {
event
.publishOn(serializerScheduler) // If event will be published on multiple threads you need to serialize them
.subscribe(x -> events.emitNext(x, EmitFailureHandler.FAIL_FAST));
}
public Flux<Event> findAll() {
return events.asFlux();
}
}
这是 reactor 3.4。对于旧版本,您可以使用处理器,但现在已弃用它们。接收器通常更易于使用,但它们不会序列化来自多个线程的发射。这就是我使用调度程序的原因。
另请参阅this answer,了解从接收器序列化发射的替代方法