以反应方式保存使用 spring 数据 elasticsearch 的记录通量

Save flux of records usung spring data elasticsearch in reactive manner

请帮助我找到将从 Cassandra 接收的实体流量保存到 Elasticserch 的正确方法。

例如,我有大量的实体:

Flux<User> user = cassandraService.getUsers();

如何使用反应方式将所有实体流量保存到 Elasticsearch? 哪种调用Elasticksearch的方式比较好:

以及如何阻止直到最后一个 flux 中的实体被处理?

首先,flatMap()map() 的简短说明:

  • flatMap() 应该用于非阻塞操作(异步)——简而言之,任何 returns 支持 Mono,Flux,
  • map() 应用于通过对每个项目应用同步函数(在固定时间执行)来转换对象。

因此,如果您想在 Elasticsearch 中保存文档,您应该使用 flatMap() - 这是一个异步操作,其执行时间是不确定的。

当谈到doOnNext()时,它用于添加副作用行为,当 Flux 发出一个项目时触发,例如用于日志记录或保存完成后应该发生的一些其他操作。

示例:

Mono.just(productDto)
    .map(dto -> new Product(dto.getName(), dto.isAvailable()))
    .flatMap(elasticsearchOperations::save)
    .doOnNext(savedProduct -> logger.info("Saved {}", savedProduct.getProductName()))
    .subscribe();

而且您不必在 save() 上阻塞,这是 flatMap() 的责任。