为更新的布尔值添加流

Add Flow for Boolean That Gets Updated

我有一个听众:

class Owner { 
    private lateinit var flow: Flow<Boolean>

    init {
        flow = callbackFlow {
        val callback = object : Listener {

            override fun onExit() {
                offer(false)
            }

            override fun onEnter() {
                offer(true)
            }
        }
        MyService.register(callback)
        awaitClose {
            //service.unregister(callback)
        }
    }

    fun getFlow(): Flow<Boolean> = flow 

}

所以我希望每次调用回调时,流程都会更新,任何其他 class 观察流程的人自然会收到通知。如何做到这一点?

如果我想在没有回调的情况下也将值设置为 truefalse 可以吗?

这里有一些独立的代码,可以证明您的基本设计可以按预期工作。它注册了两个侦听器,启动MyService,让侦听器观察一些事件,取消一个侦听器,让另一个侦听器继续运行一段时间,然后取消一切并干净地完成。

因此,您提出的问题实际上并没有指向任何需要解决的问题。

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

fun main() = runBlocking<Unit> {
    val owner = Owner()
    val consumer1 = launch {
        owner.flow.collect {
            println("listener 1: $it")
        }
    }
    val consumer2 = launch {
        owner.flow.collect {
            println("listener 2: $it")
        }
    }
    delay(100)
    MyService.start()
    delay(600)
    println("Cancelling consumer 2")
    consumer2.cancel()
    delay(600)
    MyService.stop()
    consumer1.cancel()
}

class Owner {
    val flow = callbackFlow() {
        val callback = object : Listener {
            override fun onEnter() {
                offer(true)
            }
            override fun onExit() {
                offer(false)
            }
        }

        MyService.register(callback)
        awaitClose {
            MyService.unregister(callback)
        }
    }
}

object MyService {
    private val listeners = LinkedHashSet<Listener>()
    @Volatile
    private var keepGoing = true

    fun register(listener: Listener) {
        listeners.add(listener)
    }

    fun unregister(listener: Listener) {
        listeners.remove(listener)
    }

    fun start() = thread {
        while (keepGoing) {
            listeners.forEach {
                it.onEnter()
            }
            Thread.sleep(100)
            listeners.forEach {
                it.onExit()
            }
            Thread.sleep(200)
        }
    }

    fun stop() {
        keepGoing = false
    }
}

interface Listener {
    fun onExit()
    fun onEnter()
}

当我 运行 它时,打印出来:

listener 1: true
listener 2: true
listener 1: false
listener 2: false
listener 1: true
listener 2: true
listener 1: false
listener 2: false
Cancelling consumer 2
listener 1: true
listener 1: false
listener 1: true
listener 1: false

Process finished with exit code 0