How to write an extension function / wrapper for Kotlin Coroutines Flow?

I have coroutine code that uses callbackFlow, like this:

fun getUniqueEventAsFlow(receiverId: String): Flow<Any> = callbackFlow {
    RxEventBus().register(
        receiverId,
        FirstUniqueEvent::class.java,
        false
    ) { amEvent ->
        trySend(amEvent) // offer() is deprecated since kotlinx.coroutines 1.5
    }
    // Suspend until either onCompleted or external cancellation are invoked
    awaitClose {
        unsubscribeFromEventBus(receiverId)
        cancel()
    }
}.flowOn(Dispatchers.Default)
    .catch { throwable ->
        reportError(throwable as Exception)
    }

What I would like to do is wrap the following part so that it is called automatically, because I have many similar functions in my code:

        // Suspend until either onCompleted or external cancellation are invoked
        awaitClose {
            unsubscribeFromEventBus(receiverId)
            cancel()
        }
    }.flowOn(Dispatchers.Default)
        .catch { throwable ->
            reportError(throwable as Exception)
        }

I would like to wrap awaitClose and flowOn once, instead of writing them for every callbackFlow. Do you know which Kotlin higher-order construct I could use to achieve this?

Thanks, Igor

Here is a solution that wraps awaitClose and handleErrors:

/**
 * Utility function which suspends a coroutine until it is completed or closed.
 * Unsubscribes from Rx Bus event and ensures that the scope is cancelled upon completion.
 */
suspend fun finalizeFlow(scope: ProducerScope<Any>, receiverId: String) {
    // awaitClose registers the channel's single invokeOnClose handler internally,
    // so a second scope.invokeOnClose call would throw IllegalStateException.
    // The log statement therefore runs inside the awaitClose block.
    scope.awaitClose {
        unsubscribeFromEventBus(receiverId)
        scope.cancel()
        Logger.debug(
            javaClass.canonicalName,
            "Closed Flow channel for receiverId $receiverId"
        )
    }
}

/**
 * Extension function which does error handling on [Flow].
 */
fun <T> Flow<T>.handleErrors(): Flow<T> = catch { throwable ->
    reportError(throwable as Exception)
}
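
If you want awaitClose, flowOn and the error handling applied in one place, a single higher-order function can take the registration logic as a lambda. The following is only a minimal sketch under some assumptions: wrappedCallbackFlow, onClose, onError and register are hypothetical names (not part of kotlinx.coroutines), the event bus is replaced by plain trySend calls so the example runs on its own, and kotlinx.coroutines 1.5 or newer is assumed for trySend.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper: runs `register` inside callbackFlow, then applies
// awaitClose, flowOn and catch once so callers do not repeat them.
fun <T> wrappedCallbackFlow(
    onClose: () -> Unit,            // cleanup, e.g. unsubscribing from the bus
    onError: (Throwable) -> Unit,   // error reporting
    register: ProducerScope<T>.() -> Unit
): Flow<T> = callbackFlow<T> {
    register()
    // Suspends until the channel is closed or the collector is cancelled,
    // then runs the cleanup.
    awaitClose { onClose() }
}.flowOn(Dispatchers.Default)
    .catch { throwable -> onError(throwable) }

fun main() = runBlocking {
    // Stand-in for a real callback registration: emit two values and close.
    val items = wrappedCallbackFlow<Int>(
        onClose = { println("unsubscribed") },
        onError = { println("error: $it") }
    ) {
        trySend(1)
        trySend(2)
        close()
    }.toList()
    println(items)
}
```

In the original code the register lambda would hold the RxEventBus().register(...) call and onClose would call unsubscribeFromEventBus(receiverId). Passing the Throwable to onError unchanged also avoids the `throwable as Exception` cast, which fails if the upstream throws an Error.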