如何在 Go 中递归列出带有频道的文件?
How to recursively list files with a channel in Go?
我正在尝试使用通道以递归方式列出目录树。
目前我得到了几个文件的列表,然后它卡在了一个目录中。目录已发送给工作人员,但它不处理它。
如何将目录发送到工作人员 (内部 (if file.IsDir()
),以便它得到正确处理,并通知文件列表器没有新文件要发送递归完成后处理?
这是我目前的尝试:
package main
import (
"fmt"
"os"
"path/filepath"
"errors"
"log"
)
// Job for worker
type workerJob struct {
Root string
}
// Result of a worker
type workerResult struct {
Filename string
}
func worker(jobs chan workerJob, results chan<- workerResult, done chan bool) {
for j := range jobs {
log.Printf(`Directory: %#v`, j.Root)
dir, err := os.Open(j.Root)
if err != nil {
if os.IsPermission(err) {
// Skip if there's no permission
continue
}
continue
}
fInfo, err := dir.Readdir(-1)
dir.Close()
if err != nil {
if os.IsPermission(err) {
// Skip if there's no permission
continue
}
continue
}
for _, file := range fInfo {
fpath := filepath.Join(dir.Name(), file.Name())
if file.Mode().IsRegular() {
// is file
fs := uint64(file.Size())
if fs == 0 {
// Skip zero sized
continue
}
r := workerResult{
Filename: fpath,
}
log.Printf(`sent result: %#v`, r.Filename)
results <- r
} else if file.IsDir() {
// Send directory to be processed by the worker
nj := workerJob{
Root: fpath,
}
log.Printf(`sent new dir job: %#v`, nj.Root)
jobs <- nj
}
}
done <- true
}
}
func main() {
dir := `/tmp`
workerCount := 1
jobs := make(chan workerJob, workerCount)
results := make(chan workerResult)
readDone := make(chan bool)
// start N workers
for i := 0; i < workerCount; i++ {
go worker(jobs, results, readDone)
}
jobs <- workerJob{
Root: dir,
}
readloop:
for {
select {
case res := <-results:
log.Printf(`result=%#v`, res.Filename)
case _ = <-readDone:
log.Printf(`got stop`)
break readloop
}
}
}
这导致:
2018/07/12 14:37:29 Directory: "/tmp"
2018/07/12 14:37:29 sent result: "/tmp/.bashrc"
2018/07/12 14:37:29 result="/tmp/.bashrc"
2018/07/12 14:37:29 sent result: "/tmp/.bash_profile"
2018/07/12 14:37:29 result="/tmp/.bash_profile"
2018/07/12 14:37:29 sent result: "/tmp/.bash_logout"
2018/07/12 14:37:29 result="/tmp/.bash_logout"
2018/07/12 14:37:29 sent result: "/tmp/.xinitrc"
2018/07/12 14:37:29 result="/tmp/.xinitrc"
2018/07/12 14:37:29 sent new dir job: "/tmp/.config"
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select]:
main.main()
+0x281
goroutine 5 [chan send]:
main.worker(0xc42005a060, 0xc420078060, 0xc4200780c0)
+0x4e7
created by main.main
+0x109
Process finished with exit code 2
如何解决死锁?
关于改进代码的初步意见
Tim 的评论似乎没有触及要点。您在 main()
末尾关闭通道并不重要,您的 select
语句有一个 default
也不重要。如果频道上有消息,频道阅读案例将 运行。
这可能被认为是一个问题,虽然没有消息,但您将通过 default
案例重复旋转循环,这将导致 CPU 使用量激增("busy-waiting"), 所以是的,可能只是删除默认大小写。
您还可以使用标签为打破 for
循环的 "stop" 频道添加一个案例(这是必需的,否则 break
只会从 select 语句,我们再次循环):
readloop:
for {
select {
case res := <-results:
log.Printf(`result=%#v`, res.Filename)
case _ = <-stopChan:
break readloop
}
最后,您可能还应该将 worker()
中的变量 f
重命名为 dir
,因为它是目录而不是文件。只是让代码更容易阅读。对于精通该语言的程序员来说,代码应该像自然语言一样阅读。那样的话,这个语句,
fpath := filepath.Join(f.Name(), file.Name())
变成
fpath := filepath.Join(dir.Name(), file.Name())
...这让您的 eyes/brain 更容易扫描。
为什么你的代码坏了
您的频道出现死锁。你没有注意到,因为 default
的情况意味着从技术上讲,一个 goroutine 总是可以做出 "progress"。否则 运行time 会引发恐慌说:
fatal error: all goroutines are asleep - deadlock!
这是因为worker()
具有以下结构:
receive from channel
...
...
foreach dir in root:
send to channel
...
...
但是在正常的通道上,发送和接收都是阻塞操作。 sends/receives 的 goroutine 在其合作伙伴出现之前不会取得进展。
您可以使用缓冲通道来避免这种情况,但无法提前知道目录中有多少个目录,因此缓冲区可能太小。我建议生成一个 goroutine,这样它就可以阻塞而不影响整个 worker()
循环:
go func() {
for _, file := range fInfo {
...
}
}()
您已经注意到 jobs <- nj
永远挂起。这是因为在 range
循环中,该操作会一直阻塞,直到一个 worker 接收到为止,并且只要它阻塞在那里,它就无法到达 range
循环。
为了解决这个问题,你生成了一个新的 goroutine 来解决这个问题。
go func() {
jobs <- nj
}()
还有一个问题:您的 readDone
频道。
目前,每次您的 worker
完成一项工作时都会发出该频道,这导致 select
在 [=19] 中的可能性(select
随机选择就绪频道) =] 拿起它然后关闭系统,这使得所有剩余的工作和结果都丢失了。
要解决这部分问题,你应该使用sync.WaitGroup
。每次添加新工作时,您都会调用 wg.Add(1)
,而每当您的工作人员完成工作时,您都会调用 wg.Done()
。在 func main()
中,您应生成一个 goroutine,它使用 wg.Wait()
等待所有作业完成,然后使用 readDone
.
关闭系统
// One initial job
wg.Add(1)
go func() {
jobs <- workerJob{
Root: dir,
}
}()
// When all jobs finished, shutdown the system.
go func() {
wg.Wait()
readDone <- true
}()
我正在尝试使用通道以递归方式列出目录树。
目前我得到了几个文件的列表,然后它卡在了一个目录中。目录已发送给工作人员,但它不处理它。
如何将目录发送到工作人员 (内部 (if file.IsDir()
),以便它得到正确处理,并通知文件列表器没有新文件要发送递归完成后处理?
这是我目前的尝试:
package main
import (
"fmt"
"os"
"path/filepath"
"errors"
"log"
)
// Job for worker
type workerJob struct {
Root string
}
// Result of a worker
type workerResult struct {
Filename string
}
func worker(jobs chan workerJob, results chan<- workerResult, done chan bool) {
for j := range jobs {
log.Printf(`Directory: %#v`, j.Root)
dir, err := os.Open(j.Root)
if err != nil {
if os.IsPermission(err) {
// Skip if there's no permission
continue
}
continue
}
fInfo, err := dir.Readdir(-1)
dir.Close()
if err != nil {
if os.IsPermission(err) {
// Skip if there's no permission
continue
}
continue
}
for _, file := range fInfo {
fpath := filepath.Join(dir.Name(), file.Name())
if file.Mode().IsRegular() {
// is file
fs := uint64(file.Size())
if fs == 0 {
// Skip zero sized
continue
}
r := workerResult{
Filename: fpath,
}
log.Printf(`sent result: %#v`, r.Filename)
results <- r
} else if file.IsDir() {
// Send directory to be processed by the worker
nj := workerJob{
Root: fpath,
}
log.Printf(`sent new dir job: %#v`, nj.Root)
jobs <- nj
}
}
done <- true
}
}
func main() {
dir := `/tmp`
workerCount := 1
jobs := make(chan workerJob, workerCount)
results := make(chan workerResult)
readDone := make(chan bool)
// start N workers
for i := 0; i < workerCount; i++ {
go worker(jobs, results, readDone)
}
jobs <- workerJob{
Root: dir,
}
readloop:
for {
select {
case res := <-results:
log.Printf(`result=%#v`, res.Filename)
case _ = <-readDone:
log.Printf(`got stop`)
break readloop
}
}
}
这导致:
2018/07/12 14:37:29 Directory: "/tmp"
2018/07/12 14:37:29 sent result: "/tmp/.bashrc"
2018/07/12 14:37:29 result="/tmp/.bashrc"
2018/07/12 14:37:29 sent result: "/tmp/.bash_profile"
2018/07/12 14:37:29 result="/tmp/.bash_profile"
2018/07/12 14:37:29 sent result: "/tmp/.bash_logout"
2018/07/12 14:37:29 result="/tmp/.bash_logout"
2018/07/12 14:37:29 sent result: "/tmp/.xinitrc"
2018/07/12 14:37:29 result="/tmp/.xinitrc"
2018/07/12 14:37:29 sent new dir job: "/tmp/.config"
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select]:
main.main()
+0x281
goroutine 5 [chan send]:
main.worker(0xc42005a060, 0xc420078060, 0xc4200780c0)
+0x4e7
created by main.main
+0x109
Process finished with exit code 2
如何解决死锁?
关于改进代码的初步意见
Tim 的评论似乎没有触及要点。您在 main()
末尾关闭通道并不重要,您的 select
语句有一个 default
也不重要。如果频道上有消息,频道阅读案例将 运行。
这可能被认为是一个问题,虽然没有消息,但您将通过 default
案例重复旋转循环,这将导致 CPU 使用量激增("busy-waiting"), 所以是的,可能只是删除默认大小写。
您还可以使用标签为打破 for
循环的 "stop" 频道添加一个案例(这是必需的,否则 break
只会从 select 语句,我们再次循环):
readloop:
for {
select {
case res := <-results:
log.Printf(`result=%#v`, res.Filename)
case _ = <-stopChan:
break readloop
}
最后,您可能还应该将 worker()
中的变量 f
重命名为 dir
,因为它是目录而不是文件。只是让代码更容易阅读。对于精通该语言的程序员来说,代码应该像自然语言一样阅读。那样的话,这个语句,
fpath := filepath.Join(f.Name(), file.Name())
变成
fpath := filepath.Join(dir.Name(), file.Name())
...这让您的 eyes/brain 更容易扫描。
为什么你的代码坏了
您的频道出现死锁。你没有注意到,因为 default
的情况意味着从技术上讲,一个 goroutine 总是可以做出 "progress"。否则 运行time 会引发恐慌说:
fatal error: all goroutines are asleep - deadlock!
这是因为worker()
具有以下结构:
receive from channel
...
...
foreach dir in root:
send to channel
...
...
但是在正常的通道上,发送和接收都是阻塞操作。 sends/receives 的 goroutine 在其合作伙伴出现之前不会取得进展。
您可以使用缓冲通道来避免这种情况,但无法提前知道目录中有多少个目录,因此缓冲区可能太小。我建议生成一个 goroutine,这样它就可以阻塞而不影响整个 worker()
循环:
go func() {
for _, file := range fInfo {
...
}
}()
您已经注意到 jobs <- nj
永远挂起。这是因为在 range
循环中,该操作会一直阻塞,直到一个 worker 接收到为止,并且只要它阻塞在那里,它就无法到达 range
循环。
为了解决这个问题,你生成了一个新的 goroutine 来解决这个问题。
go func() {
jobs <- nj
}()
还有一个问题:您的 readDone
频道。
目前,每次您的 worker
完成一项工作时都会发出该频道,这导致 select
在 [=19] 中的可能性(select
随机选择就绪频道) =] 拿起它然后关闭系统,这使得所有剩余的工作和结果都丢失了。
要解决这部分问题,你应该使用sync.WaitGroup
。每次添加新工作时,您都会调用 wg.Add(1)
,而每当您的工作人员完成工作时,您都会调用 wg.Done()
。在 func main()
中,您应生成一个 goroutine,它使用 wg.Wait()
等待所有作业完成,然后使用 readDone
.
// One initial job
wg.Add(1)
go func() {
jobs <- workerJob{
Root: dir,
}
}()
// When all jobs finished, shutdown the system.
go func() {
wg.Wait()
readDone <- true
}()