How to write an extension function / wrapper for Kotlin Coroutines Flow?
I have coroutine code that uses callbackFlow, like this:
fun getUniqueEventAsFlow(receiverId: String): Flow<Any> = callbackFlow {
    RxEventBus().register(
        receiverId,
        FirstUniqueEvent::class.java,
        false
    ) { amEvent ->
        offer(amEvent)
    }
    // Suspend until either onCompleted or external cancellation are invoked
    awaitClose {
        unsubscribeFromEventBus(receiverId)
        cancel()
    }
}.flowOn(Dispatchers.Default)
    .catch { throwable ->
        reportError(throwable as Exception)
    }
What I would like to do is wrap the following so that it is applied automatically, because I have many similar functions in my code:
    // Suspend until either onCompleted or external cancellation are invoked
    awaitClose {
        unsubscribeFromEventBus(receiverId)
        cancel()
    }
}.flowOn(Dispatchers.Default)
    .catch { throwable ->
        reportError(throwable as Exception)
    }
I would like to wrap awaitClose and flowOn once, instead of writing them for every callbackFlow. Do you know which Kotlin higher-order construct I can use to achieve this?
Thanks,
Igor
Here is a solution that wraps awaitClose and adds a handleErrors extension:
/**
 * Utility function which suspends a coroutine until it is completed or closed.
 * Unsubscribes from Rx Bus event and ensures that the scope is cancelled upon completion.
 */
suspend fun finalizeFlow(scope: ProducerScope<Any>, receiverId: String) {
    // awaitClose already installs the channel's close handler, and a channel accepts
    // only one such handler, so the log call lives in this block rather than in a
    // separate invokeOnClose call (which would also never run on cancellation).
    scope.awaitClose {
        unsubscribeFromEventBus(receiverId)
        scope.cancel()
        Logger.debug(
            javaClass.canonicalName,
            "Closed Flow channel for receiverId $receiverId"
        )
    }
}

/**
 * Extension function which does error handling on [Flow].
 */
fun <T> Flow<T>.handleErrors(): Flow<T> = catch { throwable ->
    reportError(throwable as Exception)
}
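As for the "higher-order construct" part of the question, one option is a small builder that takes the registration step as a lambda with a ProducerScope receiver and applies awaitClose, flowOn and catch in one place. The following is only a sketch under assumptions: managedCallbackFlow, onClose, onError, register, demoEvents and collectDemo are hypothetical names that do not come from the original code, and trySend stands in for offer, which newer kotlinx.coroutines releases deprecate.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of kotlinx.coroutines): every flow built with it
// shares the same cleanup, dispatcher and error handling.
fun <T> managedCallbackFlow(
    onClose: () -> Unit,                    // e.g. unsubscribe from the event bus
    onError: (Throwable) -> Unit,           // e.g. report the error
    register: ProducerScope<T>.() -> Unit   // caller wires up its callback here
): Flow<T> = callbackFlow<T> {
    register()
    // Suspends until the channel is closed or the collector cancels,
    // then runs the cleanup exactly once.
    awaitClose { onClose() }
}
    .flowOn(Dispatchers.Default)
    .catch { cause -> onError(cause) }

// Usage sketch: emits two values, then closes the channel from the producer side.
fun demoEvents(onClose: () -> Unit): Flow<Int> =
    managedCallbackFlow<Int>(
        onClose = onClose,
        onError = { cause -> println("reported: $cause") }
    ) {
        trySend(1)
        trySend(2)
        close()
    }

// Collects the demo flow into a list, blocking the calling thread.
fun collectDemo(onClose: () -> Unit): List<Int> =
    runBlocking { demoEvents(onClose).toList() }
```

With something like this, getUniqueEventAsFlow would shrink to the register call, with unsubscribeFromEventBus(receiverId) passed as onClose and reportError passed as onError. The explicit cancel() is left out here on the assumption that the producer scope is already finishing whenever the awaitClose block runs.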