测试 RxJava BehaviorProcessor 发出一个值

Test that RxJava BehaviorProcessor emits a value

我无法理解为什么所有这些处理器都通过了测试,但 Behavior 却没有:

package com.example;

import org.junit.Test;

import io.reactivex.Flowable;
import io.reactivex.processors.*;

public class ProcessorTest {
    private static Flowable<String> justFoo() {
        return Flowable.just("foo");
    }

    private static FlowableProcessor<String> subscribeToFoo(
            FlowableProcessor<String> processor) {
        justFoo().subscribe(processor);
        return processor;
    }

    @Test public void flowable() {  // pass
        justFoo().test().assertValue("foo");
    }

    @Test public void async() {  // pass
        subscribeToFoo(AsyncProcessor.create()).test().assertValue("foo");
    }

    @Test public void replay() {  // pass
        subscribeToFoo(ReplayProcessor.create()).test().assertValue("foo");
    }

    @Test public void unicast() {  // pass
        subscribeToFoo(UnicastProcessor.create()).test().assertValue("foo");
    }

    @Test public void behaviorFail() {  // fail
        subscribeToFoo(BehaviorProcessor.create()).test().assertValue("foo");
    }

    @Test public void behaviorPassing() {  // pass
        subscribeToFoo(BehaviorProcessor.create())
                .test()
                .assertNoValues()
                .assertSubscribed()
                .assertComplete()
                .assertNoErrors()
                .assertNoTimeout()
                .assertTerminated();
    }
}

文档说 BehaviorProcessor 是:

Processor that emits the most recent item it has observed and all subsequent observed items to each subscribed Subscriber.

所以在我看来它应该通过 behaviorFail 测试,而不是 behaviorPassing。怎么会这样?

我将如何编写有效的测试,以了解 BehaviorProcessor 发出了特定值?

摆脱传递给处理器的终端事件会有所帮助:

@Test public void behavior() {
    final BehaviorProcessor<String> processor = BehaviorProcessor.create();
    justFoo().concatWith(Flowable.never()).subscribe(processor);
    processor.test().assertValue("foo");
}