如何确保具有无限流的 Flux 在给定时间内完成?

How to ensure that a Flux with infinite stream completes in a given time?

我有一个 Flux 生成器(来自项目反应器)用于延迟 1 秒发射每个元素的流。因为流是无限的,所以我还决定使用 take() 方法要获取多少个元素。我想测试这样的方法是否可以在给定时间内执行。我尝试创建我的解决方案 但没有成功。

这是Flux方法

import reactor.core.publisher.Flux;
import java.time.Duration;
@Slf4j
public class FluxAndMonoStreams {
    public Flux<Double> createFluxDoubleWithDelay(long numberOfElements, long delaySec) {
        return Flux.interval(Duration.ofSeconds(delaySec))
                .map(l -> l.doubleValue())
                .take(numberOfElements)
                .log();
    }
}

这是单元测试。在 .thenAwait(Duration.ofSeconds(4)) 上等待 4 秒后,我尝试了使用 .thenConsumeWhile(value -> true).expectNextCount(4).expectNext(0.0, 1.0, 2.0, 3.0) 的方法。但是 none 有效。

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class FluxAndMonoStreamsTest {
    FluxAndMonoStreams myFluxAndMonoStreams = new FluxAndMonoStreams();
    @Test
    void testCreateFluxStreamWithMapAndVerifyDelay() {
        Flux<Double> streamLongFlux = myFluxAndMonoStreams
                .createFluxDoubleWithDelay(4, 1);

        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger incrementer = new AtomicInteger();

        StepVerifier
                .withVirtualTime(() -> streamLongFlux
                        .subscribeOn(scheduler)
                        .doOnNext(value -> incrementer.incrementAndGet())
                )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(4))
                // .thenConsumeWhile(value -> true)
                // .expectNextCount(4)
                // .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete()
        ;
    }
}

我收到此错误消息,表明我没有使用值 1.02.0,...基本上我想确保我只使用值 0.0, 1.0, 2.0, 3.0 4 秒内。我该怎么做?

12:37:31.853 [Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
12:37:31.912 [test-1] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
12:37:31.915 [test-1] INFO reactor.Flux.Take.1 - request(unbounded)
12:37:32.917 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0.0)
12:37:32.921 [parallel-1] INFO reactor.Flux.Take.1 - cancel()

expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))

补充说明:再次说明我想要的行为。如果我取消注释 .expectNext(0.0, 1.0, 2.0, 3.0) 并将 .thenAwait(Duration.ofSeconds(2)) 中的 4 秒替换为 2 秒,测试应该会失败。但事实并非如此。

如果我取消对 expectNext 的注释,这个测试 运行 对我来说“很好”。

    public Flux<Double> create(long numberOfElements, long delaySec) {
        return Flux.interval(Duration.ofSeconds(delaySec)).map(l -> l.doubleValue()).take(numberOfElements).log();
    }

    @Test
    public void virtualTimeTest() {
        Flux<Double> streamLongFlux = create(4,1);

        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger incrementer = new AtomicInteger();

        StepVerifier
                .withVirtualTime(() -> streamLongFlux
                        .subscribeOn(scheduler)
                        .doOnNext(value -> incrementer.incrementAndGet())
                )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(4))
                // .thenConsumeWhile(value -> true)
                // .expectNextCount(4)
                 .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete()
        ;
    }

它不是运行虚拟时间,因为Flux.interval在虚拟时间建立之前被调用。

 StepVerifier.withVirtualTime(() -> create(4,1))
                    .expectSubscription()
                    .thenAwait(Duration.ofSeconds(4))
                    .expectNext(0.0, 1.0, 2.0, 3.0)
                    .verifyComplete();

这在虚拟时间内完成 运行,需要 400 毫秒而不是 4.4 秒

另一个更明确的测试是:

@Test
    public void virtualTime(){
        StepVerifier.withVirtualTime(() -> create(4,1))
                    .expectSubscription()
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(0.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(1.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(2.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(3.0)
                    .verifyComplete();
    }

其他人帮助了我,我在这里发布解决方案。我正在使用 JUnit 5 timeout。使用 @Timeout(value = 5, unit = TimeUnit.SECONDS) 测试通过,如果我使用 4 秒或更少则测试失败。该解决方案不是基于StepVerifier但仍然达到其目的。

    @Test
    @Timeout(value = 5, unit = TimeUnit.SECONDS)
    void testCreateFluxStreamWithMapAndVerifyDelay() {
        Flux<Double> streamLongFlux = myFluxAndMonoStreams
                .createFluxDoubleWithDelay(4, 1);

        StepVerifier.create(streamLongFlux)
                .expectSubscription()
                .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete();
    }