golang sync.WaitGroup 永远不会完成

golang sync.WaitGroup never completes

我有以下代码获取 URL 的列表,然后有条件地下载文件并将其保存到文件系统。文件是同时获取的,主 goroutine 等待所有文件被获取。但是,程序 在完成所有请求后永远不会退出 (并且没有错误)。

我认为正在发生的事情是 WaitGroup 中的 go 例程的数量以某种方式增加太多而无法开始(通过 Add)或者减少得不够多(Done 呼叫没有发生)。

我明显做错了什么吗?我如何检查 WaitGroup 中目前有多少个 go 例程,以便我可以更好地调试正在发生的事情?

package main

import (
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "os"
    "strings"
    "sync"
)

func main() {
    links := parseLinks()

    var wg sync.WaitGroup

    for _, url := range links {
        if isExcelDocument(url) {
            wg.Add(1)
            go downloadFromURL(url, wg)
        } else {
            fmt.Printf("Skipping: %v \n", url)
        }
    }
    wg.Wait()
}

func downloadFromURL(url string, wg sync.WaitGroup) error {
    tokens := strings.Split(url, "/")
    fileName := tokens[len(tokens)-1]
    fmt.Printf("Downloading %v to %v \n", url, fileName)

    content, err := os.Create("temp_docs/" + fileName)
    if err != nil {
        fmt.Printf("Error while creating %v because of %v", fileName, err)
        return err
    }

    resp, err := http.Get(url)
    if err != nil {
        fmt.Printf("Could not fetch %v because %v", url, err)
        return err
    }
    defer resp.Body.Close()

    _, err = io.Copy(content, resp.Body)
    if err != nil {
        fmt.Printf("Error while saving %v from %v", fileName, url)
        return err
    }

    fmt.Printf("Download complete for %v \n", fileName)

    defer wg.Done()
    return nil
}

func isExcelDocument(url string) bool {
    return strings.HasSuffix(url, ".xlsx") || strings.HasSuffix(url, ".xls")
}

func parseLinks() []string {
    linksData, err := ioutil.ReadFile("links.txt")
    if err != nil {
        fmt.Printf("Trouble reading file: %v", err)
    }

    links := strings.Split(string(linksData), ", ")

    return links
}

Go 中的参数总是按值传递。当参数可能被修改时使用指针。另外,请确保您始终执行 wg.Done()。例如,

package main

import (
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "os"
    "strings"
    "sync"
)

func main() {
    links := parseLinks()

    wg := new(sync.WaitGroup)

    for _, url := range links {
        if isExcelDocument(url) {
            wg.Add(1)
            go downloadFromURL(url, wg)
        } else {
            fmt.Printf("Skipping: %v \n", url)
        }
    }
    wg.Wait()
}

func downloadFromURL(url string, wg *sync.WaitGroup) error {
    defer wg.Done()
    tokens := strings.Split(url, "/")
    fileName := tokens[len(tokens)-1]
    fmt.Printf("Downloading %v to %v \n", url, fileName)

    content, err := os.Create("temp_docs/" + fileName)
    if err != nil {
        fmt.Printf("Error while creating %v because of %v", fileName, err)
        return err
    }

    resp, err := http.Get(url)
    if err != nil {
        fmt.Printf("Could not fetch %v because %v", url, err)
        return err
    }
    defer resp.Body.Close()

    _, err = io.Copy(content, resp.Body)
    if err != nil {
        fmt.Printf("Error while saving %v from %v", fileName, url)
        return err
    }

    fmt.Printf("Download complete for %v \n", fileName)

    return nil
}

func isExcelDocument(url string) bool {
    return strings.HasSuffix(url, ".xlsx") || strings.HasSuffix(url, ".xls")
}

func parseLinks() []string {
    linksData, err := ioutil.ReadFile("links.txt")
    if err != nil {
        fmt.Printf("Trouble reading file: %v", err)
    }

    links := strings.Split(string(linksData), ", ")

    return links
}

这段代码有两个问题。首先,您必须将指向 WaitGroup 的指针传递给 downloadFromURL(),否则对象将被复制并且 Done() 将在 main().

中不可见

参见:

func main() {
    ...
    go downloadFromURL(url, &wg)
    ...
}

其次,defer wg.Done() 应该是 downloadFromURL() 中的第一个语句之一,否则如果你从该语句之前的函数中 return,它不会得到 "registered" 并且不会被调用。

func downloadFromURL(url string, wg *sync.WaitGroup) error {
    defer wg.Done()
    ...
}

如@Bartosz 所述,您需要传递对 WaitGroup 对象的引用。他在讨论 defer ws.Done()

的重要性方面做得很好

我喜欢WaitGroup的简洁。但是,我不喜欢我们需要将引用传递给 goroutine,因为这意味着并发逻辑将与您的业务逻辑混合。

所以我想出了这个通用函数来为我解决这个问题:

// Parallelize parallelizes the function calls
func Parallelize(functions ...func()) {
    var waitGroup sync.WaitGroup
    waitGroup.Add(len(functions))

    defer waitGroup.Wait()

    for _, function := range functions {
        go func(copy func()) {
            defer waitGroup.Done()
            copy()
        }(function)
    }
}

所以你的例子可以这样解决:

func main() {
    links := parseLinks()

    functions := []func(){}
    for _, url := range links {
        if isExcelDocument(url) {
            function := func(url string){
                return func() { downloadFromURL(url) }
            }(url)

            functions = append(functions, function)
        } else {
            fmt.Printf("Skipping: %v \n", url)
        }
    }

    Parallelize(functions...)
}

func downloadFromURL(url string) {
    ...
}

如果你想使用它,你可以在这里找到它https://github.com/shomali11/util