等待所有例程完成
waiting for all go routines to finish
第一次使用 go,并尝试获取 go 例程和 WaitGroups
工作。
我有一个包含 100 行数据的 CSV 文件。 (101 包括 header)
我有以下简单代码:
package main
import (
"bufio"
"fmt"
"io"
"os"
"sync"
"time"
)
func main() {
start := time.Now()
numRows := 0
waitGroup := sync.WaitGroup{}
file, _ := os.Open("./data.csv")
scanner := bufio.NewScanner(file)
scanner.Scan() // to read the header
for scanner.Scan() {
err := scanner.Err()
if err != nil && err != io.EOF {
panic(err)
}
waitGroup.Add(1)
go (func() {
numRows++
waitGroup.Done()
})()
}
waitGroup.Wait()
file.Close()
fmt.Println("Finished parsing ", numRows)
fmt.Println("Elapsed time in seconds: ", time.Now().Sub(start))
}
当我运行这个时,numRows
输出每次都在94到100之间波动。我希望它每次都是 100。如果我 运行 在 10 行数据的 CSV 上使用相同的代码,它每次都会输出 10
。
在我看来,最后几个围棋例程没有及时完成。
我尝试了以下失败的方法:
- 使用
CsvReader
而不是 Scanner
- 移动
waitGroup.Add(1)
到匿名函数下面
- 将匿名函数移出到 package-level 作用域函数中(并使用 ptrs 进行传递)
我错过了什么?
在不同的协程中同时修改单个变量是不安全的。您对 numRows
的一些更新将会丢失,并且您的程序有时可能会崩溃。
要么用互斥锁保护你的 numRows
变量,要么使用 atomic 函数之一以原子方式进行加法:
var numRows int32
// ...
go (func() {
atomic.AddInt32(&numRows, 1)
waitGroup.Done()
})()
你用这段代码做什么:
for scanner.Scan() {
err := scanner.Err()
if err != nil && err != io.EOF {
panic(err)
}
waitGroup.Add(1)
go (func() {
numRows++
waitGroup.Done()
})()
}
实际上所有工作都在一个主 goroutine 中完成,只有 numRows
increment 使用单独的 goroutines。我认为这可以简化为简单的增量:
for scanner.Scan() {
err := scanner.Err()
if err != nil && err != io.EOF {
panic(err)
}
numRows++
}
如果你想模拟并行解析和流水线,你可以使用通道。只让一个 goroutine 负责计数器增量。每次当另一个 goroutine 想要增加计数器时 - 它会向该通道发送一条消息。
第一次使用 go,并尝试获取 go 例程和 WaitGroups
工作。
我有一个包含 100 行数据的 CSV 文件。 (101 包括 header)
我有以下简单代码:
package main
import (
"bufio"
"fmt"
"io"
"os"
"sync"
"time"
)
func main() {
start := time.Now()
numRows := 0
waitGroup := sync.WaitGroup{}
file, _ := os.Open("./data.csv")
scanner := bufio.NewScanner(file)
scanner.Scan() // to read the header
for scanner.Scan() {
err := scanner.Err()
if err != nil && err != io.EOF {
panic(err)
}
waitGroup.Add(1)
go (func() {
numRows++
waitGroup.Done()
})()
}
waitGroup.Wait()
file.Close()
fmt.Println("Finished parsing ", numRows)
fmt.Println("Elapsed time in seconds: ", time.Now().Sub(start))
}
当我运行这个时,numRows
输出每次都在94到100之间波动。我希望它每次都是 100。如果我 运行 在 10 行数据的 CSV 上使用相同的代码,它每次都会输出 10
。
在我看来,最后几个围棋例程没有及时完成。
我尝试了以下失败的方法:
- 使用
CsvReader
而不是Scanner
- 移动
waitGroup.Add(1)
到匿名函数下面 - 将匿名函数移出到 package-level 作用域函数中(并使用 ptrs 进行传递)
我错过了什么?
在不同的协程中同时修改单个变量是不安全的。您对 numRows
的一些更新将会丢失,并且您的程序有时可能会崩溃。
要么用互斥锁保护你的 numRows
变量,要么使用 atomic 函数之一以原子方式进行加法:
var numRows int32
// ...
go (func() {
atomic.AddInt32(&numRows, 1)
waitGroup.Done()
})()
你用这段代码做什么:
for scanner.Scan() {
err := scanner.Err()
if err != nil && err != io.EOF {
panic(err)
}
waitGroup.Add(1)
go (func() {
numRows++
waitGroup.Done()
})()
}
实际上所有工作都在一个主 goroutine 中完成,只有 numRows
increment 使用单独的 goroutines。我认为这可以简化为简单的增量:
for scanner.Scan() {
err := scanner.Err()
if err != nil && err != io.EOF {
panic(err)
}
numRows++
}
如果你想模拟并行解析和流水线,你可以使用通道。只让一个 goroutine 负责计数器增量。每次当另一个 goroutine 想要增加计数器时 - 它会向该通道发送一条消息。