Reactor - 发布行为

Reactor - Publish behavior

我对 Flux 中的发布行为感到很困惑。为什么第二个订阅者不打印任何东西,而第一个订阅者打印。它是一个热门发布者,值每秒发出一次。两者应该共享相同的元素。

    Flux<String> flux = Flux.fromIterable(Arrays.asList("a", "b", "c", "d", "e", "f"))
                                    .publish()
                                    .autoConnect()
                                    .delayElements(Duration.ofSeconds(1));

    flux.subscribe(s -> System.out.println("1 - " + s));
    flux.subscribe(s -> System.out.println("2 - " + s));

有趣的是,share 方法显示了两个订阅者的输出。

事实上它是一个 "hot publisher" 意味着如果它们在您订阅之前发出,您可能会错过值,这就是这里发生的事情 - 第一个订阅者导致 Flux 开始发布,并在您的第二个订阅者之前发布其所有值。

您可能希望 delayElements() 改变这种行为,因为它会将每个延迟一秒,这应该足以让第二个订阅者订阅。但是,这不会发生,因为您只是在 之后 publish()autoconnect() 调用延迟元素 - 或者换句话说,只有 之后 Flux 已经完成 "hot"。这意味着所有值都几乎是立即发出,在被延迟之前,在第二个订阅者有机会订阅之前。

相反,我怀疑您想要 delayElements() 在 之前调用 它是一个热通量,如下所示:

Flux<String> flux = Flux.fromIterable(Arrays.asList("a", "b", "c", "d", "e", "f"))
        .delayElements(Duration.ofSeconds(1))
        .publish()
        .autoConnect();

这将在 Flux 变热之前延迟元素,几乎肯定会给您的第二个订阅者足够的时间来订阅,并按您的预期打印两组结果。