Reactor - Flux 需要非过滤器订阅者
Reactor - Flux requires non filter subscriber
我正在使用 Reactor 并正在创建一个 flux,并向其发布一些事件。
我的问题是我使用过滤器创建的订阅者会在一段时间后失败,除非我在 flux 上添加一个非过滤器订阅者。
import reactor.core.publisher.EmitterProcessor
class PublishSubscribe {
companion object {
@JvmStatic
fun main(args: Array<String>) {
val publisher = EmitterProcessor.create<String>().connect()
writeAndGet(publisher)
writeAndGet(publisher)
writeAndGet(publisher)
}
fun writeAndGet(publisher: EmitterProcessor<String>) {
val result = publisher
.filter { true }
.takeUntil { it == "end" }
.collectList()
.subscribe()
val result2 = publisher
.filter { true }
.takeUntil { it == "end" }
.collectList()
.subscribe()
Thread.sleep(1000)
publisher.onNext("unu")
publisher.onNext("end")
try {
println("X=" + result.blockMillis(3000))
println("Y=" + result2.blockMillis(3000))
} catch (e: Exception) {
e.printStackTrace()
}
println(result.isTerminated)
println(result2.isTerminated)
println("---")
}
}
}
如果我有一个额外的订阅者,代码可以正常工作。
...
val publisher = EmitterProcessor.create<String>().connect()
publisher.subscribe() //this solves the issue
writeAndGet(publisher)
...
关于我做错了什么的想法?
此致
EmitterProcessor.create()
创建处理器并将 autoCancel
标志设置为 true
。这意味着一旦所有订阅者取消订阅,它就会自动取消。
让您的订阅者退订的不是 filter
,而是 takeUntil
运营商。
添加一个额外的 永久 订阅者可以防止处理器自动取消,但这似乎不是一个好的解决方案。
为了使您的测试用例正常工作,您必须使用 EmitterProcessor.create(false)
创建处理器。这会将 autoCancel
设置为 false
,因此您可以一次又一次地重新订阅。
我正在使用 Reactor 并正在创建一个 flux,并向其发布一些事件。 我的问题是我使用过滤器创建的订阅者会在一段时间后失败,除非我在 flux 上添加一个非过滤器订阅者。
import reactor.core.publisher.EmitterProcessor
class PublishSubscribe {
companion object {
@JvmStatic
fun main(args: Array<String>) {
val publisher = EmitterProcessor.create<String>().connect()
writeAndGet(publisher)
writeAndGet(publisher)
writeAndGet(publisher)
}
fun writeAndGet(publisher: EmitterProcessor<String>) {
val result = publisher
.filter { true }
.takeUntil { it == "end" }
.collectList()
.subscribe()
val result2 = publisher
.filter { true }
.takeUntil { it == "end" }
.collectList()
.subscribe()
Thread.sleep(1000)
publisher.onNext("unu")
publisher.onNext("end")
try {
println("X=" + result.blockMillis(3000))
println("Y=" + result2.blockMillis(3000))
} catch (e: Exception) {
e.printStackTrace()
}
println(result.isTerminated)
println(result2.isTerminated)
println("---")
}
}
}
如果我有一个额外的订阅者,代码可以正常工作。
...
val publisher = EmitterProcessor.create<String>().connect()
publisher.subscribe() //this solves the issue
writeAndGet(publisher)
...
关于我做错了什么的想法?
此致
EmitterProcessor.create()
创建处理器并将 autoCancel
标志设置为 true
。这意味着一旦所有订阅者取消订阅,它就会自动取消。
让您的订阅者退订的不是 filter
,而是 takeUntil
运营商。
添加一个额外的 永久 订阅者可以防止处理器自动取消,但这似乎不是一个好的解决方案。
为了使您的测试用例正常工作,您必须使用 EmitterProcessor.create(false)
创建处理器。这会将 autoCancel
设置为 false
,因此您可以一次又一次地重新订阅。