运行 使用 Spring Webflux 时在后台返回响应时使用 Mono

Running a Mono in background while returning a response when using Spring Webflux

这个问题和Return immediately in spring web flux有关系,但我觉得不一样(至少那里的答案我不满意)

我有一个函数 returning 一个 Mono,当调用它时会启动一个长 运行 作业。当调用 Spring Webflux HTTP API 时调用此函数。这是一个例子:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
    val longRunningJob : Mono<Job> = startNewJob(jobId)
    longRunningJob.map { job ->
        val jobUri = generateJobUri(request, job.id)
        ResponseEntity.created(jobURI).build<Unit>()
    }
}

上面代码的问题是“201 Created”是在 长 运行 作业完成后创建的。我想在后台启动 longRunningJob 并立即 return “201 已创建”。

我也许可以这样做:

@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
                request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {

    startNewJob(jobId)
        .subscribeOn(Schedulers.newSingle("thread"))
        .subscribe()

    val jobUri = generateJobUri(request, job.id)
    val response = ResponseEntity.created(jobURI).build<Unit>()
    Mono.just(response)
}

但我觉得必须手动调用 subscribe() 似乎不太习惯(例如,intellij 抱怨我在非阻塞范围内调用 subscribe())。难道没有更好的方法来组合两个 "streams" 而无需使用显式 subscribe 吗?如果是这样,我该如何修改上面的 startNewJob 函数来实现此目的?

据我所知,使用 subscribe 方法之一是真正在后台启动具有自己生命周期的作业的唯一方法(不依赖于返回的发布者)。

如果您要使用其中一个运算符来组合作业发布者和响应发布者(例如 zipmerge),那么作业发布者的生命周期将与响应发布者,这不是您想要的后台作业。

您可能要考虑的一件事是在响应发布者流中启动后台作业,而不是直接在方法主体中启动。例如通过 doOnSubscibe 或来自响应上游的操作员。

这会将后台作业的启动与响应发布者的 onSubscribe 事件联系起来,但仍允许它在后台完成。

另请注意,如果您希望能够取消后台作业(例如,可能在应用程序关闭期间),您需要保存从 subscribe 返回的 Disposable 以便您可以稍后调用 dispose 就可以了。这可能最好通过某种类型的 BackgroundJobManager 来完成,它可以跟踪所有作业 运行.

private static final Scheduler backgroundTaskScheduler = Schedulers.newParallel("backgroundTaskScheduler", 2);
backgroundTaskScheduler.schedule(() -> doBackgroundJob());