Mongo Flux/Mono 得到 object/s
Mongo Flux/Mono get object/s
我想从 Flux/Mono 得到 object/objects。
我用
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
我就是这样做的:
Mono<UserEntity> byEmail = userRepository.findByEmail(userDto.getEmail());
UserEntity block = byEmail.block();
我有错误:
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
为什么?有什么不同的方法可以得到 object/objects?
反应式编程如何做一些事情:
在 RequestBody 中你有 UserDto.
如果不创建用户,请检查数据库中是否存在电子邮件。
来自@simon-baslé 的更新回答
return userRepository.findByEmail(userDto.getEmail())
//rethrow DB errors as BadRequestException, except user exists
.onErrorResume(t -> Mono.error(new BadRequestException()))
//throwing in map is converted to onError signal
.map(ifItGoesThereItExist-> { throw new UserByEmailExistException(); })
//switchIfEmpty to replace an empty source with a different Mono
.switchIfEmpty(createUser(userDto))
//at this point, map can only receive an onNext from the createUser
.map(u -> new ResponseEntity(u, HttpStatus.CREATED))
;
你似乎找到了答案,但让我详细说明一下。
为什么你不能屏蔽
您收到的错误消息表明您试图在不适应阻塞的特定 Thread
(或线程池)内恢复到阻塞行为。这是 Spring Webflux(以及 Netty 的幕后)用来处理 应用程序中每个传入请求的线程 。因此,如果您阻止它,您的应用程序将完全无法处理新请求。
您的回答,以及一些小改进
首先,map
可以简化,因为 null
值不允许出现在 Flux
或 Mono
中。如果值不在数据库中,Spring 数据 ReactiveCrudRepository
将 return 为空 Mono
,不要与发出 [=15= 的 Mono
混淆]:
---( null )---|->
onNext(null), onComplete() // onNext(null) being forbidden
对
---|->
onComplete()
此外,我认为 onErrorResume
您打算包装数据库错误,但 "user exists" 情况除外?如果是这样,这个 onErrorResume
的位置是错误的,因为它类似于 catch(Throwable e)
,它也会捕获 UserByEmailExistException
。将它放在 之前 map
。也可以直接从地图里面扔。
所以这归结为检测一个空的 Mono
与有价值的 Mono
,用异步数据库保存替换空的,用 onError
:
TL;DR 给我代码
return userRepository.findByEmail(userDto.getEmail())
//rethrow DB errors as BadRequestException, except user exists
.onErrorResume(t -> Mono.error(new BadRequestException()))
//throwing in map is converted to onError signal
.map(ifItGoesThereItExist-> { throw new UserByEmailExistException(); })
//switchIfEmpty to replace an empty source with a different Mono
.switchIfEmpty(createUser(userDto))
//at this point, map can only receive an onNext from the createUser
.map(u -> new ResponseEntity(u, HttpStatus.CREATED))
;
我想从 Flux/Mono 得到 object/objects。 我用
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
我就是这样做的:
Mono<UserEntity> byEmail = userRepository.findByEmail(userDto.getEmail());
UserEntity block = byEmail.block();
我有错误:
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
为什么?有什么不同的方法可以得到 object/objects?
反应式编程如何做一些事情: 在 RequestBody 中你有 UserDto.
如果不创建用户,请检查数据库中是否存在电子邮件。
来自@simon-baslé 的更新回答
return userRepository.findByEmail(userDto.getEmail())
//rethrow DB errors as BadRequestException, except user exists
.onErrorResume(t -> Mono.error(new BadRequestException()))
//throwing in map is converted to onError signal
.map(ifItGoesThereItExist-> { throw new UserByEmailExistException(); })
//switchIfEmpty to replace an empty source with a different Mono
.switchIfEmpty(createUser(userDto))
//at this point, map can only receive an onNext from the createUser
.map(u -> new ResponseEntity(u, HttpStatus.CREATED))
;
你似乎找到了答案,但让我详细说明一下。
为什么你不能屏蔽
您收到的错误消息表明您试图在不适应阻塞的特定 Thread
(或线程池)内恢复到阻塞行为。这是 Spring Webflux(以及 Netty 的幕后)用来处理 应用程序中每个传入请求的线程 。因此,如果您阻止它,您的应用程序将完全无法处理新请求。
您的回答,以及一些小改进
首先,map
可以简化,因为 null
值不允许出现在 Flux
或 Mono
中。如果值不在数据库中,Spring 数据 ReactiveCrudRepository
将 return 为空 Mono
,不要与发出 [=15= 的 Mono
混淆]:
---( null )---|->
onNext(null), onComplete() // onNext(null) being forbidden
对
---|->
onComplete()
此外,我认为 onErrorResume
您打算包装数据库错误,但 "user exists" 情况除外?如果是这样,这个 onErrorResume
的位置是错误的,因为它类似于 catch(Throwable e)
,它也会捕获 UserByEmailExistException
。将它放在 之前 map
。也可以直接从地图里面扔。
所以这归结为检测一个空的 Mono
与有价值的 Mono
,用异步数据库保存替换空的,用 onError
:
TL;DR 给我代码
return userRepository.findByEmail(userDto.getEmail())
//rethrow DB errors as BadRequestException, except user exists
.onErrorResume(t -> Mono.error(new BadRequestException()))
//throwing in map is converted to onError signal
.map(ifItGoesThereItExist-> { throw new UserByEmailExistException(); })
//switchIfEmpty to replace an empty source with a different Mono
.switchIfEmpty(createUser(userDto))
//at this point, map can only receive an onNext from the createUser
.map(u -> new ResponseEntity(u, HttpStatus.CREATED))
;