如何创建反应式 'intermediate subscriber'?如何将所有 Flux<T> 部分组合起来并在链中进一步处理它们的结果(转换为 Mono<R>)?

How to create reactive 'intermediate subscriber'? How to combine all Flux<T> parts and process result of them further in chain (convert to Mono<R>)?

我正在使用 spring webflux + mongodb-reactive 将二进制文件(图像)保存到 Mongo 数据库。不幸的是 spring-boot-starter-data-mongodb-reactive:2.0.5.RELEASE 不支持 GridFsTemplate 功能的反应式编程。所以我决定创建一个 Subscriber,它将获取所有 DataBuffer 部分,将它们组合并转换为 InputStream,因此 GridFsTemplate::store 将成为可能:

public class GridFsTemplateSubscriber extends BaseSubscriber<DataBuffer> {
private GridFsTemplate gridFsTemplate;
private List<DataBuffer> dataBuffers;
private String fileName;

public GridFsTemplateSubscriber(GridFsTemplate gridFsTemplate, String fileName) {
    this.gridFsTemplate = gridFsTemplate;
    this.fileName = fileName;
    dataBuffers = new ArrayList<>();
}

public void hookOnNext(DataBuffer dataBuffer) {
    dataBuffers.add(dataBuffer);
    request(1);
}

public void hookOnComplete() {
    DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
    InputStream inputStream = defaultDataBufferFactory.join(dataBuffers).asInputStream();
    ObjectId objectId = gridFsTemplate.store(inputStream, fileName);
}
}

问题是我想 return objectId 进行进一步处理,但 hookOnComplete 是无效类型。甚至更多.. 我想从这里获得 ObjectId 的 Mono,所以我可以以反应方式进一步处理它。在那种情况下,据我了解 React 哲学,我不应该使用 'real' 订阅者,而是会结合 Flux<T> 和 onComplete 时的结果,return Mono<R>。 project-reactor 有这样的能力吗?我是反应式编程的新手,所以我可能会错过整个想法,所以请指导我如何实现。

在我之前的解决方案中,我使用 block() 来结束反应链,所以我得到了 ObjectId,接下来我用新链发出了对象 ID。但这肯定不是一个好的解决方案。

不幸的是,整个方法都是自找麻烦,不建议这样做。

实施Subscriber(甚至使用BaseSubscriber)几乎永远不是解决方案。如果您使用的驱动程序库不支持反应流,那么您 should probably wrap it as blocking code.

这样做会失去很多特性(运行时行为、背压等),但如果您的订阅者实现不完美,您至少不会冒破坏整个应用程序的风险。

在你的核心 libraries/drivers 支持反应流之前,你应该坚持使用 Spring MVC,它支持异步概念和 Flux Mono return 类型在某些情况下。