sync.WaitGroup 相对于 Channels 的优势是什么?

What is the Advantage of sync.WaitGroup over Channels?

我正在开发并发 Go 库,我偶然发现了 goroutine 之间两种不同的同步模式,它们的结果相似:

Waitgroup

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func main() {
    words := []string{"foo", "bar", "baz"}

    for _, word := range words {
        wg.Add(1)
        go func(word string) {
            time.Sleep(1 * time.Second)
            defer wg.Done()
            fmt.Println(word)
        }(word)
    }
    // do concurrent things here

    // blocks/waits for waitgroup
    wg.Wait()
}

Channel

package main

import (
    "fmt"
    "time"
)

func main() {
    words := []string{"foo", "bar", "baz"}
    done := make(chan bool)
    // defer close(done)
    for _, word := range words {
        // fmt.Println(len(done), cap(done))
        go func(word string) {
            time.Sleep(1 * time.Second)
            fmt.Println(word)
            done <- true
        }(word)
    }
    // Do concurrent things here

    // This blocks and waits for signal from channel
    for range words {
        <-done
    }
}

我被告知 sync.WaitGroup 的性能稍微好一些,而且我看到它被普遍使用。但是,我发现频道更加地道。使用 sync.WaitGroup 而不是渠道 and/or 的真正优势是什么?更好的情况可能是什么?

独立于你的第二个例子的正确性(正如评论中所解释的,你没有按照你的想法去做,但它很容易修复),我倾向于认为第一个例子更容易理解。

现在,我什至不会说频道更加地道。通道是 Go 语言的标志性功能,并不意味着只要有可能就可以习惯使用它们。 Go 中惯用的是使用最简单和最容易理解的解决方案:在这里,WaitGroup 传达了含义(您的主要功能是 Waiting for workers to be done)和机制(工作人员在 Done) 时通知。

除非你是在非常特殊的情况下,否则我不建议在这里使用通道解决方案。

如果您特别坚持只使用频道,那么它需要以不同的方式完成(如果我们使用您的示例,正​​如@Not_a_Golfer 指出的那样,它会产生不正确的结果)。

一种方法是创建一个 int 类型的通道。在工作进程中,每次完成作业时都会发送一个数字(这也可以是唯一的作业 ID,如果您愿意,可以在接收器中跟踪它)。

在接收器主程序中(它将知道提交的确切作业数)- 在通道上进行范围循环,计数直到提交的作业数未完成,并在以下时间跳出循环所有工作都已完成。如果您想跟踪每个作业的完成情况(并在需要时做一些事情),这是一个好方法。

这是供您参考的代码。递减 totalJobsLeft 是安全的,因为它只会在通道的范围循环中完成!

//This is just an illustration of how to sync completion of multiple jobs using a channel
//A better way many a times might be to use wait groups

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {

    comChannel := make(chan int)
    words := []string{"foo", "bar", "baz"}

    totalJobsLeft := len(words)

    //We know how many jobs are being sent

    for j, word := range words {
        jobId := j + 1
        go func(word string, jobId int) {

            fmt.Println("Job ID:", jobId, "Word:", word)
            //Do some work here, maybe call functions that you need
            //For emulating this - Sleep for a random time upto 5 seconds
            randInt := rand.Intn(5)
            //fmt.Println("Got random number", randInt)
            time.Sleep(time.Duration(randInt) * time.Second)
            comChannel <- jobId
        }(word, jobId)
    }

    for j := range comChannel {
        fmt.Println("Got job ID", j)
        totalJobsLeft--
        fmt.Println("Total jobs left", totalJobsLeft)
        if totalJobsLeft == 0 {
            break
        }
    }
    fmt.Println("Closing communication channel. All jobs completed!")
    close(comChannel)

}

这取决于用例。如果您要并行分派一次性作业 运行 而无需知道每个作业的结果,那么您可以使用 WaitGroup。但是如果你需要从 goroutines 中收集结果,那么你应该使用一个通道。

由于一个频道是双向的,我几乎总是使用一个频道。

另一方面,正如评论中所指出的,您的频道示例未正确实施。您将需要一个单独的通道来指示没有更多的工作要做(一个例子是 here)。在你的情况下,因为你提前知道单词的数量,你可以只使用一个缓冲通道并接收固定次数以避免声明关闭通道。

也建议使用 waitgroup 但你仍然想用 channel 来做,下面我提到了 channel 的简单使用

package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(chan string)
    words := []string{"foo", "bar", "baz"}

    go printWordrs(words, c)

    for j := range c {
        fmt.Println(j)
    }
}


func printWordrs(words []string, c chan string) {
    defer close(c)
    for _, word := range words {
        time.Sleep(1 * time.Second)
        c <- word
    }   
}

我经常使用通道从可能产生错误的 goroutines 中收集错误消息。这是一个简单的例子:

func couldGoWrong() (err error) {
    errorChannel := make(chan error, 3)

    // start a go routine
    go func() (err error) {
        defer func() { errorChannel <- err }()

        for c := 0; c < 10; c++ {
            _, err = fmt.Println(c)
            if err != nil {
                return
            }
        }

        return
    }()

    // start another go routine
    go func() (err error) {
        defer func() { errorChannel <- err }()

        for c := 10; c < 100; c++ {
            _, err = fmt.Println(c)
            if err != nil {
                return
            }
        }

        return
    }()

    // start yet another go routine
    go func() (err error) {
        defer func() { errorChannel <- err }()

        for c := 100; c < 1000; c++ {
            _, err = fmt.Println(c)
            if err != nil {
                return
            }
        }

        return
    }()

    // synchronize go routines and collect errors here
    for c := 0; c < cap(errorChannel); c++ {
        err = <-errorChannel
        if err != nil {
            return
        }
    }

    return
}

对于您的简单示例(表示作业已完成),WaitGroup 是显而易见的选择。 Go 编译器非常友善,不会责怪您使用通道来简单地发出完成任务的信号,但一些代码审查者会这样做。

  1. "WaitGroup 等待一组 goroutine 完成。 主协程调用Add(n)设置 等待的协程。然后每个协程 运行并在完成时调用 Done()。同时, 等待可用于阻塞,直到所有 goroutine 完成。"
words := []string{"foo", "bar", "baz"}
var wg sync.WaitGroup
for _, word := range words {
    wg.Add(1)
    go func(word string) {
        defer wg.Done()
        time.Sleep(100 * time.Millisecond) // a job
        fmt.Println(word)
    }(word)
}
wg.Wait()

可能性仅受您的想象力限制:

  1. 频道可以缓冲:
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, len(words))
for _, word := range words {
    go func(word string) {
        time.Sleep(100 * time.Millisecond) // a job
        fmt.Println(word)
        done <- struct{}{} // not blocking
    }(word)
}
for range words {
    <-done
}
  1. 信道可以无缓冲,您可以只使用信号信道(例如chan struct{}):
words := []string{"foo", "bar", "baz"}
done := make(chan struct{})
for _, word := range words {
    go func(word string) {
        time.Sleep(100 * time.Millisecond) // a job
        fmt.Println(word)
        done <- struct{}{} // blocking
    }(word)
}
for range words {
    <-done
}
  1. 您可以使用 缓冲 通道容量限制并发作业的数量:
t0 := time.Now()
var wg sync.WaitGroup
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, 1) // set the number of concurrent job here
for _, word := range words {
    wg.Add(1)
    go func(word string) {
        done <- struct{}{}
        time.Sleep(100 * time.Millisecond) // job
        fmt.Println(word, time.Since(t0))
        <-done
        wg.Done()
    }(word)
}
wg.Wait()
  1. 您可以使用频道发送消息:
done := make(chan string)
go func() {
    for _, word := range []string{"foo", "bar", "baz"} {
        done <- word
    }
    close(done)
}()
for word := range done {
    fmt.Println(word)
}

基准:

    go test -benchmem -bench . -args -n 0
# BenchmarkEvenWaitgroup-8  1827517   652 ns/op    0 B/op  0 allocs/op
# BenchmarkEvenChannel-8    1000000  2373 ns/op  520 B/op  1 allocs/op
    go test -benchmem -bench .
# BenchmarkEvenWaitgroup-8  1770260   678 ns/op    0 B/op  0 allocs/op
# BenchmarkEvenChannel-8    1560124  1249 ns/op  158 B/op  0 allocs/op

代码(main_test.go):

package main

import (
    "flag"
    "fmt"
    "os"
    "sync"
    "testing"
)

func BenchmarkEvenWaitgroup(b *testing.B) {
    evenWaitgroup(b.N)
}
func BenchmarkEvenChannel(b *testing.B) {
    evenChannel(b.N)
}
func evenWaitgroup(n int) {
    if n%2 == 1 { // make it even:
        n++
    }
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(n int) {
            select {
            case ch <- n: // tx if channel is empty
            case i := <-ch: // rx if channel is not empty
                // fmt.Println(n, i)
                _ = i
            }
            wg.Done()
        }(i)
    }
    wg.Wait()
}
func evenChannel(n int) {
    if n%2 == 1 { // make it even:
        n++
    }
    for i := 0; i < n; i++ {
        go func(n int) {
            select {
            case ch <- n: // tx if channel is empty
            case i := <-ch: // rx if channel is not empty
                // fmt.Println(n, i)
                _ = i
            }
            done <- struct{}{}
        }(i)
    }
    for i := 0; i < n; i++ {
        <-done
    }
}
func TestMain(m *testing.M) {
    var n int // We use TestMain to set up the done channel.
    flag.IntVar(&n, "n", 1_000_000, "chan cap")
    flag.Parse()
    done = make(chan struct{}, n)
    fmt.Println("n=", n)
    os.Exit(m.Run())
}

var (
    done chan struct{}
    ch   = make(chan int)
    wg   sync.WaitGroup
)