程序使用等待组进入死锁
Program goes into deadlock using waitgroup
我正在编写一个程序,该程序读取名为 orders.csv 的文件中的订单号列表,并将其与文件夹中存在的其他 csv 文件进行比较。
问题是即使使用等待组也会陷入死锁,我不知道为什么。
出于某种原因,Whosebug 说我的 post 主要是代码,所以我必须添加这一行,因为如果有人想帮助我调试我遇到的这个问题,那么整个代码是必需的。
package main
import (
"bufio"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
)
type Files struct {
filenames []string
}
type Orders struct {
ID []string
}
var ordersFilename string = "orders.csv"
func main() {
var (
ordersFile *os.File
files Files
orders Orders
err error
)
mu := new(sync.Mutex)
wg := &sync.WaitGroup{}
wg.Add(1)
if ordersFile, err = os.Open(ordersFilename); err != nil {
log.Fatalln("Could not open file: " + ordersFilename)
}
orders = getOrderIDs(ordersFile)
files.filenames = getCSVsFromCurrentDir()
var filenamesSize = len(files.filenames)
var ch = make(chan map[string][]string, filenamesSize)
var done = make(chan bool)
for i, filename := range files.filenames {
go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, wg *sync.WaitGroup, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {
wg.Add(1)
defer wg.Done()
checkFile(currentFilename, orders, ch)
mu.Lock()
*filenamesSize--
mu.Unlock()
if i == *filenamesSize {
done <- true
close(done)
}
}(filename, ch, i, orders, wg, &filenamesSize, mu, done)
}
select {
case str := <-ch:
fmt.Printf("%+v\n", str)
case <-done:
wg.Done()
break
}
wg.Wait()
close(ch)
}
// getCSVsFromCurrentDir returns a string slice
// with the filenames of csv files inside the
// current directory that are not "orders.csv"
func getCSVsFromCurrentDir() []string {
var filenames []string
err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
if path != "." && strings.HasSuffix(path, ".csv") && path != ordersFilename {
filenames = append(filenames, path)
}
return nil
})
if err != nil {
log.Fatalln("Could not read file names in current dir")
}
return filenames
}
// getOrderIDs returns an Orders struct filled
// with order IDs retrieved from the file
func getOrderIDs(file *os.File) Orders {
var (
orders Orders
err error
fileContent string
)
reader := bufio.NewReader(file)
if fileContent, err = readLine(reader); err != nil {
log.Fatalln("Could not read file: " + ordersFilename)
}
for err == nil {
orders.ID = append(orders.ID, fileContent)
fileContent, err = readLine(reader)
}
return orders
}
func checkFile(filename string, orders Orders, ch chan<- map[string][]string) {
var (
err error
file *os.File
fileContent string
orderFilesMap map[string][]string
counter int
)
orderFilesMap = make(map[string][]string)
if file, err = os.Open(filename); err != nil {
log.Fatalln("Could not read file: " + filename)
}
reader := bufio.NewReader(file)
if fileContent, err = readLine(reader); err != nil {
log.Fatalln("Could not read file: " + filename)
}
for err == nil {
if containedInSlice(fileContent, orders.ID) && !containedInSlice(fileContent, orderFilesMap[filename]) {
orderFilesMap[filename] = append(orderFilesMap[filename], fileContent)
// fmt.Println("Found: ", fileContent, " in ", filename)
} else {
// fmt.Printf("Could not find: '%s' in '%s'\n", fileContent, filename)
}
counter++
fileContent, err = readLine(reader)
}
ch <- orderFilesMap
}
// containedInSlice returns true or false
// based on whether the string is contained
// in the slice
func containedInSlice(str string, slice []string) bool {
for _, ID := range slice {
if ID == str {
return true
}
}
return false
}
// readLine returns a line from the passed reader
func readLine(r *bufio.Reader) (string, error) {
var (
isPrefix bool = true
err error = nil
line, ln []byte
)
for isPrefix && err == nil {
line, isPrefix, err = r.ReadLine()
ln = append(ln, line...)
}
return string(ln), err
}
不是答案,而是一些不适合评论框的评论。
在这部分代码中
func main() {
var (
ordersFile *os.File
files Files
orders Orders
err error
)
mu := new(sync.Mutex)
wg := &sync.WaitGroup{}
wg.Add(1)
最后一条语句是对 wg.Add 的调用,看起来悬空。我的意思是我们很难理解什么会触发所需的 wg.Done 对应部分。在没有 wg.Done 的情况下调用 wg.Add 是错误的,如果不以这种方式编写它们很容易出错,我们无法立即成对找到它们。
那部分代码,明显是错误的
go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, wg *sync.WaitGroup, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {
wg.Add(1)
defer wg.Done()
考虑到例程执行时,您将 1 添加到等待组,父例程继续执行。请参阅此示例:https://play.golang.org/p/N9Chaqkv4bd
主例程不等待等待组,因为它没有时间递增。
还有很多话要说,但我发现很难理解你的代码的用途,所以我不确定如何在不基本上重写它的情况下进一步帮助你。
第一个问题是 wg.Add
总是必须在它代表的 goroutine 之外。如果不是,则
wg.Wait
调用可能会在 goutine(s) 实际开始之前调用 运行(并调用 wg.Add
),因此会“思考”
没有什么可等待的。
代码的第二个问题是它有多种等待例程完成的方法。有
WaitGroup
和 done
频道。只使用其中一个。哪个也取决于结果如何
使用 goroutines。下面我们来到下一个问题。
第三个问题是收集结果。目前代码仅打印/使用来自 goroutines 的单个结果。
在 select 周围放置一个 for { ... }
循环,如果 done
通道关闭,则使用 return
跳出循环。
(请注意,您不需要在 done
频道上发送任何内容,关闭它就足够了。)
改进版本 0.0.1
所以这里是第一个版本(包括一些其他的“代码清理”),其中 done
通道用于关闭并删除了 WaitGroup
:
func main() {
ordersFile, err := os.Open(ordersFilename)
if err != nil {
log.Fatalln("Could not open file: " + ordersFilename)
}
orders := getOrderIDs(ordersFile)
files := Files{
filenames: getCSVsFromCurrentDir(),
}
var (
mu = new(sync.Mutex)
filenamesSize = len(files.filenames)
ch = make(chan map[string][]string, filenamesSize)
done = make(chan bool)
)
for i, filename := range files.filenames {
go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {
checkFile(currentFilename, orders, ch)
mu.Lock()
*filenamesSize--
mu.Unlock()
// TODO: This also accesses filenamesSize, so it also needs to be protected with the mutex:
if i == *filenamesSize {
done <- true
close(done)
}
}(filename, ch, i, orders, &filenamesSize, mu, done)
}
// Note: closing a channel is not really needed, so you can omit this:
defer close(ch)
for {
select {
case str := <-ch:
fmt.Printf("%+v\n", str)
case <-done:
return
}
}
}
改进版本 0.0.2
- 但是,对于您的情况,我们有一些优势。我们确切地知道我们启动了多少 goroutines,因此也知道如何
我们期望的许多结果。 (当然,如果每个 goroutine returns 当前这段代码的结果。)这给出了
我们还有另一个选择,因为我们可以使用另一个具有相同迭代次数的 for 循环来收集结果:
func main() {
ordersFile, err := os.Open(ordersFilename)
if err != nil {
log.Fatalln("Could not open file: " + ordersFilename)
}
orders := getOrderIDs(ordersFile)
files := Files{
filenames: getCSVsFromCurrentDir(),
}
var (
// Note: a buffered channel helps speed things up. The size does not need to match the size of the items that will
// be passed through the channel. A fixed, small size is perfect here.
ch = make(chan map[string][]string, 5)
)
for _, filename := range files.filenames {
go func(filename string) {
// orders and channel are not variables of the loop and can be used without copying
checkFile(filename, orders, ch)
}(filename)
}
for range files.filenames {
str := <-ch
fmt.Printf("%+v\n", str)
}
}
简单多了,不是吗?希望对您有所帮助!
这段代码有很多错误。
- 您使用的 WaitGroup 有误。必须在主 goroutine 中调用 Add,否则有可能在所有 Add 调用完成之前调用 Wait。
- 在初始化与 Done() 调用不匹配的 WaitGroup 之后有一个无关的 Add(1) 调用,因此 Wait 永远不会 return(假设上面的点是固定的)。
- 您同时使用 WaitGroup 和 done 通道来表示完成。这充其量是多余的。
- 您正在读取 filenamesSize 而未持有锁(在
if i == *filenamesSize
语句中)。这是竞争条件。
i == *filenamesSize
条件从一开始就没有意义。 Goroutines 以任意顺序执行,所以你不能确定 i == 0 的 goroutine 是最后一个递减 filenamesSize
这一切都可以通过摆脱大多数 if 同步原语并在所有 goroutine 完成后简单地关闭 ch 通道来简化:
func main() {
ch := make(chan map[string][]string)
var wg WaitGroup
for _, filename := range getCSVsFromCurrentDir() {
filename := filename // capture loop var
wg.Add(1)
go func() {
checkFile(filename, orders, ch)
wg.Done()
}()
}
go func() {
wg.Wait() // after all goroutines are done...
close(ch) // let range loop below exit
}()
for str := range ch {
// ...
}
}
我正在编写一个程序,该程序读取名为 orders.csv 的文件中的订单号列表,并将其与文件夹中存在的其他 csv 文件进行比较。
问题是即使使用等待组也会陷入死锁,我不知道为什么。
出于某种原因,Whosebug 说我的 post 主要是代码,所以我必须添加这一行,因为如果有人想帮助我调试我遇到的这个问题,那么整个代码是必需的。
package main
import (
"bufio"
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
)
type Files struct {
filenames []string
}
type Orders struct {
ID []string
}
var ordersFilename string = "orders.csv"
func main() {
var (
ordersFile *os.File
files Files
orders Orders
err error
)
mu := new(sync.Mutex)
wg := &sync.WaitGroup{}
wg.Add(1)
if ordersFile, err = os.Open(ordersFilename); err != nil {
log.Fatalln("Could not open file: " + ordersFilename)
}
orders = getOrderIDs(ordersFile)
files.filenames = getCSVsFromCurrentDir()
var filenamesSize = len(files.filenames)
var ch = make(chan map[string][]string, filenamesSize)
var done = make(chan bool)
for i, filename := range files.filenames {
go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, wg *sync.WaitGroup, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {
wg.Add(1)
defer wg.Done()
checkFile(currentFilename, orders, ch)
mu.Lock()
*filenamesSize--
mu.Unlock()
if i == *filenamesSize {
done <- true
close(done)
}
}(filename, ch, i, orders, wg, &filenamesSize, mu, done)
}
select {
case str := <-ch:
fmt.Printf("%+v\n", str)
case <-done:
wg.Done()
break
}
wg.Wait()
close(ch)
}
// getCSVsFromCurrentDir returns a string slice
// with the filenames of csv files inside the
// current directory that are not "orders.csv"
func getCSVsFromCurrentDir() []string {
var filenames []string
err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
if path != "." && strings.HasSuffix(path, ".csv") && path != ordersFilename {
filenames = append(filenames, path)
}
return nil
})
if err != nil {
log.Fatalln("Could not read file names in current dir")
}
return filenames
}
// getOrderIDs returns an Orders struct filled
// with order IDs retrieved from the file
func getOrderIDs(file *os.File) Orders {
var (
orders Orders
err error
fileContent string
)
reader := bufio.NewReader(file)
if fileContent, err = readLine(reader); err != nil {
log.Fatalln("Could not read file: " + ordersFilename)
}
for err == nil {
orders.ID = append(orders.ID, fileContent)
fileContent, err = readLine(reader)
}
return orders
}
func checkFile(filename string, orders Orders, ch chan<- map[string][]string) {
var (
err error
file *os.File
fileContent string
orderFilesMap map[string][]string
counter int
)
orderFilesMap = make(map[string][]string)
if file, err = os.Open(filename); err != nil {
log.Fatalln("Could not read file: " + filename)
}
reader := bufio.NewReader(file)
if fileContent, err = readLine(reader); err != nil {
log.Fatalln("Could not read file: " + filename)
}
for err == nil {
if containedInSlice(fileContent, orders.ID) && !containedInSlice(fileContent, orderFilesMap[filename]) {
orderFilesMap[filename] = append(orderFilesMap[filename], fileContent)
// fmt.Println("Found: ", fileContent, " in ", filename)
} else {
// fmt.Printf("Could not find: '%s' in '%s'\n", fileContent, filename)
}
counter++
fileContent, err = readLine(reader)
}
ch <- orderFilesMap
}
// containedInSlice returns true or false
// based on whether the string is contained
// in the slice
func containedInSlice(str string, slice []string) bool {
for _, ID := range slice {
if ID == str {
return true
}
}
return false
}
// readLine returns a line from the passed reader
func readLine(r *bufio.Reader) (string, error) {
var (
isPrefix bool = true
err error = nil
line, ln []byte
)
for isPrefix && err == nil {
line, isPrefix, err = r.ReadLine()
ln = append(ln, line...)
}
return string(ln), err
}
不是答案,而是一些不适合评论框的评论。
在这部分代码中
func main() {
var (
ordersFile *os.File
files Files
orders Orders
err error
)
mu := new(sync.Mutex)
wg := &sync.WaitGroup{}
wg.Add(1)
最后一条语句是对 wg.Add 的调用,看起来悬空。我的意思是我们很难理解什么会触发所需的 wg.Done 对应部分。在没有 wg.Done 的情况下调用 wg.Add 是错误的,如果不以这种方式编写它们很容易出错,我们无法立即成对找到它们。
那部分代码,明显是错误的
go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, wg *sync.WaitGroup, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {
wg.Add(1)
defer wg.Done()
考虑到例程执行时,您将 1 添加到等待组,父例程继续执行。请参阅此示例:https://play.golang.org/p/N9Chaqkv4bd 主例程不等待等待组,因为它没有时间递增。
还有很多话要说,但我发现很难理解你的代码的用途,所以我不确定如何在不基本上重写它的情况下进一步帮助你。
第一个问题是
wg.Add
总是必须在它代表的 goroutine 之外。如果不是,则wg.Wait
调用可能会在 goutine(s) 实际开始之前调用 运行(并调用wg.Add
),因此会“思考” 没有什么可等待的。代码的第二个问题是它有多种等待例程完成的方法。有
WaitGroup
和done
频道。只使用其中一个。哪个也取决于结果如何 使用 goroutines。下面我们来到下一个问题。第三个问题是收集结果。目前代码仅打印/使用来自 goroutines 的单个结果。 在 select 周围放置一个
for { ... }
循环,如果done
通道关闭,则使用return
跳出循环。 (请注意,您不需要在done
频道上发送任何内容,关闭它就足够了。)
改进版本 0.0.1
所以这里是第一个版本(包括一些其他的“代码清理”),其中 done
通道用于关闭并删除了 WaitGroup
:
func main() {
ordersFile, err := os.Open(ordersFilename)
if err != nil {
log.Fatalln("Could not open file: " + ordersFilename)
}
orders := getOrderIDs(ordersFile)
files := Files{
filenames: getCSVsFromCurrentDir(),
}
var (
mu = new(sync.Mutex)
filenamesSize = len(files.filenames)
ch = make(chan map[string][]string, filenamesSize)
done = make(chan bool)
)
for i, filename := range files.filenames {
go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {
checkFile(currentFilename, orders, ch)
mu.Lock()
*filenamesSize--
mu.Unlock()
// TODO: This also accesses filenamesSize, so it also needs to be protected with the mutex:
if i == *filenamesSize {
done <- true
close(done)
}
}(filename, ch, i, orders, &filenamesSize, mu, done)
}
// Note: closing a channel is not really needed, so you can omit this:
defer close(ch)
for {
select {
case str := <-ch:
fmt.Printf("%+v\n", str)
case <-done:
return
}
}
}
改进版本 0.0.2
- 但是,对于您的情况,我们有一些优势。我们确切地知道我们启动了多少 goroutines,因此也知道如何 我们期望的许多结果。 (当然,如果每个 goroutine returns 当前这段代码的结果。)这给出了 我们还有另一个选择,因为我们可以使用另一个具有相同迭代次数的 for 循环来收集结果:
func main() {
ordersFile, err := os.Open(ordersFilename)
if err != nil {
log.Fatalln("Could not open file: " + ordersFilename)
}
orders := getOrderIDs(ordersFile)
files := Files{
filenames: getCSVsFromCurrentDir(),
}
var (
// Note: a buffered channel helps speed things up. The size does not need to match the size of the items that will
// be passed through the channel. A fixed, small size is perfect here.
ch = make(chan map[string][]string, 5)
)
for _, filename := range files.filenames {
go func(filename string) {
// orders and channel are not variables of the loop and can be used without copying
checkFile(filename, orders, ch)
}(filename)
}
for range files.filenames {
str := <-ch
fmt.Printf("%+v\n", str)
}
}
简单多了,不是吗?希望对您有所帮助!
这段代码有很多错误。
- 您使用的 WaitGroup 有误。必须在主 goroutine 中调用 Add,否则有可能在所有 Add 调用完成之前调用 Wait。
- 在初始化与 Done() 调用不匹配的 WaitGroup 之后有一个无关的 Add(1) 调用,因此 Wait 永远不会 return(假设上面的点是固定的)。
- 您同时使用 WaitGroup 和 done 通道来表示完成。这充其量是多余的。
- 您正在读取 filenamesSize 而未持有锁(在
if i == *filenamesSize
语句中)。这是竞争条件。 i == *filenamesSize
条件从一开始就没有意义。 Goroutines 以任意顺序执行,所以你不能确定 i == 0 的 goroutine 是最后一个递减 filenamesSize
这一切都可以通过摆脱大多数 if 同步原语并在所有 goroutine 完成后简单地关闭 ch 通道来简化:
func main() {
ch := make(chan map[string][]string)
var wg WaitGroup
for _, filename := range getCSVsFromCurrentDir() {
filename := filename // capture loop var
wg.Add(1)
go func() {
checkFile(filename, orders, ch)
wg.Done()
}()
}
go func() {
wg.Wait() // after all goroutines are done...
close(ch) // let range loop below exit
}()
for str := range ch {
// ...
}
}