如何在反应堆中编码以更新卡桑德拉数据库记录中的特定字段

How to code in reactor to update a specific field in a cassandra DB record

我正在使用 ReactiveCassandraRepository,我可以如下创建新记录。

public Mono<String> saveAbc(Abc toBeSaved) {
    return abcRepository.save(toBeSaved).map(saved -> saved.getId());
}

但我无法想象如何更新数据库记录中的特定字段,因为涉及 2 个反应性操作(findById 和保存)。

我编写了如下代码来创建或更新状态(如果存在),但似乎不起作用。

public Mono<String> saveAbc(Abc toBeSaved) {
    return abcRepository.findById(toBeSaved.getId())
        .map(current -> abcRepository.save(transform(toBeSaved, current)).map(saved -> saved.getId()))
        .flatMap(id -> id);
}

private Abc transform(Abc toBeSaved, Abc current) {
    if(current == null) {
        return toBeSaved;
    } else {
        current.setStatus(toBeSaved.getStatus());
        return current;
    }
}

有人可以帮忙吗?

我希望您的 abcRepository 方法看起来像这样:

interface AbcRepository {
    Mono<Abc> findById(String id);

    Mono<Abc> save(Abc abc);
}

我从你的代码中猜想,对于给定的 Abc,你想要

  1. 从存储库中读取具有相同 ID 的 Abc,
  2. 将给定 Abc 的数据映射到找到的数据,
  3. 或者如果存储库没有找到,则只使用给定的 Abc,
  4. 异步保存这个Abc
  5. 和return保存元素的id为Mono

我会这样做:

public Mono<String> saveAbc(Abc toBeSaved) {
    return abcRepository.findById(toBeSaved.getId()) // (1)
            .map(abc -> transform(toBeSaved, abc))   // (2)
            .defaultIfEmpty(toBeSaved)               // (3)
            .flatMap(abcRepository::save)            // (4)
            .map(Abc::getId);                        // (5)
}

private Abc transform(Abc toBeSaved, Abc current) {
        current.setStatus(toBeSaved.getStatus());
        return current;
}

一个 Mono 只能接收一个元素或没有元素,所以当使用 Mono:map (2) 时你不需要处理空值。由 abcRepository 编辑的 Mono return 将接收找到的 Abc,在这种情况下,转换调用 (2) 已完成,或者它只会发出一个完整的信号,在这种情况下,地图什么都不做,而 defaultIfEmpty (3) 会发出 toBeSaved 作为回退。

如果你有一个本身是异步的转换并因此导致另一个 Mono 使用 flatMap (4),否则你的中间结果将是 Mono<Mono<Abc>>.

永远记住:在调用 subscribe 之前什么都不会发生。

saveAbc(myNewAbc).subscribe(id -> System.out.println("Saved Abc with id: " + id));

在上面的示例中,我希望您的存储库在 findById 找不到任何匹配的 Abc 时发出完整信号,这将使 Mono 完成为空(使用 ReactiveCassandraRepository 时就是这种情况!)。相反,如果存储库在这种情况下发出异常,您可以使用

.onErrorResume(t -> Mono.just(toBeSaved))

而不是 defaultIfEmpty (3).