测试内部 Flux.interval
Testing internal Flux.interval
我想使用 Reactor 的 Flux 提供一些数据。由于提供此数据可能需要很多时间,因此我决定引入一种 ping 机制(例如,保持 tcp 连接处于活动状态并且不会超时)。这是我的简化解决方案:
public class Example {
private final DataProvider dataProvider;
public Example(DataProvider dataProvider) {
this.dataProvider = dataProvider;
}
public Flux<String> getData() {
AtomicBoolean inProgress = new AtomicBoolean(true);
Flux<String> dataFlux = dataProvider.provide()
.doFinally(ignoreIt -> inProgress.set(false));
return dataFlux.mergeWith(ping(inProgress::get));
}
private Publisher<String> ping(Supplier<Boolean> inProgress) {
return Flux.interval(Duration.ofSeconds(1), Duration.ofSeconds(1))
.map((tick) -> "ping " + tick)
.takeWhile(ignoreIt -> inProgress.get());
}
interface DataProvider {
Flux<String> provide();
}
public static void main(String[] args) {
Callable<String> dataProviderLogic = () -> {
Thread.sleep(3500);
return "REAL DATA - SHOULD TERMINATE PING";
};
// wrapping synchronous call
DataProvider dataProvider = () -> Mono.fromCallable(dataProviderLogic)
.subscribeOn(Schedulers.boundedElastic())
.flux();
new Example(dataProvider).getData()
.doOnNext(data -> System.out.println("GOT: " + data))
.blockLast();
}
}
以上代码打印在控制台上:
GOT: ping 0
GOT: ping 1
GOT: ping 2
GOT: REAL DATA - SHOULD TERMINATE PING
所以它按预期工作。
问题是:我如何在 Junit5 测试中测试此 ping 机制,所以它不会花费很多时间(例如几秒钟)?
在理想情况下,我想编写一个模拟数据提供延迟的测试,检查是否生成了预期数量的 ping 并验证是否发出了完整信号(以确保 ping 通量按预期终止).当然我希望有一个单元测试,可以运行 in ms.
我试过了,但没有成功:
@Test
void test() {
TestPublisher<String> publisher = TestPublisher.create();
Flux<String> data = new Example(publisher::flux).getData();
StepVerifier.withVirtualTime(() -> data)
.thenAwait(Duration.ofMillis(3500))
.then(() -> publisher.emit("REAL DATA - SHOULD TERMINATE PING"))
.then(publisher::complete)
.expectNextCount(4)
.verifyComplete();
}
以上测试以这个错误结束:
java.lang.AssertionError: expectation "expectNextCount(4)" failed (expected: count = 4; actual: counted = 1; signal: onComplete())
是否可以将虚拟时间用于内部创建的Flux.interval?
如有任何替代 ping 解决方案的想法,我们将不胜感激。
尽管上述 ping 机制不是最好的(我建议使用 Sink
而不是 AtomicBoolean
并使用 takeUntilOther
而不是 takeWhile
) ,在我的例子中,问题可能与并非所有通量指令都用 withVirtualTime
包裹的情况有关。此代码在上述情况下按预期工作:
@Test
void test() {
StepVerifier.withVirtualTime(() -> {
Flux<String> data = Flux.just("REAL DATA - SHOULD TERMINATE PING").delayElements(Duration.ofMillis(3200));
return new Example(() -> data).getData();
})
.thenAwait(Duration.ofMillis(3500))
.expectNextCount(4)
.thenAwait(Duration.ofMillis(1000))
.verifyComplete();
}
我想使用 Reactor 的 Flux 提供一些数据。由于提供此数据可能需要很多时间,因此我决定引入一种 ping 机制(例如,保持 tcp 连接处于活动状态并且不会超时)。这是我的简化解决方案:
public class Example {
private final DataProvider dataProvider;
public Example(DataProvider dataProvider) {
this.dataProvider = dataProvider;
}
public Flux<String> getData() {
AtomicBoolean inProgress = new AtomicBoolean(true);
Flux<String> dataFlux = dataProvider.provide()
.doFinally(ignoreIt -> inProgress.set(false));
return dataFlux.mergeWith(ping(inProgress::get));
}
private Publisher<String> ping(Supplier<Boolean> inProgress) {
return Flux.interval(Duration.ofSeconds(1), Duration.ofSeconds(1))
.map((tick) -> "ping " + tick)
.takeWhile(ignoreIt -> inProgress.get());
}
interface DataProvider {
Flux<String> provide();
}
public static void main(String[] args) {
Callable<String> dataProviderLogic = () -> {
Thread.sleep(3500);
return "REAL DATA - SHOULD TERMINATE PING";
};
// wrapping synchronous call
DataProvider dataProvider = () -> Mono.fromCallable(dataProviderLogic)
.subscribeOn(Schedulers.boundedElastic())
.flux();
new Example(dataProvider).getData()
.doOnNext(data -> System.out.println("GOT: " + data))
.blockLast();
}
}
以上代码打印在控制台上:
GOT: ping 0
GOT: ping 1
GOT: ping 2
GOT: REAL DATA - SHOULD TERMINATE PING
所以它按预期工作。
问题是:我如何在 Junit5 测试中测试此 ping 机制,所以它不会花费很多时间(例如几秒钟)?
在理想情况下,我想编写一个模拟数据提供延迟的测试,检查是否生成了预期数量的 ping 并验证是否发出了完整信号(以确保 ping 通量按预期终止).当然我希望有一个单元测试,可以运行 in ms.
我试过了,但没有成功:
@Test
void test() {
TestPublisher<String> publisher = TestPublisher.create();
Flux<String> data = new Example(publisher::flux).getData();
StepVerifier.withVirtualTime(() -> data)
.thenAwait(Duration.ofMillis(3500))
.then(() -> publisher.emit("REAL DATA - SHOULD TERMINATE PING"))
.then(publisher::complete)
.expectNextCount(4)
.verifyComplete();
}
以上测试以这个错误结束:
java.lang.AssertionError: expectation "expectNextCount(4)" failed (expected: count = 4; actual: counted = 1; signal: onComplete())
是否可以将虚拟时间用于内部创建的Flux.interval?
如有任何替代 ping 解决方案的想法,我们将不胜感激。
尽管上述 ping 机制不是最好的(我建议使用 Sink
而不是 AtomicBoolean
并使用 takeUntilOther
而不是 takeWhile
) ,在我的例子中,问题可能与并非所有通量指令都用 withVirtualTime
包裹的情况有关。此代码在上述情况下按预期工作:
@Test
void test() {
StepVerifier.withVirtualTime(() -> {
Flux<String> data = Flux.just("REAL DATA - SHOULD TERMINATE PING").delayElements(Duration.ofMillis(3200));
return new Example(() -> data).getData();
})
.thenAwait(Duration.ofMillis(3500))
.expectNextCount(4)
.thenAwait(Duration.ofMillis(1000))
.verifyComplete();
}