Mongo Flux/Mono 得到 object/s

Mongo Flux/Mono get object/s

我想从 Flux/Mono 得到 object/objects。 我用

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

我就是这样做的:

    Mono<UserEntity> byEmail = userRepository.findByEmail(userDto.getEmail());
    UserEntity block = byEmail.block();

我有错误:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3

为什么?有什么不同的方法可以得到 object/objects?

反应式编程如何做一些事情: 在 RequestBody 中你有 UserDto.

如果不创建用户,请检查数据库中是否存在电子邮件。

来自@simon-baslé 的更新回答

 return userRepository.findByEmail(userDto.getEmail())
    //rethrow DB errors as BadRequestException, except user exists
    .onErrorResume(t -> Mono.error(new BadRequestException()))
    //throwing in map is converted to onError signal
    .map(ifItGoesThereItExist-> { throw new UserByEmailExistException(); })
    //switchIfEmpty to replace an empty source with a different Mono
    .switchIfEmpty(createUser(userDto))
    //at this point, map can only receive an onNext from the createUser
    .map(u -> new ResponseEntity(u, HttpStatus.CREATED))
    ;

你似乎找到了答案,但让我详细说明一下。

为什么你不能屏蔽

您收到的错误消息表明您试图在不适应阻塞的特定 Thread(或线程池)内恢复到阻塞行为。这是 Spring Webflux(以及 Netty 的幕后)用来处理 应用程序中每个传入请求的线程 。因此,如果您阻止它,您的应用程序将完全无法处理新请求。

您的回答,以及一些小改进

首先,map 可以简化,因为 null 值不允许出现在 FluxMono 中。如果值不在数据库中,Spring 数据 ReactiveCrudRepository 将 return 为空 Mono,不要与发出 [=15= 的 Mono 混淆]:

---( null )---|->
onNext(null), onComplete() // onNext(null) being forbidden

---|->
onComplete()

此外,我认为 onErrorResume 您打算包装数据库错误,但 "user exists" 情况除外?如果是这样,这个 onErrorResume 的位置是错误的,因为它类似于 catch(Throwable e),它也会捕获 UserByEmailExistException。将它放在 之前 map。也可以直接从地图里面扔。

所以这归结为检测一个空的 Mono 与有价值的 Mono,用异步数据库保存替换空的,用 onError:

TL;DR 给我代码

return userRepository.findByEmail(userDto.getEmail())
        //rethrow DB errors as BadRequestException, except user exists
        .onErrorResume(t -> Mono.error(new BadRequestException()))
        //throwing in map is converted to onError signal
        .map(ifItGoesThereItExist-> { throw new UserByEmailExistException(); })
        //switchIfEmpty to replace an empty source with a different Mono
        .switchIfEmpty(createUser(userDto))
        //at this point, map can only receive an onNext from the createUser
        .map(u -> new ResponseEntity(u, HttpStatus.CREATED))
        ;