Reactor - Flux 需要非过滤器订阅者

Reactor - Flux requires non filter subscriber

我正在使用 Reactor 并正在创建一个 flux,并向其发布一些事件。 我的问题是我使用过滤器创建的订阅者会在一段时间后失败,除非我在 flux 上添加一个非过滤器订阅者。

import reactor.core.publisher.EmitterProcessor

class PublishSubscribe {

companion object {
    @JvmStatic
    fun main(args: Array<String>) {
        val publisher = EmitterProcessor.create<String>().connect()
        writeAndGet(publisher)
        writeAndGet(publisher)
        writeAndGet(publisher)

    }

    fun writeAndGet(publisher: EmitterProcessor<String>) {
        val result = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()

        val result2 = publisher
                .filter { true }
                .takeUntil { it == "end" }
                .collectList()
                .subscribe()


        Thread.sleep(1000)

        publisher.onNext("unu")
        publisher.onNext("end")

        try {

            println("X=" + result.blockMillis(3000))
            println("Y=" + result2.blockMillis(3000))

        } catch (e: Exception) {
            e.printStackTrace()
        }
        println(result.isTerminated)
        println(result2.isTerminated)
        println("---")

    }

}

}

如果我有一个额外的订阅者,代码可以正常工作。

...
val publisher = EmitterProcessor.create<String>().connect()
publisher.subscribe()  //this solves the issue
writeAndGet(publisher)
...

关于我做错了什么的想法?

此致

EmitterProcessor.create() 创建处理器并将 autoCancel 标志设置为 true。这意味着一旦所有订阅者取消订阅,它就会自动取消。

让您的订阅者退订的不是 filter,而是 takeUntil 运营商。

添加一个额外的 永久 订阅者可以防止处理器自动取消,但这似乎不是一个好的解决方案。

为了使您的测试用例正常工作,您必须使用 EmitterProcessor.create(false) 创建处理器。这会将 autoCancel 设置为 false,因此您可以一次又一次地重新订阅。