我如何使用 Project Reactor Stepverifier 从 RxJava 测试一个没有事件的逻辑?

How would I test a no events yet logic with from RxJava with Project Reactor Stepverifier?

我目前正在尝试弄清楚如何在 Project Reactor 中进行类似的测试。 基本上我想确保在连接之前没有事件发生。

@Test
void connectable() {
    Observable<String> provider = Observable.just("Test1", "Test2");
    ConnectableObservable<String> connectable = provider.publish();
    TestObserver<String> testSubscriber = connectable.test();
    testSubscriber.assertEmpty();
    connectable.connect();
    testSubscriber.assertResult("Test1", "Test2").assertComplete();
}

这是我目前的尝试,但不正确,我该如何让它发挥作用?

@Test
void connectable() {
    Flux<String> provider = Flux.just("Test1", "Test2");
    ConnectableFlux<String> connectable = provider.publish();
    FirstStep<String> tester = StepVerifier.create(connectable).expectNoEvent(Duration.ofMinutes(1));
    connectable.connect();
    tester.expectNext("Test1", "Test2").expectComplete().verify();
}

你快到了。 StepVerifier 测试整个序列,不能在中间添加命令式断言。但是您可以在 StepVerifier!为此,使用 then(Runnable):

@Test
public void stepVerifierTestConnect() {
    Flux<String> provider = Flux.just("Test1", "Test2");
    ConnectableFlux<String> connectable = provider.publish();

    StepVerifier.create(connectable)
                .expectSubscription() //expectNoEvent counts the subscription as an event
                .expectNoEvent(Duration.ofSeconds(3))
                .then(connectable::connect)
                .expectNext("Test1", "Test2")
                .expectComplete()
                .verify();
}

首先注意expectSubscription。这避免了 expectNoEvent 爆炸,因为它认为订阅行为是一个事件(并且仍然有对 ConnectableFlux 本身的订阅 - 它只是阻止订阅它自己的上游,直到你调用 connect()).