RxAndroid,Retrofit 2 单元测试 Schedulers.io

RxAndroid, Retrofit 2 unit test Schedulers.io

刚学RxAndroid,可惜我学的书没有单元测试。我在 google 上搜索了很多,但没有找到任何简单的教程以精确的方式涵盖 RxAndroid 单元测试。

我基本上使用 RxAndroid 和 Retrofit 2 编写了一个小的 REST API。这是 ApiManager class:

public class MyAPIManager {
    private final MyService myService;

    public MyAPIManager() {
        HttpLoggingInterceptor logging = new HttpLoggingInterceptor();
        // set your desired log level
        logging.setLevel(HttpLoggingInterceptor.Level.BODY);

        OkHttpClient.Builder b = new OkHttpClient.Builder();
        b.readTimeout(35000, TimeUnit.MILLISECONDS);
        b.connectTimeout(35000, TimeUnit.MILLISECONDS);
        b.addInterceptor(logging);
        OkHttpClient client = b.build();

        Retrofit retrofit = new Retrofit.Builder()
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl("http://192.168.1.7:8000")
                .client(client)
                .build();

        myService = retrofit.create(MyService.class);
    }

    public Observable<Token> getToken(String username, String password) {
        return myService.getToken(username, password)
                .subscribeOn(Schedulers.io());
                .observeOn(AndroidSchedulers.mainThread());
    }
}

我正在尝试为 getToken 创建单元测试。这是我的示例测试:

public class MyAPIManagerTest {
    private MyAPIManager myAPIManager;
    @Test
    public void getToken() throws Exception {
        myAPIManager = new MyAPIManager();

        Observable<Token> o = myAPIManager.getToken("hello", "mytoken");
        o.test().assertSubscribed();
        o.test().assertValueCount(1);
    }

}

由于 subscribeOn(Schedulers.io) 上面的测试没有在主线程上 运行 因为它 return 的值为 0。如果我从 MyAPIManager 中删除 subscribeOn(Schedulers.io) 那么它 运行 很好并且 return 1 值。有什么方法可以用 Schedulers.io 进行测试吗?

很好的问题,当然也是社区中缺乏大量报道的话题。我想分享几个我个人使用过并且非常出色的解决方案。这些被认为是用于 RxJava 2,但它们在 RxJava 1 中可用,只是名称不同。如果你需要它,你一定会找到它。

  1. RxPlugins 和 RxAndroidPlugins(这是我目前最喜欢的)

所以Rx实际上提供了一种机制来改变SchedulersAndroidSchedulers内部静态方法提供的调度器。例如:

RxJavaPlugins.setComputationSchedulerHandler
RxJavaPlugins.setIoSchedulerHandler
RxJavaPlugins.setNewThreadSchedulerHandler
RxJavaPlugins.setSingleSchedulerHandler
RxAndroidPlugins.setInitMainThreadSchedulerHandler

这些做的事情很简单。他们确保当您调用 Schedulers.io() 时,返回的调度程序是您在 setIoSchedulerHandler 中设置的处理程序中提供的调度程序。您要使用哪个调度程序?那么你想要 Schedulers.trampoline()。这意味着代码将 运行 在与以前相同的线程上。如果所有调度器都在 trampoline 调度器中,那么所有调度器都将 运行ning 在 JUnit 线程上。在测试 运行 之后,您可以通过调用来清理整个事情:

RxJavaPlugins.reset()
RxAndroidPlugins.reset()

我认为最好的方法是使用 JUnit 规则。这是一个可能的(抱歉 kotlin 语法):

class TrampolineSchedulerRule : TestRule {
  private val scheduler by lazy { Schedulers.trampoline() }

  override fun apply(base: Statement?, description: Description?): Statement =
        object : Statement() {
            override fun evaluate() {
                try {
                    RxJavaPlugins.setComputationSchedulerHandler { scheduler }
                    RxJavaPlugins.setIoSchedulerHandler { scheduler }
                    RxJavaPlugins.setNewThreadSchedulerHandler { scheduler }
                    RxJavaPlugins.setSingleSchedulerHandler { scheduler }
                    RxAndroidPlugins.setInitMainThreadSchedulerHandler { scheduler }
                    base?.evaluate()
                } finally {
                    RxJavaPlugins.reset()
                    RxAndroidPlugins.reset()
                }
            }
        }
}

在单元测试的顶部,您只需要声明一个 public 属性,用 @Rule 注释并用 class:

实例化
@Rule
public TrampolineSchedulerRule rule = new TrampolineSchedulerRule()

在科特林中

@get:Rule
val rule = TrampolineSchedulerRule()
  1. 注入调度程序(a.k.a。依赖注入)

另一种可能性是在您的 classes 中注入调度程序,这样在测试时您可以再次注入 Schedulers.trampoline() 并且在您的应用程序中您可以注入正常的调度程序。这可能会工作一段时间,但当您需要为一个简单的 class 注入大量调度程序时,它很快就会变得麻烦。这是执行此操作的一种方法

public class MyAPIManager {
  private final MyService myService;
  private final Scheduler io;
  private final Scheduler mainThread;

  public MyAPIManager(Scheduler io, Scheduler mainThread) {
    // initialise everything
    this.io = io;
    this.mainThread = mainThread;
  }

  public Observable<Token> getToken(String username, String password) {
    return myService.getToken(username, password)
            .subscribeOn(io);
            .observeOn(mainThread);
  }
}

如您所见,我们现在可以告诉 class 实际的调度程序。在你的测试中你会做类似的事情:

public class MyAPIManagerTest {
  private MyAPIManager myAPIManager;
  @Test
  public void getToken() throws Exception {
    myAPIManager = new MyAPIManager(
          Schedulers.trampoline(),
          Schedulers.trampoline());

    Observable<Token> o = myAPIManager.getToken("hello", "mytoken");
    o.test().assertSubscribed();
    o.test().assertValueCount(1);
  }
}

重点是:

  • 您希望它在 Schedulers.trampoline() 调度程序上以确保 JUnit 线程运行 上的所有内容

  • 您需要能够在测试时修改调度程序。

就是这样。希望对你有帮助。

============================================= ============

这里是 Java 版本,是我按照上面的 Kotlin 示例使用的:

public class TrampolineSchedulerRule implements TestRule {
    @Override
    public Statement apply(Statement base, Description description) {
        return new MyStatement(base);
    }

    public class MyStatement extends Statement {
        private final Statement base;

        @Override
        public void evaluate() throws Throwable {
            try {
                RxJavaPlugins.setComputationSchedulerHandler(scheduler -> Schedulers.trampoline());
                RxJavaPlugins.setIoSchedulerHandler(scheduler -> Schedulers.trampoline());
                RxJavaPlugins.setNewThreadSchedulerHandler(scheduler -> Schedulers.trampoline());
                RxJavaPlugins.setSingleSchedulerHandler(scheduler -> Schedulers.trampoline());
                RxAndroidPlugins.setInitMainThreadSchedulerHandler(scheduler -> Schedulers.trampoline());
                base.evaluate();
            } finally {
                RxJavaPlugins.reset();
                RxAndroidPlugins.reset();
            }
        }

        public MyStatement(Statement base) {
            this.base = base;
        }
    }
}