带 Kotlin 协程的 NetworkBoundResource

NetworkBoundResource with Kotlin coroutines

您对如何使用 NetworkBoundResource 和 Kotlin 协程实现存储库模式有任何想法吗?我知道我们可以使用 GlobalScope 启动协程,但这可能会导致协程泄漏。我想将 viewModelScope 作为参数传递,但在实现时有点棘手(因为我的存储库不知道任何 ViewModel 的 CoroutineScope)。

abstract class NetworkBoundResource<ResultType, RequestType>
@MainThread constructor(
    private val coroutineScope: CoroutineScope
) {

    private val result = MediatorLiveData<Resource<ResultType>>()

    init {
        result.value = Resource.loading(null)
        @Suppress("LeakingThis")
        val dbSource = loadFromDb()
        result.addSource(dbSource) { data ->
            result.removeSource(dbSource)
            if (shouldFetch(data)) {
                fetchFromNetwork(dbSource)
            } else {
                result.addSource(dbSource) { newData ->
                    setValue(Resource.success(newData))
                }
            }
        }
    }

    @MainThread
    private fun setValue(newValue: Resource<ResultType>) {
        if (result.value != newValue) {
            result.value = newValue
        }
    }

    private fun fetchFromNetwork(dbSource: LiveData<ResultType>) {
        val apiResponse = createCall()
        result.addSource(dbSource) { newData ->
            setValue(Resource.loading(newData))
        }
        result.addSource(apiResponse) { response ->
            result.removeSource(apiResponse)
            result.removeSource(dbSource)
            when (response) {
                is ApiSuccessResponse -> {
                    coroutineScope.launch(Dispatchers.IO) {
                        saveCallResult(processResponse(response))

                        withContext(Dispatchers.Main) {
                            result.addSource(loadFromDb()) { newData ->
                                setValue(Resource.success(newData))
                            }
                        }
                    }
                }

                is ApiEmptyResponse -> {
                    coroutineScope.launch(Dispatchers.Main) {
                        result.addSource(loadFromDb()) { newData ->
                            setValue(Resource.success(newData))
                        }
                    }
                }

                is ApiErrorResponse -> {
                    onFetchFailed()
                    result.addSource(dbSource) { newData ->
                        setValue(Resource.error(response.errorMessage, newData))
                    }
                }
            }
        }
    }
}

更新(2020-05-27):

一种比我之前的示例更符合 Kotlin 语言习惯的方式,使用 Flow API,并借鉴了 Juan 的回答,可以表示为独立函数,如下所示:

inline fun <ResultType, RequestType> networkBoundResource(
    crossinline query: () -> Flow<ResultType>,
    crossinline fetch: suspend () -> RequestType,
    crossinline saveFetchResult: suspend (RequestType) -> Unit,
    crossinline onFetchFailed: (Throwable) -> Unit = { Unit },
    crossinline shouldFetch: (ResultType) -> Boolean = { true }
) = flow<Resource<ResultType>> {
    emit(Resource.Loading(null))
    val data = query().first()

    val flow = if (shouldFetch(data)) {
        emit(Resource.Loading(data))

        try {
            saveFetchResult(fetch())
            query().map { Resource.Success(it) }
        } catch (throwable: Throwable) {
            onFetchFailed(throwable)
            query().map { Resource.Error(throwable, it) }
        }
    } else {
        query().map { Resource.Success(it) }
    }

    emitAll(flow)
}

以上代码可以从 class 中调用,例如一个存储库,像这样:

fun getItems(request: MyRequest): Flow<Resource<List<MyItem>>> {
    return networkBoundResource(
        query = { dao.queryAll() },
        fetch = { retrofitService.getItems(request) },
        saveFetchResult = { items -> dao.insert(items) }
    )
}

原回答:

这就是我使用 livedata-ktx 神器所做的;无需传入任何 CoroutineScope。 class 也只使用一种类型而不是两种(例如 ResultType/RequestType),因为我总是在别处使用适配器来映射它们。

import androidx.lifecycle.LiveData
import androidx.lifecycle.liveData
import androidx.lifecycle.map
import nihk.core.Resource

// Adapted from: https://developer.android.com/topic/libraries/architecture/coroutines
abstract class NetworkBoundResource<T> {

    fun asLiveData() = liveData<Resource<T>> {
        emit(Resource.Loading(null))

        if (shouldFetch(query())) {
            val disposable = emitSource(queryObservable().map { Resource.Loading(it) })

            try {
                val fetchedData = fetch()
                // Stop the previous emission to avoid dispatching the saveCallResult as `Resource.Loading`.
                disposable.dispose()
                saveFetchResult(fetchedData)
                // Re-establish the emission as `Resource.Success`.
                emitSource(queryObservable().map { Resource.Success(it) })
            } catch (e: Exception) {
                onFetchFailed(e)
                emitSource(queryObservable().map { Resource.Error(e, it) })
            }
        } else {
            emitSource(queryObservable().map { Resource.Success(it) })
        }
    }

    abstract suspend fun query(): T
    abstract fun queryObservable(): LiveData<T>
    abstract suspend fun fetch(): T
    abstract suspend fun saveFetchResult(data: T)
    open fun onFetchFailed(exception: Exception) = Unit
    open fun shouldFetch(data: T) = true
}

然而,就像@CommonsWare 在评论中所说的那样,只公开一个 Flow<T> 会更好。这是我尝试过的方法。请注意,我没有在生产中使用过此代码,所以买家要当心。

import kotlinx.coroutines.flow.*
import nihk.core.Resource

abstract class NetworkBoundResource<T> {

    fun asFlow(): Flow<Resource<T>> = flow {
        val flow = query()
            .onStart { emit(Resource.Loading<T>(null)) }
            .flatMapConcat { data ->
                if (shouldFetch(data)) {
                    emit(Resource.Loading(data))

                    try {
                        saveFetchResult(fetch())
                        query().map { Resource.Success(it) }
                    } catch (throwable: Throwable) {
                        onFetchFailed(throwable)
                        query().map { Resource.Error(throwable, it) }
                    }
                } else {
                    query().map { Resource.Success(it) }
                }
            }

        emitAll(flow)
    }

    abstract fun query(): Flow<T>
    abstract suspend fun fetch(): T
    abstract suspend fun saveFetchResult(data: T)
    open fun onFetchFailed(throwable: Throwable) = Unit
    open fun shouldFetch(data: T) = true
}

@N1hk 回答正确,这只是不使用 flatMapConcat 运算符的不同实现(此时标记为 FlowPreview

@FlowPreview
@ExperimentalCoroutinesApi
abstract class NetworkBoundResource<ResultType, RequestType> {

    fun asFlow() = flow {
        emit(Resource.loading(null))

        val dbValue = loadFromDb().first()
        if (shouldFetch(dbValue)) {
            emit(Resource.loading(dbValue))
            when (val apiResponse = fetchFromNetwork()) {
                is ApiSuccessResponse -> {
                    saveNetworkResult(processResponse(apiResponse))
                    emitAll(loadFromDb().map { Resource.success(it) })
                }
                is ApiErrorResponse -> {
                    onFetchFailed()
                    emitAll(loadFromDb().map { Resource.error(apiResponse.errorMessage, it) })
                }
            }
        } else {
            emitAll(loadFromDb().map { Resource.success(it) })
        }
    }

    protected open fun onFetchFailed() {
        // Implement in sub-classes to handle errors
    }

    @WorkerThread
    protected open fun processResponse(response: ApiSuccessResponse<RequestType>) = response.body

    @WorkerThread
    protected abstract suspend fun saveNetworkResult(item: RequestType)

    @MainThread
    protected abstract fun shouldFetch(data: ResultType?): Boolean

    @MainThread
    protected abstract fun loadFromDb(): Flow<ResultType>

    @MainThread
    protected abstract suspend fun fetchFromNetwork(): ApiResponse<RequestType>
}

我是 Kotlin 协程的新手。我这周刚遇到这个问题。

我认为如果你使用上面 post 中提到的存储库模式,我的意见是随意将 CoroutineScope 传递到 NetworkBoundResourceCoroutineScope可以是Repository中函数的参数之一,其中returns一个LiveData,如:

suspend fun getData(scope: CoroutineScope): LiveDate<T>

在您的 ViewModel 中调用 getData() 时,将内置范围 viewmodelscope 作为 CoroutineScope 传递,因此 NetworkBoundResource 将在 viewmodelscope 内工作,并与 Viewmodel 的生命周期绑定。 NetworkBoundResource 中的协程会在 ViewModel 死掉时被取消,这将是一个好处。

要使用内置范围 viewmodelscope,请不要忘记在您的 build.gradle.

中添加以下内容
implementation 'androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0-alpha01'