协程线程安全与改造

Coroutine Thread Safety with Retrofit

我在将有关使用协程启动网络请求的线程安全性的所有信息放在一起时仍然有点麻烦。

假设我们有以下用例,有一个我们得到的用户列表,对于每个用户,我将做一些特定的检查,必须 运行 通过对 API,给我一些关于这个用户的信息。

userCheck 发生在一个库中,它不公开挂起函数,但仍然使用回调。 在这个库中,我看到了这样的代码来启动每个网络请求:

internal suspend fun <T> doNetworkRequest(request: suspend () -> Response<T>): NetworkResult<T> {
    return withContext(Dispatchers.IO) {
        try {
            val response = request.invoke()
            ...

根据文档,Dispatchers.IO 可以使用多线程来执行代码,请求函数也只是来自 Retrofit API.

的函数

所以我所做的是为每个用户发起请求,并使用单个 resultHandler 对象,它将结果添加到列表中并检查结果列表的长度是否等于用户列表的长度,如果是这样,那么所有 userChecks 都完成了,我知道我可以对结果做一些事情,这些结果需要一起返回。

val userList: List<String>? = getUsers()
val userCheckResultList = mutableListOf<UserCheckResult>()
val handler = object : UserCheckResultHandler {
                  override fun onResult(
                        userCheckResult: UserCheckResult?
                  ) {
                        userCheckResult?.let {
                            userCheckResultList.add(
                                it
                            )
                        }
                        if (userCheckResultList.size == userList?.size) {
                            doSomethingWithResultList()
                            print("SUCCESS")
                        }
                    }
                }

userList?.forEach {
    checkUser(it, handler)
}

我的问题是:这个实现是线程安全的吗?据我所知,Kotlin 对象应该是线程安全的,但我收到反馈说这可能不是最好的实现 :D

但理论上,即使请求异步启动并同时启动多个请求,一次也只有一个可以访问线程的锁,结果处理程序正在 运行ning 上并且会有没有竞争条件或将项目添加到列表和比较大小的问题。

我错了吗? 有什么办法可以更好地处理这种情况?

如果您并行执行多个请求 - 它不是。 List 不是线程安全的。但这很简单。创建一个 Mutex 对象,然后将您的操作包装在锁定的列表中,如下所示:

val lock = Mutex()
val userList: List<String>? = getUsers()
val userCheckResultList = mutableListOf<UserCheckResult>()
val handler = object : UserCheckResultHandler {
                  override fun onResult(
                        userCheckResult: UserCheckResult?
                  ) {
                        lock.withLock {
                            userCheckResult?.let {
                                userCheckResultList.add(
                                    it
                                )
                            }
                            if (userCheckResultList.size == userList?.size) {
                                doSomethingWithResultList()
                                print("SUCCESS")
                            }
                        }
                    }
                }

userList?.forEach {
    checkUser(it, handler)
}

我必须补充一点,整个解决方案看起来很老套。我会完全走另一条路。 运行 您所有的请求都包含在 async { // network request } 中,这将 return Deferred 反对。将此对象添加到某个列表中。之后使用 awaitAll() 等待所有这些延迟对象。像那样:

val jobs = mutableListOf<Job>()
userList?.forEach {
   // i assume checkUser is suspendable here
   jobs += async { checkUser(it, handler) }
}

// wait for all requests
jobs.awaitAll()

// After that you can access all results like this:
val resultOfJob0 = jobs[0].getCompleted()