使用 Spring 数据和 Reactive MongoDB 推送 Flux

Push a Flux using Spring Data with Reactive MongoDB

我正在使用 Spring Boot 2.0.2,我想使用 [=38] 将 Flux 的内容插入到 MongoDB 中文档的数组中=] 使用 ReactiveMongoOperations 对其反应性 mongodb 支持的数据。例如(Kotlin 代码):

val mongo : ReactiveMongoOperations = ...
data class Something(val data: String)
val flux = Flux.just(Something("A"), Something("B"))

mongo.upsert(query(where("_id").isEqualTo("myId")),
                        Update().push("myArray").each(flux),
                        "collection")

(请注意,这只是一个示例,在实际代码中不会使用 Flux.just(..) 生成 flux)。

虽然这有效,但没有产生我想要的结果:

{ "_id" : "myId", "myArray" : [ { "array" : [ { "data" : "A" }, { "data" : "B" } ], "_class" : "reactor.core.publisher.FluxArray" } ] }

我希望结果是:

{ "_id" : "myId", "myArray" : [ { "data" : "A" }, { "data" : "B" } ] }

即Spring 数据序列化 Flux 中的内容(元素),而不是 Flux 本身。这个例子产生了正确的结果:

mongo.upsert(query(where("_id").isEqualTo("myId")),
                        Update().push("myArray").each(Something("A"), Something("B")),
                        "collection")

我怎样才能做到这一点而不阻塞

TL;博士

您需要预先收集物品并$push收集物品。

说明

您不能将响应式包装器类型用作 MongoDB 查询的值,因为它们需要预先解析为值。您可以通过应用 collectList()flatMap() 运算符来实现您想要的:

val flux: Flux<Something>

flux.collectList()
    .flatMap { mongo.upsert(query(where("_id").isEqualTo("myId")),
                    Update().push("myArray").each(it),
                    "collection")  }

如果元素的数量超出了合理的限制,那么 buffer(n) 可能是更合适的选择。