在 Webflux 的 Mono 上用 reactive MongoDB 执行一个操作 n 次
Execute an operation n times on a Mono in Webflux with reactive MongoDB
我在使用反应式 MongoDB 驱动程序和 Spring 的 Webflux 应用程序中有以下数据加载器场景:
- 创建 X 个类型 B 的对象
- 创建Y个类型A的对象:对象A包含一个数组类型的字段和一个对类型B对象的引用。对B的引用是从第一步随机选择的
- 将N个条目添加到先前创建的对象的数组中
我面临的问题似乎是 Mono/Flux 的并行执行,根据我的理解,这不应该发生。根据文档,除非另有说明,否则事情总是按顺序执行。
有人可以提示我做错了什么吗?
这是一个示例代码片段。对象 A 是厕所。对象 B 是用户。数组字段为评论字段:
Flux.range(0, 10)
// create 10 objects of type user
.flatMap {
LOG.debug("Creating user $it")
userRepository.save(
User(
id = ObjectId(),
name = userNames.random(),
email = "${userNames.random()}@mail.com"
)
)
}
.collectList()
// create 2 objects of type toilet
.flatMapMany { userList ->
Flux.range(0, 2).zipWith(Flux.range(0, 2).map { userList })
}
.flatMap {
LOG.debug("Creating toilet ${it.t1}")
val userList = it.t2
toiletRepository.save(
Toilet(
id = ObjectId(),
title = userList.random().name
)
)
}
// add 5 entries to array of toilet
.flatMap { toilet ->
Flux.range(0, 5).zipWith(Flux.range(0, 5).map { toilet })
}
.flatMap { tuple ->
val toilet = tuple.t2
LOG.debug("Creating comment ${tuple.t1} for toilet $toilet")
// get current values from toilet
toiletRepository.findById(toilet.id).map {
// and push a new element to the comments array
LOG.debug("Comment size ${it.commentRefs.size}")
toiletRepository.save(it.apply { commentRefs.add(ObjectId()) })
}
}
.subscribe {
GlobalScope.launch {
exitProcess(SpringApplication.exit(context))
}
}
执行此代码会产生以下日志:
2020-11-15 19:42:54.197 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 0
2020-11-15 19:42:54.293 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 1
2020-11-15 19:42:54.295 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 2
2020-11-15 19:42:54.296 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 3
2020-11-15 19:42:54.300 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 4
2020-11-15 19:42:54.301 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 5
2020-11-15 19:42:54.304 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 6
2020-11-15 19:42:54.310 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 7
2020-11-15 19:42:54.316 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 8
2020-11-15 19:42:54.318 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 9
2020-11-15 19:42:54.348 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner : Creating toilet 0
2020-11-15 19:42:54.380 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner : Creating toilet 1
2020-11-15 19:42:54.386 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 0 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.405 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 1 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.406 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 2 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.407 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 3 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.409 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 4 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.410 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 0 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.412 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 1 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.413 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 2 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.414 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 3 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.415 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 4 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.425 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-6] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-2] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-3] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-9] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-7] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.429 DEBUG 13524 --- [ntLoopGroup-3-2] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.429 DEBUG 13524 --- [ntLoopGroup-3-9] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.464 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner : Comment size 0
我现在有三个问题:
- 为什么Thread从main切换到LoopGroup?如果它按顺序执行,它根本不应该使用多线程吗?
- 为什么
Comment size
日志消息在末尾组合在一起?
- 如何使用响应式 mongo 存储库实现将元素正确推送到数组?
如有任何提示,我们将不胜感激。我假设 findById
和 save
的嵌套执行是不正确的,但是您将如何以不同的方式编写它?由于 save
需要一个实体,因此我需要传入最新版本的实体,该实体在数组中包含一个附加元素。
我尝试通过使用 findById
获取最新版本并使用 'map -> save' 直接修改它来实现这一点。
谢谢大家!
我不确定这是否是最好的方法,但我能够通过拆分函数中的操作以使其更合理地分组来实现我想要的。
以下是以下操作的片段:
- 创建用户
- 创建评论
- 创建评分
private fun createUsers() = Flux.range(0, userNames.size + 1)
.flatMap {
if (it < userNames.size) {
LOG.debug("Creating user $it")
userRepository.save(
User(
id = ObjectId(),
name = userNames[it],
email = "${userNames[it]}@mail.com"
)
)
} else {
LOG.debug("Creating dev-user")
userRepository.save(
User(
id = ObjectId("000000000000012343456789"),
name = "devuser",
email = "devuser@mail.com"
)
)
}
}
.collectList()
private fun createComments(users: List<User>) = Flux.range(0, numComments)
.flatMap {
LOG.debug("Creating comment $it")
commentRepository.save(
Comment(
id = ObjectId(),
text = commentTexts.random(),
userRef = users.random().id
)
)
}
.collectList()
private fun createRatings(users: List<User>) = Flux.range(0, numRatings)
.flatMap {
LOG.debug("Creating rating $it")
ratingRepository.save(
Rating(
id = ObjectId(),
userRef = users.random().id,
value = Random.nextInt(0, 5)
)
)
}
.collectList()
最后用上面的结果创建厕所:
private fun createToilets(comments: List<Comment>, ratings: List<Rating>) = Flux.range(0, numToilets)
.flatMap {
val toilet = Toilet(
id = ObjectId(),
title = titles.random(),
location = GeoJsonPoint(Random.nextDouble(10.0, 20.0), Random.nextDouble(45.0, 55.0)),
description = descriptions.random()
)
// add comments
val commentsToAdd = Random.nextInt(0, comments.size)
for (i in 0 until commentsToAdd) {
toilet.commentRefs.add(comments[i].id)
}
// add average rating and rating references
val ratingsToAdd = Random.nextInt(0, ratings.size)
for (i in 0 until ratingsToAdd) {
toilet.ratingRefs.add(ratings[i].id)
toilet.averageRating += ratings[i].value
}
if (toilet.ratingRefs.isNotEmpty()) {
toilet.averageRating /= toilet.ratingRefs.size
}
LOG.debug("Creating toilet $it with $commentsToAdd comments and $ratingsToAdd ratings")
toiletRepository.save(toilet)
}
// upload preview image
.flatMap { toilet ->
val imageName = "toilet${Random.nextInt(1, 10)}.jpg"
imageService.store(
Callable {
DataLoaderRunner::class.java.getResourceAsStream("/sample-images/$imageName")
},
"${toilet.title}-preview"
).zipWith(Mono.just(toilet))
}
// set preview image
.flatMap {
val imageId = it.t1
val toilet = it.t2
toiletRepository.save(toilet.copy(previewID = imageId))
}
.collectList()
这是最终的反应式操作链:
createUsers()
.flatMap { users ->
createComments(users).map { comments ->
Tuples.of(users, comments)
}
}
.flatMap {
val users = it.t1
val comments = it.t2
createRatings(users).map { ratings ->
Tuples.of(comments, ratings)
}
}
.flatMap {
val comments = it.t1
val ratings = it.t2
createToilets(comments, ratings)
}
// close application when all toilets are processed
.subscribe {
GlobalScope.launch {
exitProcess(SpringApplication.exit(context))
}
}
我不确定这是否是最好的方法,但它确实有效。开头 post 中的方法使用嵌套 map/flatmap 操作,无论如何都应该避免,也许它们是它不起作用的原因。
我在使用反应式 MongoDB 驱动程序和 Spring 的 Webflux 应用程序中有以下数据加载器场景:
- 创建 X 个类型 B 的对象
- 创建Y个类型A的对象:对象A包含一个数组类型的字段和一个对类型B对象的引用。对B的引用是从第一步随机选择的
- 将N个条目添加到先前创建的对象的数组中
我面临的问题似乎是 Mono/Flux 的并行执行,根据我的理解,这不应该发生。根据文档,除非另有说明,否则事情总是按顺序执行。
有人可以提示我做错了什么吗?
这是一个示例代码片段。对象 A 是厕所。对象 B 是用户。数组字段为评论字段:
Flux.range(0, 10)
// create 10 objects of type user
.flatMap {
LOG.debug("Creating user $it")
userRepository.save(
User(
id = ObjectId(),
name = userNames.random(),
email = "${userNames.random()}@mail.com"
)
)
}
.collectList()
// create 2 objects of type toilet
.flatMapMany { userList ->
Flux.range(0, 2).zipWith(Flux.range(0, 2).map { userList })
}
.flatMap {
LOG.debug("Creating toilet ${it.t1}")
val userList = it.t2
toiletRepository.save(
Toilet(
id = ObjectId(),
title = userList.random().name
)
)
}
// add 5 entries to array of toilet
.flatMap { toilet ->
Flux.range(0, 5).zipWith(Flux.range(0, 5).map { toilet })
}
.flatMap { tuple ->
val toilet = tuple.t2
LOG.debug("Creating comment ${tuple.t1} for toilet $toilet")
// get current values from toilet
toiletRepository.findById(toilet.id).map {
// and push a new element to the comments array
LOG.debug("Comment size ${it.commentRefs.size}")
toiletRepository.save(it.apply { commentRefs.add(ObjectId()) })
}
}
.subscribe {
GlobalScope.launch {
exitProcess(SpringApplication.exit(context))
}
}
执行此代码会产生以下日志:
2020-11-15 19:42:54.197 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 0
2020-11-15 19:42:54.293 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 1
2020-11-15 19:42:54.295 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 2
2020-11-15 19:42:54.296 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 3
2020-11-15 19:42:54.300 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 4
2020-11-15 19:42:54.301 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 5
2020-11-15 19:42:54.304 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 6
2020-11-15 19:42:54.310 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 7
2020-11-15 19:42:54.316 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 8
2020-11-15 19:42:54.318 DEBUG 13524 --- [ main] c.g.q.t.DataLoaderRunner : Creating user 9
2020-11-15 19:42:54.348 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner : Creating toilet 0
2020-11-15 19:42:54.380 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner : Creating toilet 1
2020-11-15 19:42:54.386 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 0 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.405 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 1 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.406 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 2 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.407 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 3 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.409 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 4 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.410 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 0 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.412 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 1 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.413 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 2 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.414 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 3 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.415 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Creating comment 4 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.425 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-6] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-2] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-3] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-9] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-7] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.429 DEBUG 13524 --- [ntLoopGroup-3-2] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.429 DEBUG 13524 --- [ntLoopGroup-3-9] c.g.q.t.DataLoaderRunner : Comment size 0
2020-11-15 19:42:54.464 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner : Comment size 0
我现在有三个问题:
- 为什么Thread从main切换到LoopGroup?如果它按顺序执行,它根本不应该使用多线程吗?
- 为什么
Comment size
日志消息在末尾组合在一起? - 如何使用响应式 mongo 存储库实现将元素正确推送到数组?
如有任何提示,我们将不胜感激。我假设 findById
和 save
的嵌套执行是不正确的,但是您将如何以不同的方式编写它?由于 save
需要一个实体,因此我需要传入最新版本的实体,该实体在数组中包含一个附加元素。
我尝试通过使用 findById
获取最新版本并使用 'map -> save' 直接修改它来实现这一点。
谢谢大家!
我不确定这是否是最好的方法,但我能够通过拆分函数中的操作以使其更合理地分组来实现我想要的。
以下是以下操作的片段:
- 创建用户
- 创建评论
- 创建评分
private fun createUsers() = Flux.range(0, userNames.size + 1)
.flatMap {
if (it < userNames.size) {
LOG.debug("Creating user $it")
userRepository.save(
User(
id = ObjectId(),
name = userNames[it],
email = "${userNames[it]}@mail.com"
)
)
} else {
LOG.debug("Creating dev-user")
userRepository.save(
User(
id = ObjectId("000000000000012343456789"),
name = "devuser",
email = "devuser@mail.com"
)
)
}
}
.collectList()
private fun createComments(users: List<User>) = Flux.range(0, numComments)
.flatMap {
LOG.debug("Creating comment $it")
commentRepository.save(
Comment(
id = ObjectId(),
text = commentTexts.random(),
userRef = users.random().id
)
)
}
.collectList()
private fun createRatings(users: List<User>) = Flux.range(0, numRatings)
.flatMap {
LOG.debug("Creating rating $it")
ratingRepository.save(
Rating(
id = ObjectId(),
userRef = users.random().id,
value = Random.nextInt(0, 5)
)
)
}
.collectList()
最后用上面的结果创建厕所:
private fun createToilets(comments: List<Comment>, ratings: List<Rating>) = Flux.range(0, numToilets)
.flatMap {
val toilet = Toilet(
id = ObjectId(),
title = titles.random(),
location = GeoJsonPoint(Random.nextDouble(10.0, 20.0), Random.nextDouble(45.0, 55.0)),
description = descriptions.random()
)
// add comments
val commentsToAdd = Random.nextInt(0, comments.size)
for (i in 0 until commentsToAdd) {
toilet.commentRefs.add(comments[i].id)
}
// add average rating and rating references
val ratingsToAdd = Random.nextInt(0, ratings.size)
for (i in 0 until ratingsToAdd) {
toilet.ratingRefs.add(ratings[i].id)
toilet.averageRating += ratings[i].value
}
if (toilet.ratingRefs.isNotEmpty()) {
toilet.averageRating /= toilet.ratingRefs.size
}
LOG.debug("Creating toilet $it with $commentsToAdd comments and $ratingsToAdd ratings")
toiletRepository.save(toilet)
}
// upload preview image
.flatMap { toilet ->
val imageName = "toilet${Random.nextInt(1, 10)}.jpg"
imageService.store(
Callable {
DataLoaderRunner::class.java.getResourceAsStream("/sample-images/$imageName")
},
"${toilet.title}-preview"
).zipWith(Mono.just(toilet))
}
// set preview image
.flatMap {
val imageId = it.t1
val toilet = it.t2
toiletRepository.save(toilet.copy(previewID = imageId))
}
.collectList()
这是最终的反应式操作链:
createUsers()
.flatMap { users ->
createComments(users).map { comments ->
Tuples.of(users, comments)
}
}
.flatMap {
val users = it.t1
val comments = it.t2
createRatings(users).map { ratings ->
Tuples.of(comments, ratings)
}
}
.flatMap {
val comments = it.t1
val ratings = it.t2
createToilets(comments, ratings)
}
// close application when all toilets are processed
.subscribe {
GlobalScope.launch {
exitProcess(SpringApplication.exit(context))
}
}
我不确定这是否是最好的方法,但它确实有效。开头 post 中的方法使用嵌套 map/flatmap 操作,无论如何都应该避免,也许它们是它不起作用的原因。