遍历文件夹时 Goroutine 死锁
Goroutine deadlock while walking folders
我有此代码基于 pipelines 示例。 walkFiles
接受一个或多个文件夹(在 folders
变量中指定)和 "visits" 所有文件夹中的文件作为参数给出。它还需要一个 done
通道来允许取消,但我认为这对这个问题无关紧要。
当只传递一个文件夹时,代码按预期工作。但是当给出两个时,它给了我臭名昭著的 fatal error: all goroutines are asleep - deadlock!
错误。甚至看起来它通过处理两个文件夹的文件在做正确的事情,但它并没有很好地结束。我在并发执行此函数时犯的(可能很明显的)错误是什么?
代码如下:
type result struct {
path string
checksum []byte
err error
}
type FileData struct {
Hash []byte
}
// walkFiles starts a goroutine to walk the directory tree at root and send the
// path of each regular file on the string channel. It sends the result of the
// walk on the error channel. If done is closed, walkFiles abandons its work.
func (p Processor) walkFiles(done <-chan struct{}, folders []string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
visit := func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
}
var wg sync.WaitGroup
for i, folder := range folders {
wg.Add(1)
go func(f string, i int) {
defer wg.Done()
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(f, visit)
}(folder, i)
}
go func() {
wg.Wait()
close(paths)
}()
return paths, errc
}
func closeFile(f *os.File) {
err := f.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
}
// processor reads path names from paths and sends digests of the corresponding
// files on c until either paths or done is closed.
func (p Processor) process(done <-chan struct{}, files <-chan string, c chan<- result, loc *locator.Locator) {
for f := range files {
func() {
file, err := os.Open(f.path)
if err != nil {
fmt.Println(err)
return
}
defer closeFile(file)
// Hashing file, producing `checksum` variable, and an `err`
select {
case c <- result{f.path, checksum, err}:
case <-done:
return
}
}()
}
}
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error. In that case,
// MD5All does not wait for inflight read operations to complete.
func (p Processor) MD5All(folders []string) (map[string]FileData, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
paths, errc := p.walkFiles(done, folders)
c := make(chan result)
var wg sync.WaitGroup
wg.Add(NUM_DIGESTERS)
for i := 0; i < NUM_DIGESTERS; i++ {
go func() {
p.process(done, paths, c, loc)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
// End of pipeline. OMIT
m := make(map[string]FileData)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = FileData{r.checksum}
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
func (p Processor) Start() map[string]FileData {
m, err := p.MD5All(p.folders)
if err != nil {
log.Fatal(err)
}
return m
}
问题出在这里:
if err := <-errc; err != nil {
return nil, err
}
您只从 errc
中读取一次,但所有的 groutine 都在写入它。一旦为第一个完成的 goroutine 读取了 errc,所有其他 goroutine 都在等待写入它。
使用 for 循环读取。
我有此代码基于 pipelines 示例。 walkFiles
接受一个或多个文件夹(在 folders
变量中指定)和 "visits" 所有文件夹中的文件作为参数给出。它还需要一个 done
通道来允许取消,但我认为这对这个问题无关紧要。
当只传递一个文件夹时,代码按预期工作。但是当给出两个时,它给了我臭名昭著的 fatal error: all goroutines are asleep - deadlock!
错误。甚至看起来它通过处理两个文件夹的文件在做正确的事情,但它并没有很好地结束。我在并发执行此函数时犯的(可能很明显的)错误是什么?
代码如下:
type result struct {
path string
checksum []byte
err error
}
type FileData struct {
Hash []byte
}
// walkFiles starts a goroutine to walk the directory tree at root and send the
// path of each regular file on the string channel. It sends the result of the
// walk on the error channel. If done is closed, walkFiles abandons its work.
func (p Processor) walkFiles(done <-chan struct{}, folders []string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
visit := func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
}
var wg sync.WaitGroup
for i, folder := range folders {
wg.Add(1)
go func(f string, i int) {
defer wg.Done()
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(f, visit)
}(folder, i)
}
go func() {
wg.Wait()
close(paths)
}()
return paths, errc
}
func closeFile(f *os.File) {
err := f.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
}
// processor reads path names from paths and sends digests of the corresponding
// files on c until either paths or done is closed.
func (p Processor) process(done <-chan struct{}, files <-chan string, c chan<- result, loc *locator.Locator) {
for f := range files {
func() {
file, err := os.Open(f.path)
if err != nil {
fmt.Println(err)
return
}
defer closeFile(file)
// Hashing file, producing `checksum` variable, and an `err`
select {
case c <- result{f.path, checksum, err}:
case <-done:
return
}
}()
}
}
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error. In that case,
// MD5All does not wait for inflight read operations to complete.
func (p Processor) MD5All(folders []string) (map[string]FileData, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
paths, errc := p.walkFiles(done, folders)
c := make(chan result)
var wg sync.WaitGroup
wg.Add(NUM_DIGESTERS)
for i := 0; i < NUM_DIGESTERS; i++ {
go func() {
p.process(done, paths, c, loc)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
// End of pipeline. OMIT
m := make(map[string]FileData)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = FileData{r.checksum}
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
func (p Processor) Start() map[string]FileData {
m, err := p.MD5All(p.folders)
if err != nil {
log.Fatal(err)
}
return m
}
问题出在这里:
if err := <-errc; err != nil {
return nil, err
}
您只从 errc
中读取一次,但所有的 groutine 都在写入它。一旦为第一个完成的 goroutine 读取了 errc,所有其他 goroutine 都在等待写入它。
使用 for 循环读取。