如何确保具有无限流的 Flux 在给定时间内完成?
How to ensure that a Flux with infinite stream completes in a given time?
我有一个 Flux
生成器(来自项目反应器)用于延迟 1 秒发射每个元素的流。因为流是无限的,所以我还决定使用 take()
方法要获取多少个元素。我想测试这样的方法是否可以在给定时间内执行。我尝试创建我的解决方案 但没有成功。
这是Flux
方法
import reactor.core.publisher.Flux;
import java.time.Duration;
@Slf4j
public class FluxAndMonoStreams {
public Flux<Double> createFluxDoubleWithDelay(long numberOfElements, long delaySec) {
return Flux.interval(Duration.ofSeconds(delaySec))
.map(l -> l.doubleValue())
.take(numberOfElements)
.log();
}
}
这是单元测试。在 .thenAwait(Duration.ofSeconds(4))
上等待 4 秒后,我尝试了使用 .thenConsumeWhile(value -> true)
、.expectNextCount(4)
、.expectNext(0.0, 1.0, 2.0, 3.0)
的方法。但是 none 有效。
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class FluxAndMonoStreamsTest {
FluxAndMonoStreams myFluxAndMonoStreams = new FluxAndMonoStreams();
@Test
void testCreateFluxStreamWithMapAndVerifyDelay() {
Flux<Double> streamLongFlux = myFluxAndMonoStreams
.createFluxDoubleWithDelay(4, 1);
Scheduler scheduler = Schedulers.newSingle("test");
AtomicInteger incrementer = new AtomicInteger();
StepVerifier
.withVirtualTime(() -> streamLongFlux
.subscribeOn(scheduler)
.doOnNext(value -> incrementer.incrementAndGet())
)
.expectSubscription()
.thenAwait(Duration.ofSeconds(4))
// .thenConsumeWhile(value -> true)
// .expectNextCount(4)
// .expectNext(0.0, 1.0, 2.0, 3.0)
.verifyComplete()
;
}
}
我收到此错误消息,表明我没有使用值 1.0
、2.0
,...基本上我想确保我只使用值 0.0, 1.0, 2.0, 3.0
4 秒内。我该怎么做?
12:37:31.853 [Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
12:37:31.912 [test-1] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
12:37:31.915 [test-1] INFO reactor.Flux.Take.1 - request(unbounded)
12:37:32.917 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0.0)
12:37:32.921 [parallel-1] INFO reactor.Flux.Take.1 - cancel()
expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
补充说明:再次说明我想要的行为。如果我取消注释 .expectNext(0.0, 1.0, 2.0, 3.0)
并将 .thenAwait(Duration.ofSeconds(2))
中的 4 秒替换为 2 秒,测试应该会失败。但事实并非如此。
如果我取消对 expectNext 的注释,这个测试 运行 对我来说“很好”。
public Flux<Double> create(long numberOfElements, long delaySec) {
return Flux.interval(Duration.ofSeconds(delaySec)).map(l -> l.doubleValue()).take(numberOfElements).log();
}
@Test
public void virtualTimeTest() {
Flux<Double> streamLongFlux = create(4,1);
Scheduler scheduler = Schedulers.newSingle("test");
AtomicInteger incrementer = new AtomicInteger();
StepVerifier
.withVirtualTime(() -> streamLongFlux
.subscribeOn(scheduler)
.doOnNext(value -> incrementer.incrementAndGet())
)
.expectSubscription()
.thenAwait(Duration.ofSeconds(4))
// .thenConsumeWhile(value -> true)
// .expectNextCount(4)
.expectNext(0.0, 1.0, 2.0, 3.0)
.verifyComplete()
;
}
它不是运行虚拟时间,因为Flux.interval在虚拟时间建立之前被调用。
StepVerifier.withVirtualTime(() -> create(4,1))
.expectSubscription()
.thenAwait(Duration.ofSeconds(4))
.expectNext(0.0, 1.0, 2.0, 3.0)
.verifyComplete();
这在虚拟时间内完成 运行,需要 400 毫秒而不是 4.4 秒
另一个更明确的测试是:
@Test
public void virtualTime(){
StepVerifier.withVirtualTime(() -> create(4,1))
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(0.0)
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(1.0)
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(2.0)
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(3.0)
.verifyComplete();
}
其他人帮助了我,我在这里发布解决方案。我正在使用 JUnit 5 timeout。使用 @Timeout(value = 5, unit = TimeUnit.SECONDS)
测试通过,如果我使用 4 秒或更少则测试失败。该解决方案不是基于StepVerifier
但仍然达到其目的。
@Test
@Timeout(value = 5, unit = TimeUnit.SECONDS)
void testCreateFluxStreamWithMapAndVerifyDelay() {
Flux<Double> streamLongFlux = myFluxAndMonoStreams
.createFluxDoubleWithDelay(4, 1);
StepVerifier.create(streamLongFlux)
.expectSubscription()
.expectNext(0.0, 1.0, 2.0, 3.0)
.verifyComplete();
}
我有一个 Flux
生成器(来自项目反应器)用于延迟 1 秒发射每个元素的流。因为流是无限的,所以我还决定使用 take()
方法要获取多少个元素。我想测试这样的方法是否可以在给定时间内执行。我尝试创建我的解决方案
这是Flux
方法
import reactor.core.publisher.Flux;
import java.time.Duration;
@Slf4j
public class FluxAndMonoStreams {
public Flux<Double> createFluxDoubleWithDelay(long numberOfElements, long delaySec) {
return Flux.interval(Duration.ofSeconds(delaySec))
.map(l -> l.doubleValue())
.take(numberOfElements)
.log();
}
}
这是单元测试。在 .thenAwait(Duration.ofSeconds(4))
上等待 4 秒后,我尝试了使用 .thenConsumeWhile(value -> true)
、.expectNextCount(4)
、.expectNext(0.0, 1.0, 2.0, 3.0)
的方法。但是 none 有效。
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class FluxAndMonoStreamsTest {
FluxAndMonoStreams myFluxAndMonoStreams = new FluxAndMonoStreams();
@Test
void testCreateFluxStreamWithMapAndVerifyDelay() {
Flux<Double> streamLongFlux = myFluxAndMonoStreams
.createFluxDoubleWithDelay(4, 1);
Scheduler scheduler = Schedulers.newSingle("test");
AtomicInteger incrementer = new AtomicInteger();
StepVerifier
.withVirtualTime(() -> streamLongFlux
.subscribeOn(scheduler)
.doOnNext(value -> incrementer.incrementAndGet())
)
.expectSubscription()
.thenAwait(Duration.ofSeconds(4))
// .thenConsumeWhile(value -> true)
// .expectNextCount(4)
// .expectNext(0.0, 1.0, 2.0, 3.0)
.verifyComplete()
;
}
}
我收到此错误消息,表明我没有使用值 1.0
、2.0
,...基本上我想确保我只使用值 0.0, 1.0, 2.0, 3.0
4 秒内。我该怎么做?
12:37:31.853 [Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
12:37:31.912 [test-1] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
12:37:31.915 [test-1] INFO reactor.Flux.Take.1 - request(unbounded)
12:37:32.917 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0.0)
12:37:32.921 [parallel-1] INFO reactor.Flux.Take.1 - cancel()
expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
补充说明:再次说明我想要的行为。如果我取消注释 .expectNext(0.0, 1.0, 2.0, 3.0)
并将 .thenAwait(Duration.ofSeconds(2))
中的 4 秒替换为 2 秒,测试应该会失败。但事实并非如此。
如果我取消对 expectNext 的注释,这个测试 运行 对我来说“很好”。
public Flux<Double> create(long numberOfElements, long delaySec) {
return Flux.interval(Duration.ofSeconds(delaySec)).map(l -> l.doubleValue()).take(numberOfElements).log();
}
@Test
public void virtualTimeTest() {
Flux<Double> streamLongFlux = create(4,1);
Scheduler scheduler = Schedulers.newSingle("test");
AtomicInteger incrementer = new AtomicInteger();
StepVerifier
.withVirtualTime(() -> streamLongFlux
.subscribeOn(scheduler)
.doOnNext(value -> incrementer.incrementAndGet())
)
.expectSubscription()
.thenAwait(Duration.ofSeconds(4))
// .thenConsumeWhile(value -> true)
// .expectNextCount(4)
.expectNext(0.0, 1.0, 2.0, 3.0)
.verifyComplete()
;
}
它不是运行虚拟时间,因为Flux.interval在虚拟时间建立之前被调用。
StepVerifier.withVirtualTime(() -> create(4,1))
.expectSubscription()
.thenAwait(Duration.ofSeconds(4))
.expectNext(0.0, 1.0, 2.0, 3.0)
.verifyComplete();
这在虚拟时间内完成 运行,需要 400 毫秒而不是 4.4 秒
另一个更明确的测试是:
@Test
public void virtualTime(){
StepVerifier.withVirtualTime(() -> create(4,1))
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(0.0)
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(1.0)
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(2.0)
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(3.0)
.verifyComplete();
}
其他人帮助了我,我在这里发布解决方案。我正在使用 JUnit 5 timeout。使用 @Timeout(value = 5, unit = TimeUnit.SECONDS)
测试通过,如果我使用 4 秒或更少则测试失败。该解决方案不是基于StepVerifier
但仍然达到其目的。
@Test
@Timeout(value = 5, unit = TimeUnit.SECONDS)
void testCreateFluxStreamWithMapAndVerifyDelay() {
Flux<Double> streamLongFlux = myFluxAndMonoStreams
.createFluxDoubleWithDelay(4, 1);
StepVerifier.create(streamLongFlux)
.expectSubscription()
.expectNext(0.0, 1.0, 2.0, 3.0)
.verifyComplete();
}