RxJava 测试:如何等待所有后台任务完成
RxJava testing : how to wait for all background tasks to complete
TLDR :我在 RxJava Observables 中进行后台处理,我正在进行集成测试,我希望能够独立等待该处理完成以确保从一个测试开始的后台处理不会干扰与另一个测试。
简化,我有一个 @RequestMapping
方法执行以下操作:
- 在数据库中插入数据
- 启动该数据的异步处理(通过 Feign 进行 http 调用,数据库更新)
- returns 没有 (
HttpStatus.NO_CONTENT
)
这个异步处理之前是用 ThreadPoolTaskExecutor
完成的。我们将过渡到 RxJava,并希望删除此 ThreadPoolTaskExecutor 并使用 RxJava 进行后台处理。
当时我很天真地尝试这样做:
Observable
.defer(() -> Observable.just(call to long blocking method)
.subscribeOn(Schedulers.io())
.subscribe();
最终目标当然是,一步一个脚印,进入"call to long blocking method"并一直使用 Observable。
在此之前,我想让我的集成测试先工作。我正在通过对映射执行 RestTemplate 调用来测试它。由于大部分工作都是异步的,所以我的调用 returns 非常快。现在我想找到一种方法来等待异步处理完成(确保它不会与另一个测试冲突)。
在使用 RxJava 之前,我只会计算 ThreadPoolTaskExecutor 中的任务并等待它达到 0。
我如何使用 RxJava 做到这一点?
我尝试了什么:
- 我尝试使用 RxJavaSchedulersHook 让我所有的调度程序立即执行:这会导致某处出现某种阻塞,代码执行会在我的 Feign 调用之前停止(Feign 在后台使用 RxJava)
- 我尝试使用 Rx RxJavaObservableExecutionHook 计算任务数:我尝试保留订阅,并在 isSubcribed = false 时删除它们,但这根本不起作用(很多订阅者,计数永远不会下降)
- 我试图在实际生产代码中加入一个 observeOn(immediate())。这似乎可行,我可以为 runtime/test 阶段注入正确的调度程序,但我并不热衷于将仅用于测试目的的代码放入我的实际生产代码中。
我可能错得很厉害,或者把事情复杂化了,所以请不要犹豫,纠正我的推理!
您可以使用 ExecutorServiceAdapter
从 Spring ThreadPoolTaskExecutor
桥接到 RX 中的 ExecutorService
,然后使用与之前相同的技巧。
你怎么样 return HttpStatus.NO_CONTENT
?
@RequestMapping(value = "/")
public HttpStatus home() {
Observable.defer(() -> Observable.just(longMethod())
.subscribeOn(Schedulers.io())
.subscribe();
return HttpStatus.NO_CONTENT;
}
在这种形式下,您无法知道 longMethod
何时完成。
如果您想知道所有异步作业何时完成,您可以 return HttpStatus.NO_CONTENT
所有作业何时完成,使用 Spring DefferedResult
或使用 TestSubscriber
PS: 如果你想要
你可以使用 Observable.fromCallable(() -> longMethod());
而不是 Observable.defer(() -> Observable.just(longMethod());
使用 DefferedResult
@RequestMapping(value = "/")
public DeferredResult<HttpStatus> index() {
DeferredResult<HttpStatus> deferredResult = new DeferredResult<HttpStatus>();
Observable.fromCallable(() -> longMethod())
.subscribeOn(Schedulers.io())
.subscribe(value -> {}, e -> deferredResult.setErrorResult(e.getMessage()), () -> deferredResult.setResult(HttpStatus.NO_CONTENT))
return deferredResult;
}
像这样,如果你调用你的方法,你只会在你的 observable 完成时得到你的结果(所以,当 longMethod
完成时)
使用测试订阅者
你必须注入一个 TestSubscriber
并且当要求他 wait/check 完成你的 Observable 时:
@RequestMapping(value = "/")
public HttpStatus home() {
Observable.defer(() -> Observable.just(longMethod())
.subscribeOn(Schedulers.io())
.subscribe(subscriber); // you'll have to inject this subscriber in your test
return HttpStatus.NO_CONTENT;
}
在你的测试中:
TestSubscriber subscriber = new TestSubscriber(); // you'll have to inject it into your controller
// ....
controller.home();
subscriber.awaitTerminalEvent();
subscriber.assertCompleted(); // check that no error occurred
几个月后的比赛:我的建议很简单"don't do that"。 RxJava
不太适合这种工作。没有太多细节在后台有很多 "loose" Observable 运行 是不合适的:根据你的请求量,你很容易陷入队列和内存问题,更重要的是会发生什么如果网络服务器崩溃,所有计划和 运行 任务?你如何重新启动它?
Spring 提供了其他更好的选择恕我直言:Spring Batch, Spring Cloud Task, messaging with Spring Cloud Stream,所以不要像我那样做,只需使用正确的工具来完成正确的工作。
现在如果你真的想走坏路:
- 要么 return 一个 SseEmmitter 并仅在消费者服务中使用来自 SSE 的第一个事件,然后在您的测试中使用所有事件
- 要么创建一个 RxJava
lift
运算符,它(在 call
方法中)将订阅者包装在具有 waitForCompletion
方法的父订阅者中。如何等待取决于您(例如 CountDownLatch
)。该订户将被添加到同步列表中(并在完成后从中删除),并且在您的测试中,您可以遍历列表并在列表的每个项目上调用 waitForCompletion
。它并没有那么复杂,我让它工作了,但是请不要那样做!
TLDR :我在 RxJava Observables 中进行后台处理,我正在进行集成测试,我希望能够独立等待该处理完成以确保从一个测试开始的后台处理不会干扰与另一个测试。
简化,我有一个 @RequestMapping
方法执行以下操作:
- 在数据库中插入数据
- 启动该数据的异步处理(通过 Feign 进行 http 调用,数据库更新)
- returns 没有 (
HttpStatus.NO_CONTENT
)
这个异步处理之前是用 ThreadPoolTaskExecutor
完成的。我们将过渡到 RxJava,并希望删除此 ThreadPoolTaskExecutor 并使用 RxJava 进行后台处理。
当时我很天真地尝试这样做:
Observable
.defer(() -> Observable.just(call to long blocking method)
.subscribeOn(Schedulers.io())
.subscribe();
最终目标当然是,一步一个脚印,进入"call to long blocking method"并一直使用 Observable。
在此之前,我想让我的集成测试先工作。我正在通过对映射执行 RestTemplate 调用来测试它。由于大部分工作都是异步的,所以我的调用 returns 非常快。现在我想找到一种方法来等待异步处理完成(确保它不会与另一个测试冲突)。
在使用 RxJava 之前,我只会计算 ThreadPoolTaskExecutor 中的任务并等待它达到 0。
我如何使用 RxJava 做到这一点?
我尝试了什么:
- 我尝试使用 RxJavaSchedulersHook 让我所有的调度程序立即执行:这会导致某处出现某种阻塞,代码执行会在我的 Feign 调用之前停止(Feign 在后台使用 RxJava)
- 我尝试使用 Rx RxJavaObservableExecutionHook 计算任务数:我尝试保留订阅,并在 isSubcribed = false 时删除它们,但这根本不起作用(很多订阅者,计数永远不会下降)
- 我试图在实际生产代码中加入一个 observeOn(immediate())。这似乎可行,我可以为 runtime/test 阶段注入正确的调度程序,但我并不热衷于将仅用于测试目的的代码放入我的实际生产代码中。
我可能错得很厉害,或者把事情复杂化了,所以请不要犹豫,纠正我的推理!
您可以使用 ExecutorServiceAdapter
从 Spring ThreadPoolTaskExecutor
桥接到 RX 中的 ExecutorService
,然后使用与之前相同的技巧。
你怎么样 return HttpStatus.NO_CONTENT
?
@RequestMapping(value = "/")
public HttpStatus home() {
Observable.defer(() -> Observable.just(longMethod())
.subscribeOn(Schedulers.io())
.subscribe();
return HttpStatus.NO_CONTENT;
}
在这种形式下,您无法知道 longMethod
何时完成。
如果您想知道所有异步作业何时完成,您可以 return HttpStatus.NO_CONTENT
所有作业何时完成,使用 Spring DefferedResult
或使用 TestSubscriber
PS: 如果你想要
你可以使用Observable.fromCallable(() -> longMethod());
而不是 Observable.defer(() -> Observable.just(longMethod());
使用 DefferedResult
@RequestMapping(value = "/")
public DeferredResult<HttpStatus> index() {
DeferredResult<HttpStatus> deferredResult = new DeferredResult<HttpStatus>();
Observable.fromCallable(() -> longMethod())
.subscribeOn(Schedulers.io())
.subscribe(value -> {}, e -> deferredResult.setErrorResult(e.getMessage()), () -> deferredResult.setResult(HttpStatus.NO_CONTENT))
return deferredResult;
}
像这样,如果你调用你的方法,你只会在你的 observable 完成时得到你的结果(所以,当 longMethod
完成时)
使用测试订阅者
你必须注入一个 TestSubscriber
并且当要求他 wait/check 完成你的 Observable 时:
@RequestMapping(value = "/")
public HttpStatus home() {
Observable.defer(() -> Observable.just(longMethod())
.subscribeOn(Schedulers.io())
.subscribe(subscriber); // you'll have to inject this subscriber in your test
return HttpStatus.NO_CONTENT;
}
在你的测试中:
TestSubscriber subscriber = new TestSubscriber(); // you'll have to inject it into your controller
// ....
controller.home();
subscriber.awaitTerminalEvent();
subscriber.assertCompleted(); // check that no error occurred
几个月后的比赛:我的建议很简单"don't do that"。 RxJava
不太适合这种工作。没有太多细节在后台有很多 "loose" Observable 运行 是不合适的:根据你的请求量,你很容易陷入队列和内存问题,更重要的是会发生什么如果网络服务器崩溃,所有计划和 运行 任务?你如何重新启动它?
Spring 提供了其他更好的选择恕我直言:Spring Batch, Spring Cloud Task, messaging with Spring Cloud Stream,所以不要像我那样做,只需使用正确的工具来完成正确的工作。
现在如果你真的想走坏路:
- 要么 return 一个 SseEmmitter 并仅在消费者服务中使用来自 SSE 的第一个事件,然后在您的测试中使用所有事件
- 要么创建一个 RxJava
lift
运算符,它(在call
方法中)将订阅者包装在具有waitForCompletion
方法的父订阅者中。如何等待取决于您(例如CountDownLatch
)。该订户将被添加到同步列表中(并在完成后从中删除),并且在您的测试中,您可以遍历列表并在列表的每个项目上调用waitForCompletion
。它并没有那么复杂,我让它工作了,但是请不要那样做!