Projectreactor:并行执行查询并连接结果

Projectreactor: the parallel execution of queries and concatenating the results

晚上好。 我正在学习响应式编程,遇到了以下问题。

我 运行 对数据库进行了两个并行查询,想合并结果并将它们返回

    @GetMapping
    public Mono<User> get(@RequestParam("id") String id, @RequestParam("cId") String cId) {
        Mono<User> userMono = Mono.fromCallable(() -> userServ.get(id))
                .flatMap(userMono1 -> userMono1)
                .subscribeOn(Schedulers.boundedElastic());

        Mono<Comment> ger = Mono.fromCallable(() -> commentServ.ger(cId))
                .flatMap(commentMono -> commentMono)
                .subscribeOn(Schedulers.boundedElastic());


        return Mono.zip(userMono, ger)
                .map(pair -> {
                    User t1 = pair.getT1();
                    t1.setComment(pair.getT2());
                    return t1;
                });

但是重点是注释可能是空的,然后我期望return这样的结构json

{
    "id": "5e6cbf395214a42f51b57121",
    "name": "Bob",
    "surname": null,
    "comment": null
}

相反,我得到一个空的响应。显然这是由于 mono zip,但我还能如何组合结果,同时保持查询并行性

我的实体:

@Document
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Comment {
    @Id
    String id;
    String userId;
    String comment;
}

@Document
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {
    @Id
    private String id;
    private String name;
    private String surname;
    private Comment comment;
}

我该如何解决这种情况?

Zip/ZipWith 需要元素来产生它们的输出。如果它可以为空,你可以使用下面的方法来设置一些默认值。

  • defaultIfEmpty(new Comment())
  • switchIfEmpty(Mono.fromSupplier(() -> new Comment())

如果不想使用new Comment(),设置null为评论对象,我们可以试试这个方法。

        userMono
                .zipWith(commentMono.defaultIfEmpty(new Comment()))
                .map(pair -> {
                    User user = pair.getT1();
                    Comment comment = pair.getT2();
                    if(Objects.nonNull(comment.getUserId()))
                       user.setComment(comment);
                    return user;
                 });