Kotlin SharedFlow - 如何订阅?
Kotlin SharedFlow - how to subscribe?
我有一个生成消息的 JMS 队列。我想与多个 Kotlin 消费者共享这些消息,但前提是连接了 Kotlin 消费者。如果 Kotlin 消费者仅活跃 5 分钟,它应该只在 window 内接收消息。 Kotlin-consumer应该可以随时订阅,随时获取收到的消息。
通过阅读文档,我认为 Kotlin 的 SharedFlow
是最好的方法...
"SharedFlow is useful for broadcasting events that happen inside an application to subscribers that can come and go." (docs)
但我找不到任何好的例子,文档也很混乱。 SharedFlow
文档说“所有收集器获取所有发出的值”和“共享流的活动收集器称为订阅者”,但它没有解释如何实际创建订阅者。
选项:
shareIn
说它将“冷流转换为热 SharedFlow”,但我没有冷流,我有热 SharedFlow。
Flow.collect
在文档中有链接,但它被标记为内部:“这是一个内部 kotlinx.coroutines API,不应从 kotlinx.coroutines 外部使用."
launchIn
被描述为终端 - 但我不想结束消费
amqMessageListener.messagesView.collect(object : FlowCollector<Message> { // internal API warning
override suspend fun emit(value: Message) { ... }
})
Flow.collect
和 launchIn
都“从未正常完成”——但我确实希望能够正常完成它们。
这是我尝试订阅消息的方式,但我始终无法获得任何结果。
import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
suspend fun main() = coroutineScope {
produceMessages()
delay(1000)
}
suspend fun produceMessages() = coroutineScope {
val messages = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
// emit messages
launch {
repeat(100000) {
println("emitting $it - result:${messages.tryEmit(it)}")
delay(Duration.seconds(0.5))
}
}
println("waiting 3")
delay(Duration.seconds(3))
launch {
messages.onEach { println("onEach") }
}
launch {
messages.onEach { println("onEach") }.launchIn(CoroutineScope(Dispatchers.Default))
}
launch {
messages.collect { println("collect") }
}
launch {
messages.launchIn(this)
messages.collect { println("launchIn + collect") }
}
launch {
val new = messages.shareIn(this, SharingStarted.Eagerly, replay = Int.MAX_VALUE)
delay(Duration.seconds(2))
println("new.replayCache: ${new.replayCache}")
}
launch {
println("sharing")
val l = mutableListOf<Int>()
val x = messages.onEach { println("hello") }.launchIn(this)
repeat(1000) {
delay(Duration.seconds(1))
println("result $it: ${messages.replayCache}")
println("result $it: ${messages.subscriptionCount.value}")
println("result $it: ${l}")
}
}
}
更新
我有一个工作 solution.Thanks 去 Tenfour04 寻求答案,这帮助我理解了。
这是一个接近我需要的示例。
import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
class Publisher {
private val publishingScope = CoroutineScope(SupervisorJob())
private val messagesFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
init {
// emit messages
publishingScope.launch {
repeat(100000) {
println("emitting $it")
messagesFlow.emit(it)
delay(Duration.seconds(0.5))
}
}
}
/** Create a new [SharedFlow] that receives all updates from [messagesFlow] */
fun listen(name: String): SharedFlow<Int> = runBlocking {
val listenerScope = CoroutineScope(SupervisorJob())
val capture = MutableSharedFlow<Int>(
replay = Int.MAX_VALUE,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
messagesFlow
.onEach {
println("$name is getting message $it")
capture.emit(it)
}
.launchIn(listenerScope)
capture.asSharedFlow()
}
/** Create a new [StateFlow], which holds all accumulated values of [messagesFlow] */
suspend fun collectState(name: String): StateFlow<List<Int>> {
return messagesFlow
.runningFold(emptyList<Int>()) { acc, value ->
println("$name is getting message $value")
acc + value
}
.stateIn(publishingScope)
}
}
fun main() {
val publisher = Publisher()
// both Fish and Llama can subscribe at any point, and get all subsequent values
runBlocking {
delay(Duration.seconds(2))
launch {
val listenerFish = publisher.collectState("Fish")
repeat(4) {
println("$it. Fish replayCache ${listenerFish.value}")
delay(Duration.seconds(2))
}
}
delay(Duration.seconds(2))
launch {
val listenerLlama = publisher.listen("Llama")
repeat(4) {
println("$it. Llama replayCache" + listenerLlama.replayCache)
delay(Duration.seconds(2))
}
}
delay(Duration.seconds(10))
}
}
Flow.collect
有一个标记为内部的重载,但是有一个非常常用的 public collect
扩展函数。我建议将这个包罗万象的导入放在文件的顶部,然后扩展功能将在其他 Flow 相关任务中可用:import kotlinx.coroutines.flow.*
launchIn
和 collect
是订阅流的两种最常见的方式。他们都是终端。 “终端”并不意味着它 结束 消耗……它意味着它 开始 消耗! “非终结”函数是将一个 Flow 包装在另一个 Flow 中而不开始收集它的函数。
“Never complete normally”表示协程中它后面的代码将不会到达。 collect
订阅一个流并挂起协程直到流完成。由于 SharedFlow 永远不会完成,它“永远不会正常完成”。
很难对您的代码发表评论,因为启动您的流程并将其收集在同一个函数中是不寻常的。通常,SharedFlow 会公开为 属性 以供其他函数使用。通过将它们全部组合到一个函数中,您隐藏了一个事实,即通常 SharedFlow 可能从不同的协程范围发布而不是从中收集。
这是一个部分改编自您的代码的示例:
class Publisher {
private val publishingScope = CoroutineScope(SupervisorJob())
val messagesFlow: SharedFlow<Int> = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
).also { flow ->
// emit messages
publishingScope.launch {
repeat(100000) {
println("emitting $it")
flow.emit(it)
delay(500)
}
}
}
}
fun main() {
val publisher = Publisher()
runBlocking {
val subscribingScope = CoroutineScope(SupervisorJob())
// Delay a while. We'll miss the first couple messages.
delay(1300)
// Subscribe to the shared flow
subscribingScope.launch {
publisher.messagesFlow.collect { println("I am colllecting message $it") }
// Any code below collection in this inner coroutine won't be reached because collect doesn't complete normally.
}
delay(3000) // Keep app alive for a while
}
}
由于 collect
通常会阻止协程中 运行 它下面的任何代码,因此 launchIn
函数可以使正在发生的事情更明显,更简洁:
fun main() {
val publisher = Publisher()
runBlocking {
val subscribingScope = CoroutineScope(SupervisorJob())
delay(1300)
publisher.messagesFlow.onEach { println("I am colllecting message $it") }
.launchIn(subscribingScope)
delay(3000)
}
}
我有一个生成消息的 JMS 队列。我想与多个 Kotlin 消费者共享这些消息,但前提是连接了 Kotlin 消费者。如果 Kotlin 消费者仅活跃 5 分钟,它应该只在 window 内接收消息。 Kotlin-consumer应该可以随时订阅,随时获取收到的消息。
通过阅读文档,我认为 Kotlin 的 SharedFlow
是最好的方法...
"SharedFlow is useful for broadcasting events that happen inside an application to subscribers that can come and go." (docs)
但我找不到任何好的例子,文档也很混乱。 SharedFlow
文档说“所有收集器获取所有发出的值”和“共享流的活动收集器称为订阅者”,但它没有解释如何实际创建订阅者。
选项:
shareIn
说它将“冷流转换为热 SharedFlow”,但我没有冷流,我有热 SharedFlow。Flow.collect
在文档中有链接,但它被标记为内部:“这是一个内部 kotlinx.coroutines API,不应从 kotlinx.coroutines 外部使用."launchIn
被描述为终端 - 但我不想结束消费
amqMessageListener.messagesView.collect(object : FlowCollector<Message> { // internal API warning
override suspend fun emit(value: Message) { ... }
})
Flow.collect
和launchIn
都“从未正常完成”——但我确实希望能够正常完成它们。
这是我尝试订阅消息的方式,但我始终无法获得任何结果。
import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
suspend fun main() = coroutineScope {
produceMessages()
delay(1000)
}
suspend fun produceMessages() = coroutineScope {
val messages = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
// emit messages
launch {
repeat(100000) {
println("emitting $it - result:${messages.tryEmit(it)}")
delay(Duration.seconds(0.5))
}
}
println("waiting 3")
delay(Duration.seconds(3))
launch {
messages.onEach { println("onEach") }
}
launch {
messages.onEach { println("onEach") }.launchIn(CoroutineScope(Dispatchers.Default))
}
launch {
messages.collect { println("collect") }
}
launch {
messages.launchIn(this)
messages.collect { println("launchIn + collect") }
}
launch {
val new = messages.shareIn(this, SharingStarted.Eagerly, replay = Int.MAX_VALUE)
delay(Duration.seconds(2))
println("new.replayCache: ${new.replayCache}")
}
launch {
println("sharing")
val l = mutableListOf<Int>()
val x = messages.onEach { println("hello") }.launchIn(this)
repeat(1000) {
delay(Duration.seconds(1))
println("result $it: ${messages.replayCache}")
println("result $it: ${messages.subscriptionCount.value}")
println("result $it: ${l}")
}
}
}
更新
我有一个工作 solution.Thanks 去 Tenfour04 寻求答案,这帮助我理解了。
这是一个接近我需要的示例。
import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
class Publisher {
private val publishingScope = CoroutineScope(SupervisorJob())
private val messagesFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
init {
// emit messages
publishingScope.launch {
repeat(100000) {
println("emitting $it")
messagesFlow.emit(it)
delay(Duration.seconds(0.5))
}
}
}
/** Create a new [SharedFlow] that receives all updates from [messagesFlow] */
fun listen(name: String): SharedFlow<Int> = runBlocking {
val listenerScope = CoroutineScope(SupervisorJob())
val capture = MutableSharedFlow<Int>(
replay = Int.MAX_VALUE,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
messagesFlow
.onEach {
println("$name is getting message $it")
capture.emit(it)
}
.launchIn(listenerScope)
capture.asSharedFlow()
}
/** Create a new [StateFlow], which holds all accumulated values of [messagesFlow] */
suspend fun collectState(name: String): StateFlow<List<Int>> {
return messagesFlow
.runningFold(emptyList<Int>()) { acc, value ->
println("$name is getting message $value")
acc + value
}
.stateIn(publishingScope)
}
}
fun main() {
val publisher = Publisher()
// both Fish and Llama can subscribe at any point, and get all subsequent values
runBlocking {
delay(Duration.seconds(2))
launch {
val listenerFish = publisher.collectState("Fish")
repeat(4) {
println("$it. Fish replayCache ${listenerFish.value}")
delay(Duration.seconds(2))
}
}
delay(Duration.seconds(2))
launch {
val listenerLlama = publisher.listen("Llama")
repeat(4) {
println("$it. Llama replayCache" + listenerLlama.replayCache)
delay(Duration.seconds(2))
}
}
delay(Duration.seconds(10))
}
}
Flow.collect
有一个标记为内部的重载,但是有一个非常常用的 public collect
扩展函数。我建议将这个包罗万象的导入放在文件的顶部,然后扩展功能将在其他 Flow 相关任务中可用:import kotlinx.coroutines.flow.*
launchIn
和 collect
是订阅流的两种最常见的方式。他们都是终端。 “终端”并不意味着它 结束 消耗……它意味着它 开始 消耗! “非终结”函数是将一个 Flow 包装在另一个 Flow 中而不开始收集它的函数。
“Never complete normally”表示协程中它后面的代码将不会到达。 collect
订阅一个流并挂起协程直到流完成。由于 SharedFlow 永远不会完成,它“永远不会正常完成”。
很难对您的代码发表评论,因为启动您的流程并将其收集在同一个函数中是不寻常的。通常,SharedFlow 会公开为 属性 以供其他函数使用。通过将它们全部组合到一个函数中,您隐藏了一个事实,即通常 SharedFlow 可能从不同的协程范围发布而不是从中收集。
这是一个部分改编自您的代码的示例:
class Publisher {
private val publishingScope = CoroutineScope(SupervisorJob())
val messagesFlow: SharedFlow<Int> = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
).also { flow ->
// emit messages
publishingScope.launch {
repeat(100000) {
println("emitting $it")
flow.emit(it)
delay(500)
}
}
}
}
fun main() {
val publisher = Publisher()
runBlocking {
val subscribingScope = CoroutineScope(SupervisorJob())
// Delay a while. We'll miss the first couple messages.
delay(1300)
// Subscribe to the shared flow
subscribingScope.launch {
publisher.messagesFlow.collect { println("I am colllecting message $it") }
// Any code below collection in this inner coroutine won't be reached because collect doesn't complete normally.
}
delay(3000) // Keep app alive for a while
}
}
由于 collect
通常会阻止协程中 运行 它下面的任何代码,因此 launchIn
函数可以使正在发生的事情更明显,更简洁:
fun main() {
val publisher = Publisher()
runBlocking {
val subscribingScope = CoroutineScope(SupervisorJob())
delay(1300)
publisher.messagesFlow.onEach { println("I am colllecting message $it") }
.launchIn(subscribingScope)
delay(3000)
}
}