RxAndroid,Retrofit 2 单元测试 Schedulers.io
RxAndroid, Retrofit 2 unit test Schedulers.io
刚学RxAndroid,可惜我学的书没有单元测试。我在 google 上搜索了很多,但没有找到任何简单的教程以精确的方式涵盖 RxAndroid 单元测试。
我基本上使用 RxAndroid 和 Retrofit 2 编写了一个小的 REST API。这是 ApiManager class:
public class MyAPIManager {
private final MyService myService;
public MyAPIManager() {
HttpLoggingInterceptor logging = new HttpLoggingInterceptor();
// set your desired log level
logging.setLevel(HttpLoggingInterceptor.Level.BODY);
OkHttpClient.Builder b = new OkHttpClient.Builder();
b.readTimeout(35000, TimeUnit.MILLISECONDS);
b.connectTimeout(35000, TimeUnit.MILLISECONDS);
b.addInterceptor(logging);
OkHttpClient client = b.build();
Retrofit retrofit = new Retrofit.Builder()
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.baseUrl("http://192.168.1.7:8000")
.client(client)
.build();
myService = retrofit.create(MyService.class);
}
public Observable<Token> getToken(String username, String password) {
return myService.getToken(username, password)
.subscribeOn(Schedulers.io());
.observeOn(AndroidSchedulers.mainThread());
}
}
我正在尝试为 getToken
创建单元测试。这是我的示例测试:
public class MyAPIManagerTest {
private MyAPIManager myAPIManager;
@Test
public void getToken() throws Exception {
myAPIManager = new MyAPIManager();
Observable<Token> o = myAPIManager.getToken("hello", "mytoken");
o.test().assertSubscribed();
o.test().assertValueCount(1);
}
}
由于 subscribeOn(Schedulers.io)
上面的测试没有在主线程上 运行 因为它 return 的值为 0。如果我从 MyAPIManager
中删除 subscribeOn(Schedulers.io)
那么它 运行 很好并且 return 1 值。有什么方法可以用 Schedulers.io
进行测试吗?
很好的问题,当然也是社区中缺乏大量报道的话题。我想分享几个我个人使用过并且非常出色的解决方案。这些被认为是用于 RxJava 2,但它们在 RxJava 1 中可用,只是名称不同。如果你需要它,你一定会找到它。
- RxPlugins 和 RxAndroidPlugins(这是我目前最喜欢的)
所以Rx实际上提供了一种机制来改变Schedulers
和AndroidSchedulers
内部静态方法提供的调度器。例如:
RxJavaPlugins.setComputationSchedulerHandler
RxJavaPlugins.setIoSchedulerHandler
RxJavaPlugins.setNewThreadSchedulerHandler
RxJavaPlugins.setSingleSchedulerHandler
RxAndroidPlugins.setInitMainThreadSchedulerHandler
这些做的事情很简单。他们确保当您调用 Schedulers.io()
时,返回的调度程序是您在 setIoSchedulerHandler
中设置的处理程序中提供的调度程序。您要使用哪个调度程序?那么你想要 Schedulers.trampoline()
。这意味着代码将 运行 在与以前相同的线程上。如果所有调度器都在 trampoline 调度器中,那么所有调度器都将 运行ning 在 JUnit
线程上。在测试 运行 之后,您可以通过调用来清理整个事情:
RxJavaPlugins.reset()
RxAndroidPlugins.reset()
我认为最好的方法是使用 JUnit 规则。这是一个可能的(抱歉 kotlin 语法):
class TrampolineSchedulerRule : TestRule {
private val scheduler by lazy { Schedulers.trampoline() }
override fun apply(base: Statement?, description: Description?): Statement =
object : Statement() {
override fun evaluate() {
try {
RxJavaPlugins.setComputationSchedulerHandler { scheduler }
RxJavaPlugins.setIoSchedulerHandler { scheduler }
RxJavaPlugins.setNewThreadSchedulerHandler { scheduler }
RxJavaPlugins.setSingleSchedulerHandler { scheduler }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { scheduler }
base?.evaluate()
} finally {
RxJavaPlugins.reset()
RxAndroidPlugins.reset()
}
}
}
}
在单元测试的顶部,您只需要声明一个 public 属性,用 @Rule
注释并用 class:
实例化
@Rule
public TrampolineSchedulerRule rule = new TrampolineSchedulerRule()
在科特林中
@get:Rule
val rule = TrampolineSchedulerRule()
- 注入调度程序(a.k.a。依赖注入)
另一种可能性是在您的 classes 中注入调度程序,这样在测试时您可以再次注入 Schedulers.trampoline()
并且在您的应用程序中您可以注入正常的调度程序。这可能会工作一段时间,但当您需要为一个简单的 class 注入大量调度程序时,它很快就会变得麻烦。这是执行此操作的一种方法
public class MyAPIManager {
private final MyService myService;
private final Scheduler io;
private final Scheduler mainThread;
public MyAPIManager(Scheduler io, Scheduler mainThread) {
// initialise everything
this.io = io;
this.mainThread = mainThread;
}
public Observable<Token> getToken(String username, String password) {
return myService.getToken(username, password)
.subscribeOn(io);
.observeOn(mainThread);
}
}
如您所见,我们现在可以告诉 class 实际的调度程序。在你的测试中你会做类似的事情:
public class MyAPIManagerTest {
private MyAPIManager myAPIManager;
@Test
public void getToken() throws Exception {
myAPIManager = new MyAPIManager(
Schedulers.trampoline(),
Schedulers.trampoline());
Observable<Token> o = myAPIManager.getToken("hello", "mytoken");
o.test().assertSubscribed();
o.test().assertValueCount(1);
}
}
重点是:
您希望它在 Schedulers.trampoline()
调度程序上以确保 JUnit 线程运行 上的所有内容
您需要能够在测试时修改调度程序。
就是这样。希望对你有帮助。
============================================= ============
这里是 Java 版本,是我按照上面的 Kotlin 示例使用的:
public class TrampolineSchedulerRule implements TestRule {
@Override
public Statement apply(Statement base, Description description) {
return new MyStatement(base);
}
public class MyStatement extends Statement {
private final Statement base;
@Override
public void evaluate() throws Throwable {
try {
RxJavaPlugins.setComputationSchedulerHandler(scheduler -> Schedulers.trampoline());
RxJavaPlugins.setIoSchedulerHandler(scheduler -> Schedulers.trampoline());
RxJavaPlugins.setNewThreadSchedulerHandler(scheduler -> Schedulers.trampoline());
RxJavaPlugins.setSingleSchedulerHandler(scheduler -> Schedulers.trampoline());
RxAndroidPlugins.setInitMainThreadSchedulerHandler(scheduler -> Schedulers.trampoline());
base.evaluate();
} finally {
RxJavaPlugins.reset();
RxAndroidPlugins.reset();
}
}
public MyStatement(Statement base) {
this.base = base;
}
}
}
刚学RxAndroid,可惜我学的书没有单元测试。我在 google 上搜索了很多,但没有找到任何简单的教程以精确的方式涵盖 RxAndroid 单元测试。
我基本上使用 RxAndroid 和 Retrofit 2 编写了一个小的 REST API。这是 ApiManager class:
public class MyAPIManager {
private final MyService myService;
public MyAPIManager() {
HttpLoggingInterceptor logging = new HttpLoggingInterceptor();
// set your desired log level
logging.setLevel(HttpLoggingInterceptor.Level.BODY);
OkHttpClient.Builder b = new OkHttpClient.Builder();
b.readTimeout(35000, TimeUnit.MILLISECONDS);
b.connectTimeout(35000, TimeUnit.MILLISECONDS);
b.addInterceptor(logging);
OkHttpClient client = b.build();
Retrofit retrofit = new Retrofit.Builder()
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.baseUrl("http://192.168.1.7:8000")
.client(client)
.build();
myService = retrofit.create(MyService.class);
}
public Observable<Token> getToken(String username, String password) {
return myService.getToken(username, password)
.subscribeOn(Schedulers.io());
.observeOn(AndroidSchedulers.mainThread());
}
}
我正在尝试为 getToken
创建单元测试。这是我的示例测试:
public class MyAPIManagerTest {
private MyAPIManager myAPIManager;
@Test
public void getToken() throws Exception {
myAPIManager = new MyAPIManager();
Observable<Token> o = myAPIManager.getToken("hello", "mytoken");
o.test().assertSubscribed();
o.test().assertValueCount(1);
}
}
由于 subscribeOn(Schedulers.io)
上面的测试没有在主线程上 运行 因为它 return 的值为 0。如果我从 MyAPIManager
中删除 subscribeOn(Schedulers.io)
那么它 运行 很好并且 return 1 值。有什么方法可以用 Schedulers.io
进行测试吗?
很好的问题,当然也是社区中缺乏大量报道的话题。我想分享几个我个人使用过并且非常出色的解决方案。这些被认为是用于 RxJava 2,但它们在 RxJava 1 中可用,只是名称不同。如果你需要它,你一定会找到它。
- RxPlugins 和 RxAndroidPlugins(这是我目前最喜欢的)
所以Rx实际上提供了一种机制来改变Schedulers
和AndroidSchedulers
内部静态方法提供的调度器。例如:
RxJavaPlugins.setComputationSchedulerHandler
RxJavaPlugins.setIoSchedulerHandler
RxJavaPlugins.setNewThreadSchedulerHandler
RxJavaPlugins.setSingleSchedulerHandler
RxAndroidPlugins.setInitMainThreadSchedulerHandler
这些做的事情很简单。他们确保当您调用 Schedulers.io()
时,返回的调度程序是您在 setIoSchedulerHandler
中设置的处理程序中提供的调度程序。您要使用哪个调度程序?那么你想要 Schedulers.trampoline()
。这意味着代码将 运行 在与以前相同的线程上。如果所有调度器都在 trampoline 调度器中,那么所有调度器都将 运行ning 在 JUnit
线程上。在测试 运行 之后,您可以通过调用来清理整个事情:
RxJavaPlugins.reset()
RxAndroidPlugins.reset()
我认为最好的方法是使用 JUnit 规则。这是一个可能的(抱歉 kotlin 语法):
class TrampolineSchedulerRule : TestRule {
private val scheduler by lazy { Schedulers.trampoline() }
override fun apply(base: Statement?, description: Description?): Statement =
object : Statement() {
override fun evaluate() {
try {
RxJavaPlugins.setComputationSchedulerHandler { scheduler }
RxJavaPlugins.setIoSchedulerHandler { scheduler }
RxJavaPlugins.setNewThreadSchedulerHandler { scheduler }
RxJavaPlugins.setSingleSchedulerHandler { scheduler }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { scheduler }
base?.evaluate()
} finally {
RxJavaPlugins.reset()
RxAndroidPlugins.reset()
}
}
}
}
在单元测试的顶部,您只需要声明一个 public 属性,用 @Rule
注释并用 class:
@Rule
public TrampolineSchedulerRule rule = new TrampolineSchedulerRule()
在科特林中
@get:Rule
val rule = TrampolineSchedulerRule()
- 注入调度程序(a.k.a。依赖注入)
另一种可能性是在您的 classes 中注入调度程序,这样在测试时您可以再次注入 Schedulers.trampoline()
并且在您的应用程序中您可以注入正常的调度程序。这可能会工作一段时间,但当您需要为一个简单的 class 注入大量调度程序时,它很快就会变得麻烦。这是执行此操作的一种方法
public class MyAPIManager {
private final MyService myService;
private final Scheduler io;
private final Scheduler mainThread;
public MyAPIManager(Scheduler io, Scheduler mainThread) {
// initialise everything
this.io = io;
this.mainThread = mainThread;
}
public Observable<Token> getToken(String username, String password) {
return myService.getToken(username, password)
.subscribeOn(io);
.observeOn(mainThread);
}
}
如您所见,我们现在可以告诉 class 实际的调度程序。在你的测试中你会做类似的事情:
public class MyAPIManagerTest {
private MyAPIManager myAPIManager;
@Test
public void getToken() throws Exception {
myAPIManager = new MyAPIManager(
Schedulers.trampoline(),
Schedulers.trampoline());
Observable<Token> o = myAPIManager.getToken("hello", "mytoken");
o.test().assertSubscribed();
o.test().assertValueCount(1);
}
}
重点是:
您希望它在
Schedulers.trampoline()
调度程序上以确保 JUnit 线程运行 上的所有内容您需要能够在测试时修改调度程序。
就是这样。希望对你有帮助。
============================================= ============
这里是 Java 版本,是我按照上面的 Kotlin 示例使用的:
public class TrampolineSchedulerRule implements TestRule {
@Override
public Statement apply(Statement base, Description description) {
return new MyStatement(base);
}
public class MyStatement extends Statement {
private final Statement base;
@Override
public void evaluate() throws Throwable {
try {
RxJavaPlugins.setComputationSchedulerHandler(scheduler -> Schedulers.trampoline());
RxJavaPlugins.setIoSchedulerHandler(scheduler -> Schedulers.trampoline());
RxJavaPlugins.setNewThreadSchedulerHandler(scheduler -> Schedulers.trampoline());
RxJavaPlugins.setSingleSchedulerHandler(scheduler -> Schedulers.trampoline());
RxAndroidPlugins.setInitMainThreadSchedulerHandler(scheduler -> Schedulers.trampoline());
base.evaluate();
} finally {
RxJavaPlugins.reset();
RxAndroidPlugins.reset();
}
}
public MyStatement(Statement base) {
this.base = base;
}
}
}