测试内部 Flux.interval

Testing internal Flux.interval

我想使用 Reactor 的 Flux 提供一些数据。由于提供此数据可能需要很多时间,因此我决定引入一种 ping 机制(例如,保持 tcp 连接处于活动状态并且不会超时)。这是我的简化解决方案:


public class Example {

    private final DataProvider dataProvider;

    public Example(DataProvider dataProvider) {
        this.dataProvider = dataProvider;
    }

    public Flux<String> getData() {
        AtomicBoolean inProgress = new AtomicBoolean(true);

        Flux<String> dataFlux = dataProvider.provide()
            .doFinally(ignoreIt -> inProgress.set(false));

        return dataFlux.mergeWith(ping(inProgress::get));
    }

    private Publisher<String> ping(Supplier<Boolean> inProgress) {
        return Flux.interval(Duration.ofSeconds(1), Duration.ofSeconds(1))
                .map((tick) -> "ping " + tick)
                .takeWhile(ignoreIt -> inProgress.get());
    }

    interface DataProvider {
        Flux<String> provide();
    }

    public static void main(String[] args) {
        Callable<String> dataProviderLogic = () -> {
            Thread.sleep(3500);
            return "REAL DATA - SHOULD TERMINATE PING";
        };

        // wrapping synchronous call
        DataProvider dataProvider = () -> Mono.fromCallable(dataProviderLogic)
                .subscribeOn(Schedulers.boundedElastic())
                .flux();

        new Example(dataProvider).getData()
            .doOnNext(data -> System.out.println("GOT: " + data))
            .blockLast();
    }
}

以上代码打印在控制台上:

GOT: ping 0
GOT: ping 1
GOT: ping 2
GOT: REAL DATA - SHOULD TERMINATE PING

所以它按预期工作。

问题是:我如何在 Junit5 测试中测试此 ping 机制,所以它不会花费很多时间(例如几秒钟)?

在理想情况下,我想编写一个模拟数据提供延迟的测试,检查是否生成了预期数量的 ping 并验证是否发出了完整信号(以确保 ping 通量按预期终止).当然我希望有一个单元测试,可以运行 in ms.

我试过了,但没有成功:

    @Test
    void test() {
        TestPublisher<String> publisher = TestPublisher.create();

        Flux<String> data = new Example(publisher::flux).getData();

        StepVerifier.withVirtualTime(() -> data)
                .thenAwait(Duration.ofMillis(3500))
                .then(() -> publisher.emit("REAL DATA - SHOULD TERMINATE PING"))
                .then(publisher::complete)
                .expectNextCount(4)
                .verifyComplete();
    }

以上测试以这个错误结束:

java.lang.AssertionError: expectation "expectNextCount(4)" failed (expected: count = 4; actual: counted = 1; signal: onComplete())

是否可以将虚拟时间用于内部创建的Flux.interval?

如有任何替代 ping 解决方案的想法,我们将不胜感激。

尽管上述 ping 机制不是最好的(我建议使用 Sink 而不是 AtomicBoolean 并使用 takeUntilOther 而不是 takeWhile) ,在我的例子中,问题可能与并非所有通量指令都用 withVirtualTime 包裹的情况有关。此代码在上述情况下按预期工作:

    @Test
    void test() {
        StepVerifier.withVirtualTime(() -> {
            Flux<String> data = Flux.just("REAL DATA - SHOULD TERMINATE PING").delayElements(Duration.ofMillis(3200));
            return new Example(() -> data).getData();
        })
                .thenAwait(Duration.ofMillis(3500))
                .expectNextCount(4)
                .thenAwait(Duration.ofMillis(1000))
                .verifyComplete();
    }