等待所有例程完成

waiting for all go routines to finish

第一次使用 go,并尝试获取 go 例程和 WaitGroups 工作。

我有一个包含 100 行数据的 CSV 文件。 (101 包括 header)

我有以下简单代码:

package main

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "sync"
    "time"
)

func main() {
    start := time.Now()
    numRows := 0

    waitGroup := sync.WaitGroup{}
    file, _ := os.Open("./data.csv")

    scanner := bufio.NewScanner(file)
    scanner.Scan() // to read the header

    for scanner.Scan() {
        err := scanner.Err()

        if err != nil && err != io.EOF {
            panic(err)
        }

        waitGroup.Add(1)

        go (func() {
            numRows++
            waitGroup.Done()
        })()
    }

    waitGroup.Wait()
    file.Close()

    fmt.Println("Finished parsing ", numRows)
    fmt.Println("Elapsed time in seconds: ", time.Now().Sub(start))
}

当我运行这个时,numRows输出每次都在94到100之间波动。我希望它每次都是 100。如果我 运行 在 10 行数据的 CSV 上使用相同的代码,它每次都会输出 10

在我看来,最后几个围棋例程没有及时完成。

我尝试了以下失败的方法:

我错过了什么?

在不同的协程中同时修改单个变量是不安全的。您对 numRows 的一些更新将会丢失,并且您的程序有时可能会崩溃。

要么用互斥锁保护你的 numRows 变量,要么使用 atomic 函数之一以原子方式进行加法:

var numRows int32

// ...

    go (func() {
        atomic.AddInt32(&numRows, 1)
        waitGroup.Done()
    })()

你用这段代码做什么:

for scanner.Scan() {
    err := scanner.Err()

    if err != nil && err != io.EOF {
        panic(err)
    }

    waitGroup.Add(1)

    go (func() {
        numRows++
        waitGroup.Done()
    })()
}

实际上所有工作都在一个主 goroutine 中完成,只有 numRows increment 使用单独的 goroutines。我认为这可以简化为简单的增量:

for scanner.Scan() {
    err := scanner.Err()

    if err != nil && err != io.EOF {
        panic(err)
    }
    numRows++
}

如果你想模拟并行解析和流水线,你可以使用通道。只让一个 goroutine 负责计数器增量。每次当另一个 goroutine 想要增加计数器时 - 它会向该通道发送一条消息。

https://play.golang.org/p/W60twJjY8P