Reactor - 使用 Flux.publish() 会阻止 StepVerifier.thenCancel() 工作?
Reactor - using Flux.publish() prevents StepVerifier.thenCancel() from working?
如果我直接将 flux 与 StepVerifier
一起使用,一切正常,但如果我在 flux 上调用 publish().autoConnect()
并将结果与 StepVerifier
一起使用,那么 verify()
步骤永远不会完成。
例如我有一个像这样的简单通量,它生成从 0 到无穷大的整数:
Flux<Integer> flux = Flux.fromStream(Stream.iterate(0, i -> i + 1));
然后 StepVerifier
一切正常,如果我只消耗一些元素,取消并验证:
StepVerifier.create(flux)
.thenConsumeWhile(i -> i < 10)
.thenCancel()
.verify();
但如果相反,我执行以下操作,那么它永远不会完成:
StepVerifier.create(flux.publish().autoConnect())
.thenConsumeWhile(i -> i < 10)
.thenCancel()
.verify();
我猜这里发生了一些相当明显的事情,但我不知道是什么?
这突然出现,因为我试图在测试中使用 StepVerifier
,API 看起来像这样:
class MyApi {
private Flux<Integer> underlyingFlux = ...
Flux<Integer> getFlux() { return underlyingFlux.publish().autoConnect(); }
}
查看我的 github examples 一些现成的 运行 代码,这些代码演示了我在 StepVerifier
.
中遇到的问题
更新: 此问题已被 Reactor 团队接受为错误 - 请参阅问题 #1528 - 版本 3.2 中提供了修复程序。9.RELEASE 反应堆核心。
Reactor 团队已将此问题作为错误接受 - 请参阅问题 #1528 - 并且在 reactor-core 的 3.2.9.RELEASE 版中提供了修复程序。
如果我直接将 flux 与 StepVerifier
一起使用,一切正常,但如果我在 flux 上调用 publish().autoConnect()
并将结果与 StepVerifier
一起使用,那么 verify()
步骤永远不会完成。
例如我有一个像这样的简单通量,它生成从 0 到无穷大的整数:
Flux<Integer> flux = Flux.fromStream(Stream.iterate(0, i -> i + 1));
然后 StepVerifier
一切正常,如果我只消耗一些元素,取消并验证:
StepVerifier.create(flux)
.thenConsumeWhile(i -> i < 10)
.thenCancel()
.verify();
但如果相反,我执行以下操作,那么它永远不会完成:
StepVerifier.create(flux.publish().autoConnect())
.thenConsumeWhile(i -> i < 10)
.thenCancel()
.verify();
我猜这里发生了一些相当明显的事情,但我不知道是什么?
这突然出现,因为我试图在测试中使用 StepVerifier
,API 看起来像这样:
class MyApi {
private Flux<Integer> underlyingFlux = ...
Flux<Integer> getFlux() { return underlyingFlux.publish().autoConnect(); }
}
查看我的 github examples 一些现成的 运行 代码,这些代码演示了我在 StepVerifier
.
更新: 此问题已被 Reactor 团队接受为错误 - 请参阅问题 #1528 - 版本 3.2 中提供了修复程序。9.RELEASE 反应堆核心。
Reactor 团队已将此问题作为错误接受 - 请参阅问题 #1528 - 并且在 reactor-core 的 3.2.9.RELEASE 版中提供了修复程序。