Observables 的单元测试客户端

Unit testing clients of Observables

我有以下方法go()我想测试一下:

private Pair<String, String> mPair;

public void go() {
    Observable.zip(
            mApi.webCall(),
            mApi.webCall2(),
            new Func2<String, String, Pair<String, String>>() {
                @Override
                public Pair<String, String> call(String s, String s2) {
                    return new Pair(s, s2);
                }
            }
    )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Pair<String, String>>() {
                @Override
                public void call(Pair<String, String> pair) {
                    mApi.webCall3(pair.first, pair.second);
                }
            });
}

此方法使用Observable.zip()异步执行http请求,并将它们合并为一个Pair。最后,使用这些先前请求的结果执行另一个 http 请求。

我想验证调用 go() 方法会发出 webCall()webCall2() 请求,然后是 webCall3(String, String) 请求。因此,我希望通过以下测试(使用 Mockito 监视 Api):

@Test
public void testGo() {
    /* Given */
    Api api = spy(new Api() {
        @Override
        public Observable<String> webCall() {
            return Observable.just("First");
        }

        @Override
        public Observable<String> webCall2() {
            return Observable.just("second");
        }

        @Override
        public void webCall3() {
        }
    });

    Test test = new Test(api);

    /* When */
    test.go();

    /* Then */
    verify(api).webCall();
    verify(api).webCall2();
    verify(api).webCall3("First", "second");
}

然而,当 运行 这时,网络调用异步执行,我的测试在订阅者完成之前执行断言,导致我的测试失败。

我读到您可以对所有方法使用 RxJavaSchedulersHookRxAndroidSchedulersHook 到 return Schedulers.immediate(),但这会导致无限期地测试 运行 .

我 运行 我在本地 JVM 上进行单元测试。

我怎样才能做到这一点,最好不必修改 go() 的签名?

(感谢 retrolambda 的 Lambda)

对于初学者,我将改写为:

private Pair<String, String> mPair;

public Observable<Pair<String, String>> go() {
    return Observable.zip(
                mApi.webCall(),
                mApi.webCall2(),
                (String s, String s2) -> new Pair(s, s2)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(pair -> mPair = pair);
}

public Pair<String, String> getPair() {
    return mPair;
}

doOnNext 允许您在有人订阅 Observable

时拦截链中正在处理的值

然后,我会这样调用测试:

Pair result = test.go().toBlocking().lastOrDefault(null);

那你可以测试一下result是什么

我会在您的测试中使用 TestSchedulerTestSubscriber。为了使用它,您必须接收组成 zip 的可观察对象,以便您可以使用 testScheduler 订阅该工作。此外,您还必须参数化您的调度程序。您不必更改 go 方法的签名,但您必须在底层功能中参数化调度程序。您可以通过构造函数注入调度程序,通过继承覆盖 protected 字段,或调用 package 受保护的重载。我已经编写了我的示例,假设一个重载接受调度程序作为参数和 returns Observable。

TestScheduler 为您提供了一种以可预测、可重现的方式触发异步运算符行为的同步方式。 TestSubscriber 为您提供了一种等待终止并对接收到的值和信号进行断言的方法。此外,您可能想知道 delay(long, TimeUnit) 运算符默认调度在计算调度程序上工作。您还需要在那里使用 testScheduler。

Scheduler ioScheduler = Schedulers.io();
Scheduler mainThreadScheduler =  AndroidSchedulers.mainThread();

public void go() {
    go(subscribeOnScheduler, mainThreadScheduler).toBlocking().single();
}

/*package*/ Observable<Pair<String, String>> go(Scheduler ioScheduler, Scheduler mainThreadScheduler) {
    return Observable.zip(
            mApi.webCall(),
            mApi.webCall2(),
            new Func2<String, String, Pair<String, String>>() {
                @Override
                public Pair<String, String> call(String s, String s2) {
                    return new Pair(s, s2);
                }
            })
            .doOnNext(new Action1<Pair<String, String>>() {
                @Override
                public void call(Pair<String, String>() {
                    mApi.webCall3(pair.first, pair.second);
                })
            })
            .subscribeOn(ioScheduler)
            .observeOn(mainThreadScheduler);
}

测试代码

@Test
public void testGo() {
    /* Given */
    TestScheduler testScheduler = new TestScheduler();

    Api api = spy(new Api() {
        @Override
        public Observable<String> webCall() {
            return Observable.just("First").delay(1, TimeUnit.SECONDS, testScheduler);
        }

        @Override
        public Observable<String> webCall2() {
            return Observable.just("second");
        }

        @Override
        public void webCall3() {
        }
    });

    Test test = new Test(api);

    /* When */
    test.go(testScheduler, testScheduler).subscribe(subscriber);
    testScheduler.triggerActions();
    subscriber.awaitTerminalEvent();

    /* Then */
    verify(api).webCall();
    verify(api).webCall2();
    verify(api).webCall3("First", "second");
}

我有 found out,我可以以非静态方式检索我的 Schedulers,基本上是将它们注入我的客户端 class。 SchedulerProvider 替换了对 Schedulers.x():

的静态调用
public interface SchedulerProvider {

    Scheduler io();

    Scheduler mainThread();
}

生产实施委托回 Schedulers:

public class SchedulerProviderImpl implements SchedulerProvider {

    public static final SchedulerProvider INSTANCE = new SchedulerProviderImpl();

    @Override
    public Scheduler io() {
        return Schedulers.io();
    }

    @Override
    public Scheduler mainThread() {
        return AndroidSchedulers.mainThread();
    }
}

但是,在测试期间我可以创建 TestSchedulerProvider:

public class TestSchedulerProvider implements SchedulerProvider {

    private final TestScheduler mIOScheduler = new TestScheduler();

    private final TestScheduler mMainThreadScheduler = new TestScheduler();

    @Override
    public TestScheduler io() {
        return mIOScheduler;
    }

    @Override
    public TestScheduler mainThread() {
        return mMainThreadScheduler;
    }
}

现在我可以将 SchedulerProvider 注入到包含 go() 方法的 Test class 中:

class Test {

    /* ... */

    Test(Api api, SchedulerProvider schedulerProvider) {
        mApi = api;
        mSchedulerProvider = schedulerProvider;
    }

    void go() {
        Observable.zip(
            mApi.webCall(),
            mApi.webCall2(),
            new Func2<String, String, Pair<String, String>>() {
                @Override
                public Pair<String, String> call(String s, String s2) {
                    return new Pair(s, s2);
                }
            }
      )
            .subscribeOn(mSchedulerProvider.io())
            .observeOn(mSchedulerProvider.mainThread())
            .subscribe(new Action1<Pair<String, String>>() {
                @Override
                public void call(Pair<String, String> pair) {
                    mApi.webCall3(pair.first, pair.second);
                }
            });
    }
}

测试如下:

@Test
public void testGo() {
    /* Given */
    TestSchedulerProvider testSchedulerProvider = new TestSchedulerProvider();

    Api api = spy(new Api() {
        @Override
        public Observable<String> webCall() {
            return Observable.just("First");
        }

        @Override
        public Observable<String> webCall2() {
            return Observable.just("second");
        }

        @Override
        public void webCall3() {
        }
    });

    Test test = new Test(api, testSchedulerProvider);

    /* When */
    test.go();
    testSchedulerProvider.io().triggerActions();
    testSchedulerProvider.mainThread().triggerActions();

    /* Then */
    verify(api).webCall();
    verify(api).webCall2();
    verify(api).webCall3("First", "second");
}

我有一个类似的问题,为了解决又多了一步。:

existingObservable
     .zipWith(Observable.interval(100, TimeUnit.MILLISECONDS), new Func1<> ...)
     .subscribeOn(schedulersProvider.computation())

仍然没有使用返回的提供的 TestScheduler schedulersProvider。必须在我正在压缩的各个流上指定 .subscribeOn() 才能工作。:

existingObservable.subscribeOn(schedulersProvider.computation())
     .zipWith(Observable.interval(100, TimeUnit.MILLISECONDS).subscribeOn(schedulersProvider.computation()), new Func1<> ...)
     .subscribeOn(schedulersProvider.computation())

请注意,schedulersProvider 是一个模拟返回我的测试的 TestScheduler!