运行 使用 Spring Webflux 时在后台返回响应时使用 Mono
Running a Mono in background while returning a response when using Spring Webflux
这个问题和Return immediately in spring web flux有关系,但我觉得不一样(至少那里的答案我不满意)
我有一个函数 returning 一个 Mono
,当调用它时会启动一个长 运行 作业。当调用 Spring Webflux HTTP API 时调用此函数。这是一个例子:
@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
val longRunningJob : Mono<Job> = startNewJob(jobId)
longRunningJob.map { job ->
val jobUri = generateJobUri(request, job.id)
ResponseEntity.created(jobURI).build<Unit>()
}
}
上面代码的问题是“201 Created”是在 长 运行 作业完成后创建的。我想在后台启动 longRunningJob
并立即 return “201 已创建”。
我也许可以这样做:
@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
startNewJob(jobId)
.subscribeOn(Schedulers.newSingle("thread"))
.subscribe()
val jobUri = generateJobUri(request, job.id)
val response = ResponseEntity.created(jobURI).build<Unit>()
Mono.just(response)
}
但我觉得必须手动调用 subscribe()
似乎不太习惯(例如,intellij 抱怨我在非阻塞范围内调用 subscribe()
)。难道没有更好的方法来组合两个 "streams" 而无需使用显式 subscribe
吗?如果是这样,我该如何修改上面的 startNewJob
函数来实现此目的?
据我所知,使用 subscribe
方法之一是真正在后台启动具有自己生命周期的作业的唯一方法(不依赖于返回的发布者)。
如果您要使用其中一个运算符来组合作业发布者和响应发布者(例如 zip
或 merge
),那么作业发布者的生命周期将与响应发布者,这不是您想要的后台作业。
您可能要考虑的一件事是在响应发布者流中启动后台作业,而不是直接在方法主体中启动。例如通过 doOnSubscibe
或来自响应上游的操作员。
这会将后台作业的启动与响应发布者的 onSubscribe 事件联系起来,但仍允许它在后台完成。
另请注意,如果您希望能够取消后台作业(例如,可能在应用程序关闭期间),您需要保存从 subscribe
返回的 Disposable
以便您可以稍后调用 dispose
就可以了。这可能最好通过某种类型的 BackgroundJobManager 来完成,它可以跟踪所有作业 运行.
private static final Scheduler backgroundTaskScheduler = Schedulers.newParallel("backgroundTaskScheduler", 2);
backgroundTaskScheduler.schedule(() -> doBackgroundJob());
这个问题和Return immediately in spring web flux有关系,但我觉得不一样(至少那里的答案我不满意)
我有一个函数 returning 一个 Mono
,当调用它时会启动一个长 运行 作业。当调用 Spring Webflux HTTP API 时调用此函数。这是一个例子:
@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
val longRunningJob : Mono<Job> = startNewJob(jobId)
longRunningJob.map { job ->
val jobUri = generateJobUri(request, job.id)
ResponseEntity.created(jobURI).build<Unit>()
}
}
上面代码的问题是“201 Created”是在 长 运行 作业完成后创建的。我想在后台启动 longRunningJob
并立即 return “201 已创建”。
我也许可以这样做:
@PutMapping("/{jobId}")
fun startNewJob(@PathVariable("jobId") jobId: String,
request: ServerHttpRequest): Mono<ResponseEntity<Unit>> {
startNewJob(jobId)
.subscribeOn(Schedulers.newSingle("thread"))
.subscribe()
val jobUri = generateJobUri(request, job.id)
val response = ResponseEntity.created(jobURI).build<Unit>()
Mono.just(response)
}
但我觉得必须手动调用 subscribe()
似乎不太习惯(例如,intellij 抱怨我在非阻塞范围内调用 subscribe()
)。难道没有更好的方法来组合两个 "streams" 而无需使用显式 subscribe
吗?如果是这样,我该如何修改上面的 startNewJob
函数来实现此目的?
据我所知,使用 subscribe
方法之一是真正在后台启动具有自己生命周期的作业的唯一方法(不依赖于返回的发布者)。
如果您要使用其中一个运算符来组合作业发布者和响应发布者(例如 zip
或 merge
),那么作业发布者的生命周期将与响应发布者,这不是您想要的后台作业。
您可能要考虑的一件事是在响应发布者流中启动后台作业,而不是直接在方法主体中启动。例如通过 doOnSubscibe
或来自响应上游的操作员。
这会将后台作业的启动与响应发布者的 onSubscribe 事件联系起来,但仍允许它在后台完成。
另请注意,如果您希望能够取消后台作业(例如,可能在应用程序关闭期间),您需要保存从 subscribe
返回的 Disposable
以便您可以稍后调用 dispose
就可以了。这可能最好通过某种类型的 BackgroundJobManager 来完成,它可以跟踪所有作业 运行.
private static final Scheduler backgroundTaskScheduler = Schedulers.newParallel("backgroundTaskScheduler", 2);
backgroundTaskScheduler.schedule(() -> doBackgroundJob());