如何在 Go 中递归列出带有频道的文件?

How to recursively list files with a channel in Go?

我正在尝试使用通道以递归方式列出目录树。

目前我得到了几个文件的列表,然后它卡在了一个目录中。目录已发送给工作人员,但它不处理它。

如何将目录发送到工作人员 (内部 (if file.IsDir()),以便它得到正确处理,并通知文件列表器没有新文件要发送递归完成后处理?

这是我目前的尝试:

package main

import (
    "fmt"
    "os"
    "path/filepath"
    "errors"
    "log"
)

// Job for worker
type workerJob struct {
    Root string
}

// Result of a worker
type workerResult struct {
    Filename string
}

func worker(jobs chan workerJob, results chan<- workerResult, done chan bool) {
    for j := range jobs {
        log.Printf(`Directory: %#v`, j.Root)

        dir, err := os.Open(j.Root)

        if err != nil {
            if os.IsPermission(err) {
                // Skip if there's no permission
                continue
            }
            continue
        }

        fInfo, err := dir.Readdir(-1)
        dir.Close()
        if err != nil {
            if os.IsPermission(err) {
                // Skip if there's no permission
                continue
            }
            continue
        }

        for _, file := range fInfo {
            fpath := filepath.Join(dir.Name(), file.Name())

            if file.Mode().IsRegular() {
                // is file
                fs := uint64(file.Size())
                if fs == 0 {
                    // Skip zero sized
                    continue
                }

                r := workerResult{
                    Filename: fpath,
                }

                log.Printf(`sent result: %#v`, r.Filename)
                results <- r
            } else if file.IsDir() {
                // Send directory to be processed by the worker
                nj := workerJob{
                    Root: fpath,
                }
                log.Printf(`sent new dir job: %#v`, nj.Root)
                jobs <- nj
            }
        }

        done <- true
    }
}

func main() {
    dir := `/tmp`

    workerCount := 1

    jobs := make(chan workerJob, workerCount)
    results := make(chan workerResult)
    readDone := make(chan bool)

    // start N workers
    for i := 0; i < workerCount; i++ {
        go worker(jobs, results, readDone)
    }

    jobs <- workerJob{
        Root: dir,
    }

    readloop:
    for {
        select {
        case res := <-results:
            log.Printf(`result=%#v`, res.Filename)
        case _ = <-readDone:
            log.Printf(`got stop`)
            break readloop
        }
    }

}

这导致:

2018/07/12 14:37:29 Directory: "/tmp"
2018/07/12 14:37:29 sent result: "/tmp/.bashrc"
2018/07/12 14:37:29 result="/tmp/.bashrc"
2018/07/12 14:37:29 sent result: "/tmp/.bash_profile"
2018/07/12 14:37:29 result="/tmp/.bash_profile"
2018/07/12 14:37:29 sent result: "/tmp/.bash_logout"
2018/07/12 14:37:29 result="/tmp/.bash_logout"
2018/07/12 14:37:29 sent result: "/tmp/.xinitrc"
2018/07/12 14:37:29 result="/tmp/.xinitrc"
2018/07/12 14:37:29 sent new dir job: "/tmp/.config"
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select]:
main.main()
    +0x281

goroutine 5 [chan send]:
main.worker(0xc42005a060, 0xc420078060, 0xc4200780c0)
    +0x4e7
created by main.main
    +0x109

Process finished with exit code 2

如何解决死锁?

关于改进代码的初步意见

Tim 的评论似乎没有触及要点。您在 main() 末尾关闭通道并不重要,您的 select 语句有一个 default 也不重要。如果频道上有消息,频道阅读案例将 运行。

这可能被认为是一个问题,虽然没有消息,但您将通过 default 案例重复旋转循环,这将导致 CPU 使用量激增("busy-waiting"), 所以是的,可能只是删除默认大小写。

您还可以使用标签为打破 for 循环的 "stop" 频道添加一个案例(这是必需的,否则 break 只会从 select 语句,我们再次循环):

readloop:
for {
    select {
    case res := <-results:
        log.Printf(`result=%#v`, res.Filename)
    case _ = <-stopChan:
        break readloop
}

最后,您可能还应该将 worker() 中的变量 f 重命名为 dir,因为它是目录而不是文件。只是让代码更容易阅读。对于精通该语言的程序员来说,代码应该像自然语言一样阅读。那样的话,这个语句,

fpath := filepath.Join(f.Name(), file.Name())

变成

fpath := filepath.Join(dir.Name(), file.Name())

...这让您的 eyes/brain 更容易扫描。

为什么你的代码坏了

您的频道出现死锁。你没有注意到,因为 default 的情况意味着从技术上讲,一个 goroutine 总是可以做出 "progress"。否则 运行time 会引发恐慌说:

fatal error: all goroutines are asleep - deadlock!

这是因为worker()具有以下结构:

receive from channel
...
    ...
    foreach dir in root:
        send to channel
    ...
...

但是在正常的通道上,发送和接收都是阻塞操作。 sends/receives 的 goroutine 在其合作伙伴出现之前不会取得进展。

您可以使用缓冲通道来避免这种情况,但无法提前知道目录中有多少个目录,因此缓冲区可能太小。我建议生成一个 goroutine,这样它就可以阻塞而不影响整个 worker() 循环:

go func() {
    for _, file := range fInfo {
        ...
    }
}()

您已经注意到 jobs <- nj 永远挂起。这是因为在 range 循环中,该操作会一直阻塞,直到一个 worker 接收到为止,并且只要它阻塞在那里,它就无法到达 range 循环。

为了解决这个问题,你生成了一个新的 goroutine 来解决这个问题。

go func() {
        jobs <- nj
}()

还有一个问题:您的 readDone 频道。

目前,每次您的 worker 完成一项工作时都会发出该频道,这导致 select 在 [=19] 中的可能性(select 随机选择就绪频道) =] 拿起它然后关闭系统,这使得所有剩余的工作和结果都丢失了。

要解决这部分问题,你应该使用sync.WaitGroup。每次添加新工作时,您都会调用 wg.Add(1),而每当您的工作人员完成工作时,您都会调用 wg.Done()。在 func main() 中,您应生成一个 goroutine,它使用 wg.Wait() 等待所有作业完成,然后使用 readDone.

关闭系统
// One initial job
wg.Add(1)
go func() {
    jobs <- workerJob{
        Root: dir,
    }
}()

// When all jobs finished, shutdown the system.
go func() {
    wg.Wait()
    readDone <- true
}()

完整代码:https://play.golang.org/p/KzVxtflu1eU