并发修改如何与协程一起工作?

How does concurrent modification work with coroutines?

我正在学习这个 coroutines 实践教程 Coroutines-Channels. So there is this task of concurrently fetching contributors and showing intermediate progress at the same time using channels, See Here

下面是建议解决方案的片段

suspend fun loadContributorsChannels(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit) = coroutineScope {

........
.........
val channel = Channel<List<User>>()
for (repo in repos) {
    launch {
        val users = service.getRepoContributors(req.org, repo.name) // suspend function
            .also { logUsers(repo, it) }
            .bodyList()
        channel.send(users) // suspend function
    }
}
var allUsers = emptyList<User>()
repeat(repos.size) {
    val users = channel.receive()  // suspend function
    allUsers = (allUsers + users).aggregate()
    updateResults(allUsers, it == repos.lastIndex) // suspend function
}
}

函数 loadContributorsChannels() 在使用 Default dispatcher.See herecoroutine 中调用。我有两个问题。

  1. 在上面的代码片段中,allUsers 被同时修改,因为我们已经在使用 Default dispatcher?

    coroutine
  2. 如果我像下面这样更改代码序列,为什么会得到不正确的结果?上面的代码与下面的代码片段有何不同?

    val contributorsChannel = Channel<List<User>>()
    var contributors = emptyList<User>()
    
    for(repo in repos) { 
        launch {
            val contributorsPerRepo = service
                .getRepoContributors(req.org, repo.name) // suspend function
                .also { logUsers(repo, it) }
                .bodyList()
    
            contributors = (contributors + contributorsPerRepo).aggregate()
            contributorsChannel.send(contributors)  // suspend function
        }
    }
    
    repeat(repos.size) {
        updateResults(contributorsChannel.receive(), it == repos.lastIndex) // suspend functions
    }
    

这是因为并发修改还是我遗漏了什么?

原代码中,顶层协程是唯一使用allUsers的协程。这是它的本地状态。

在您的代码中,contributors 是所有协程共享并同时更新的变量。

原始代码正确地将 Channel 应用为一种同步机制,将所有并发计算扇入到一个收集结果并使用它们的协程中。