使用 Spring 数据和 Reactive MongoDB 推送 Flux
Push a Flux using Spring Data with Reactive MongoDB
我正在使用 Spring Boot 2.0.2,我想使用 [=38] 将 Flux
的内容插入到 MongoDB 中文档的数组中=] 使用 ReactiveMongoOperations 对其反应性 mongodb 支持的数据。例如(Kotlin 代码):
val mongo : ReactiveMongoOperations = ...
data class Something(val data: String)
val flux = Flux.just(Something("A"), Something("B"))
mongo.upsert(query(where("_id").isEqualTo("myId")),
Update().push("myArray").each(flux),
"collection")
(请注意,这只是一个示例,在实际代码中不会使用 Flux.just(..)
生成 flux
)。
虽然这有效,但没有产生我想要的结果:
{ "_id" : "myId", "myArray" : [ { "array" : [ { "data" : "A" }, { "data" : "B" } ], "_class" : "reactor.core.publisher.FluxArray" } ] }
我希望结果是:
{ "_id" : "myId", "myArray" : [ { "data" : "A" }, { "data" : "B" } ] }
即Spring 数据序列化 Flux
中的内容(元素),而不是 Flux
本身。这个例子产生了正确的结果:
mongo.upsert(query(where("_id").isEqualTo("myId")),
Update().push("myArray").each(Something("A"), Something("B")),
"collection")
我怎样才能做到这一点而不阻塞?
TL;博士
您需要预先收集物品并$push
收集物品。
说明
您不能将响应式包装器类型用作 MongoDB 查询的值,因为它们需要预先解析为值。您可以通过应用 collectList()
和 flatMap()
运算符来实现您想要的:
val flux: Flux<Something>
flux.collectList()
.flatMap { mongo.upsert(query(where("_id").isEqualTo("myId")),
Update().push("myArray").each(it),
"collection") }
如果元素的数量超出了合理的限制,那么 buffer(n)
可能是更合适的选择。
我正在使用 Spring Boot 2.0.2,我想使用 [=38] 将 Flux
的内容插入到 MongoDB 中文档的数组中=] 使用 ReactiveMongoOperations 对其反应性 mongodb 支持的数据。例如(Kotlin 代码):
val mongo : ReactiveMongoOperations = ...
data class Something(val data: String)
val flux = Flux.just(Something("A"), Something("B"))
mongo.upsert(query(where("_id").isEqualTo("myId")),
Update().push("myArray").each(flux),
"collection")
(请注意,这只是一个示例,在实际代码中不会使用 Flux.just(..)
生成 flux
)。
虽然这有效,但没有产生我想要的结果:
{ "_id" : "myId", "myArray" : [ { "array" : [ { "data" : "A" }, { "data" : "B" } ], "_class" : "reactor.core.publisher.FluxArray" } ] }
我希望结果是:
{ "_id" : "myId", "myArray" : [ { "data" : "A" }, { "data" : "B" } ] }
即Spring 数据序列化 Flux
中的内容(元素),而不是 Flux
本身。这个例子产生了正确的结果:
mongo.upsert(query(where("_id").isEqualTo("myId")),
Update().push("myArray").each(Something("A"), Something("B")),
"collection")
我怎样才能做到这一点而不阻塞?
TL;博士
您需要预先收集物品并$push
收集物品。
说明
您不能将响应式包装器类型用作 MongoDB 查询的值,因为它们需要预先解析为值。您可以通过应用 collectList()
和 flatMap()
运算符来实现您想要的:
val flux: Flux<Something>
flux.collectList()
.flatMap { mongo.upsert(query(where("_id").isEqualTo("myId")),
Update().push("myArray").each(it),
"collection") }
如果元素的数量超出了合理的限制,那么 buffer(n)
可能是更合适的选择。