如何将 2 Spring Mongo 响应式保存操作链接在一起?

How to chain 2 Spring Mongo Reactive Save operations together?

我有一个测试用例,我试图从两个不同的服务触发两个保存操作。 return Mono(我没有使用 Spring Reactive Repository)。

我想按顺序执行以下操作:

  1. 保存配置文件
  2. 创建特征并使用配置文件更新它
  3. Transform/Save 特质(return 特质)

他们单独工作。当我尝试将它们两个链接在一起时,第二个操作会挂起,具体取决于我的操作(或者只是不触发)。

我以为是第一个chained/subscribed?

Profile profile = new GenericProfile();

Object o = profileService.saveProfile(profile)
    .log()
    .flatMap(pp -> {
        TrackingTrait trait = new TrackingTrait(
            "cid",
            "tid",
            pp.getId(),
            null,
            "h",
            "p",
            null);
        return Mono
            .just(trait)
            .log();
    })
    .doOnNext(n -> log.debug("1 {}", n.getProfileId()))
    .flatMap(tt -> this.trackingService
        .track(tt)
        .log())
    .doOnNext(n -> log.debug("2 {}", n.getProfileId()))
    .block();

我的输出看起来像这样,它从未完成打印第二条日志语句(只是旋转)。

[ INFO] reactor.Mono.FlatMap.1                   : | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
[ INFO] reactor.Mono.FlatMap.1                   : | request(unbounded)
[ INFO] reactor.Mono.FlatMap.1                   : | onNext(io.logicdrop.profiles.services.GenericProfile@1a01ffff)
[ INFO] reactor.Mono.Just.2                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] reactor.Mono.Just.2                      : | request(unbounded)
[ INFO] reactor.Mono.Just.2                      : | onNext(tid)
[DEBUG] i.l.analytics.AnalyticsPersistTest       : 1 5ad9ed16a29e0e2f82775a82
[ INFO] reactor.Mono.FlatMap.3                   : | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
[ INFO] reactor.Mono.FlatMap.3                   : | request(unbounded)

天哪 - 它看起来是 Spring/Mongo 处理索引与 Reactive 内容相结合的方式中的错误。它挂在试图创建索引的第二次调用上。

我偶然发现了这个Reactive mongo hangs with inheritance hierarchies defined in different gradle project

如果我使用这个解决方法并删除所有索引注释(@Index 和@CompoundIndex),至少从 2nd DAO 开始,它就起作用了。

现在,我的日志输出如下:

[ INFO] reactor.Mono.FlatMap.1                   : | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
[ INFO] reactor.Mono.FlatMap.1                   : | request(unbounded)
[ INFO] reactor.Mono.FlatMap.1                   : | onNext(io.logicdrop.profiles.services.GenericProfile@28519bfb)
[ INFO] reactor.Mono.Just.2                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] reactor.Mono.Just.2                      : | request(unbounded)
[ INFO] reactor.Mono.Just.2                      : | onNext(tid)
[DEBUG] i.l.analytics.AnalyticsPersistTest       : 1 5ad9f248a29e0e307f885e5e
[ INFO] reactor.Mono.FlatMap.3                   : | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
[ INFO] reactor.Mono.FlatMap.3                   : | request(unbounded)
[ INFO] reactor.Mono.Just.2                      : | onComplete()
[ INFO] reactor.Mono.FlatMap.1                   : | onComplete()
[ INFO] reactor.Mono.FlatMap.3                   : | onNext(tid)
[DEBUG] i.l.analytics.AnalyticsPersistTest       : 2 5ad9f248a29e0e307f885e5e
[ INFO] reactor.Mono.FlatMap.3                   : | onComplete()