sync.WaitGroup 相对于 Channels 的优势是什么?
What is the Advantage of sync.WaitGroup over Channels?
我正在开发并发 Go 库,我偶然发现了 goroutine 之间两种不同的同步模式,它们的结果相似:
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
words := []string{"foo", "bar", "baz"}
for _, word := range words {
wg.Add(1)
go func(word string) {
time.Sleep(1 * time.Second)
defer wg.Done()
fmt.Println(word)
}(word)
}
// do concurrent things here
// blocks/waits for waitgroup
wg.Wait()
}
package main
import (
"fmt"
"time"
)
func main() {
words := []string{"foo", "bar", "baz"}
done := make(chan bool)
// defer close(done)
for _, word := range words {
// fmt.Println(len(done), cap(done))
go func(word string) {
time.Sleep(1 * time.Second)
fmt.Println(word)
done <- true
}(word)
}
// Do concurrent things here
// This blocks and waits for signal from channel
for range words {
<-done
}
}
我被告知 sync.WaitGroup
的性能稍微好一些,而且我看到它被普遍使用。但是,我发现频道更加地道。使用 sync.WaitGroup
而不是渠道 and/or 的真正优势是什么?更好的情况可能是什么?
独立于你的第二个例子的正确性(正如评论中所解释的,你没有按照你的想法去做,但它很容易修复),我倾向于认为第一个例子更容易理解。
现在,我什至不会说频道更加地道。通道是 Go 语言的标志性功能,并不意味着只要有可能就可以习惯使用它们。 Go 中惯用的是使用最简单和最容易理解的解决方案:在这里,WaitGroup
传达了含义(您的主要功能是 Wait
ing for workers to be done)和机制(工作人员在 Done
) 时通知。
除非你是在非常特殊的情况下,否则我不建议在这里使用通道解决方案。
如果您特别坚持只使用频道,那么它需要以不同的方式完成(如果我们使用您的示例,正如@Not_a_Golfer 指出的那样,它会产生不正确的结果)。
一种方法是创建一个 int 类型的通道。在工作进程中,每次完成作业时都会发送一个数字(这也可以是唯一的作业 ID,如果您愿意,可以在接收器中跟踪它)。
在接收器主程序中(它将知道提交的确切作业数)- 在通道上进行范围循环,计数直到提交的作业数未完成,并在以下时间跳出循环所有工作都已完成。如果您想跟踪每个作业的完成情况(并在需要时做一些事情),这是一个好方法。
这是供您参考的代码。递减 totalJobsLeft 是安全的,因为它只会在通道的范围循环中完成!
//This is just an illustration of how to sync completion of multiple jobs using a channel
//A better way many a times might be to use wait groups
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
comChannel := make(chan int)
words := []string{"foo", "bar", "baz"}
totalJobsLeft := len(words)
//We know how many jobs are being sent
for j, word := range words {
jobId := j + 1
go func(word string, jobId int) {
fmt.Println("Job ID:", jobId, "Word:", word)
//Do some work here, maybe call functions that you need
//For emulating this - Sleep for a random time upto 5 seconds
randInt := rand.Intn(5)
//fmt.Println("Got random number", randInt)
time.Sleep(time.Duration(randInt) * time.Second)
comChannel <- jobId
}(word, jobId)
}
for j := range comChannel {
fmt.Println("Got job ID", j)
totalJobsLeft--
fmt.Println("Total jobs left", totalJobsLeft)
if totalJobsLeft == 0 {
break
}
}
fmt.Println("Closing communication channel. All jobs completed!")
close(comChannel)
}
这取决于用例。如果您要并行分派一次性作业 运行 而无需知道每个作业的结果,那么您可以使用 WaitGroup
。但是如果你需要从 goroutines 中收集结果,那么你应该使用一个通道。
由于一个频道是双向的,我几乎总是使用一个频道。
另一方面,正如评论中所指出的,您的频道示例未正确实施。您将需要一个单独的通道来指示没有更多的工作要做(一个例子是 here)。在你的情况下,因为你提前知道单词的数量,你可以只使用一个缓冲通道并接收固定次数以避免声明关闭通道。
也建议使用 waitgroup 但你仍然想用 channel 来做,下面我提到了 channel 的简单使用
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan string)
words := []string{"foo", "bar", "baz"}
go printWordrs(words, c)
for j := range c {
fmt.Println(j)
}
}
func printWordrs(words []string, c chan string) {
defer close(c)
for _, word := range words {
time.Sleep(1 * time.Second)
c <- word
}
}
我经常使用通道从可能产生错误的 goroutines 中收集错误消息。这是一个简单的例子:
func couldGoWrong() (err error) {
errorChannel := make(chan error, 3)
// start a go routine
go func() (err error) {
defer func() { errorChannel <- err }()
for c := 0; c < 10; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// start another go routine
go func() (err error) {
defer func() { errorChannel <- err }()
for c := 10; c < 100; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// start yet another go routine
go func() (err error) {
defer func() { errorChannel <- err }()
for c := 100; c < 1000; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// synchronize go routines and collect errors here
for c := 0; c < cap(errorChannel); c++ {
err = <-errorChannel
if err != nil {
return
}
}
return
}
对于您的简单示例(表示作业已完成),WaitGroup
是显而易见的选择。 Go 编译器非常友善,不会责怪您使用通道来简单地发出完成任务的信号,但一些代码审查者会这样做。
- "WaitGroup 等待一组 goroutine 完成。
主协程调用
Add(n)
设置
等待的协程。然后每个协程
运行并在完成时调用 Done()
。同时,
等待可用于阻塞,直到所有 goroutine 完成。"
words := []string{"foo", "bar", "baz"}
var wg sync.WaitGroup
for _, word := range words {
wg.Add(1)
go func(word string) {
defer wg.Done()
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
}(word)
}
wg.Wait()
可能性仅受您的想象力限制:
- 频道可以缓冲:
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, len(words))
for _, word := range words {
go func(word string) {
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
done <- struct{}{} // not blocking
}(word)
}
for range words {
<-done
}
- 信道可以无缓冲,您可以只使用信号信道(例如
chan struct{}
):
words := []string{"foo", "bar", "baz"}
done := make(chan struct{})
for _, word := range words {
go func(word string) {
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
done <- struct{}{} // blocking
}(word)
}
for range words {
<-done
}
- 您可以使用 缓冲 通道容量限制并发作业的数量:
t0 := time.Now()
var wg sync.WaitGroup
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, 1) // set the number of concurrent job here
for _, word := range words {
wg.Add(1)
go func(word string) {
done <- struct{}{}
time.Sleep(100 * time.Millisecond) // job
fmt.Println(word, time.Since(t0))
<-done
wg.Done()
}(word)
}
wg.Wait()
- 您可以使用频道发送消息:
done := make(chan string)
go func() {
for _, word := range []string{"foo", "bar", "baz"} {
done <- word
}
close(done)
}()
for word := range done {
fmt.Println(word)
}
基准:
go test -benchmem -bench . -args -n 0
# BenchmarkEvenWaitgroup-8 1827517 652 ns/op 0 B/op 0 allocs/op
# BenchmarkEvenChannel-8 1000000 2373 ns/op 520 B/op 1 allocs/op
go test -benchmem -bench .
# BenchmarkEvenWaitgroup-8 1770260 678 ns/op 0 B/op 0 allocs/op
# BenchmarkEvenChannel-8 1560124 1249 ns/op 158 B/op 0 allocs/op
代码(main_test.go
):
package main
import (
"flag"
"fmt"
"os"
"sync"
"testing"
)
func BenchmarkEvenWaitgroup(b *testing.B) {
evenWaitgroup(b.N)
}
func BenchmarkEvenChannel(b *testing.B) {
evenChannel(b.N)
}
func evenWaitgroup(n int) {
if n%2 == 1 { // make it even:
n++
}
for i := 0; i < n; i++ {
wg.Add(1)
go func(n int) {
select {
case ch <- n: // tx if channel is empty
case i := <-ch: // rx if channel is not empty
// fmt.Println(n, i)
_ = i
}
wg.Done()
}(i)
}
wg.Wait()
}
func evenChannel(n int) {
if n%2 == 1 { // make it even:
n++
}
for i := 0; i < n; i++ {
go func(n int) {
select {
case ch <- n: // tx if channel is empty
case i := <-ch: // rx if channel is not empty
// fmt.Println(n, i)
_ = i
}
done <- struct{}{}
}(i)
}
for i := 0; i < n; i++ {
<-done
}
}
func TestMain(m *testing.M) {
var n int // We use TestMain to set up the done channel.
flag.IntVar(&n, "n", 1_000_000, "chan cap")
flag.Parse()
done = make(chan struct{}, n)
fmt.Println("n=", n)
os.Exit(m.Run())
}
var (
done chan struct{}
ch = make(chan int)
wg sync.WaitGroup
)
我正在开发并发 Go 库,我偶然发现了 goroutine 之间两种不同的同步模式,它们的结果相似:
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
words := []string{"foo", "bar", "baz"}
for _, word := range words {
wg.Add(1)
go func(word string) {
time.Sleep(1 * time.Second)
defer wg.Done()
fmt.Println(word)
}(word)
}
// do concurrent things here
// blocks/waits for waitgroup
wg.Wait()
}
package main
import (
"fmt"
"time"
)
func main() {
words := []string{"foo", "bar", "baz"}
done := make(chan bool)
// defer close(done)
for _, word := range words {
// fmt.Println(len(done), cap(done))
go func(word string) {
time.Sleep(1 * time.Second)
fmt.Println(word)
done <- true
}(word)
}
// Do concurrent things here
// This blocks and waits for signal from channel
for range words {
<-done
}
}
我被告知 sync.WaitGroup
的性能稍微好一些,而且我看到它被普遍使用。但是,我发现频道更加地道。使用 sync.WaitGroup
而不是渠道 and/or 的真正优势是什么?更好的情况可能是什么?
独立于你的第二个例子的正确性(正如评论中所解释的,你没有按照你的想法去做,但它很容易修复),我倾向于认为第一个例子更容易理解。
现在,我什至不会说频道更加地道。通道是 Go 语言的标志性功能,并不意味着只要有可能就可以习惯使用它们。 Go 中惯用的是使用最简单和最容易理解的解决方案:在这里,WaitGroup
传达了含义(您的主要功能是 Wait
ing for workers to be done)和机制(工作人员在 Done
) 时通知。
除非你是在非常特殊的情况下,否则我不建议在这里使用通道解决方案。
如果您特别坚持只使用频道,那么它需要以不同的方式完成(如果我们使用您的示例,正如@Not_a_Golfer 指出的那样,它会产生不正确的结果)。
一种方法是创建一个 int 类型的通道。在工作进程中,每次完成作业时都会发送一个数字(这也可以是唯一的作业 ID,如果您愿意,可以在接收器中跟踪它)。
在接收器主程序中(它将知道提交的确切作业数)- 在通道上进行范围循环,计数直到提交的作业数未完成,并在以下时间跳出循环所有工作都已完成。如果您想跟踪每个作业的完成情况(并在需要时做一些事情),这是一个好方法。
这是供您参考的代码。递减 totalJobsLeft 是安全的,因为它只会在通道的范围循环中完成!
//This is just an illustration of how to sync completion of multiple jobs using a channel
//A better way many a times might be to use wait groups
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
comChannel := make(chan int)
words := []string{"foo", "bar", "baz"}
totalJobsLeft := len(words)
//We know how many jobs are being sent
for j, word := range words {
jobId := j + 1
go func(word string, jobId int) {
fmt.Println("Job ID:", jobId, "Word:", word)
//Do some work here, maybe call functions that you need
//For emulating this - Sleep for a random time upto 5 seconds
randInt := rand.Intn(5)
//fmt.Println("Got random number", randInt)
time.Sleep(time.Duration(randInt) * time.Second)
comChannel <- jobId
}(word, jobId)
}
for j := range comChannel {
fmt.Println("Got job ID", j)
totalJobsLeft--
fmt.Println("Total jobs left", totalJobsLeft)
if totalJobsLeft == 0 {
break
}
}
fmt.Println("Closing communication channel. All jobs completed!")
close(comChannel)
}
这取决于用例。如果您要并行分派一次性作业 运行 而无需知道每个作业的结果,那么您可以使用 WaitGroup
。但是如果你需要从 goroutines 中收集结果,那么你应该使用一个通道。
由于一个频道是双向的,我几乎总是使用一个频道。
另一方面,正如评论中所指出的,您的频道示例未正确实施。您将需要一个单独的通道来指示没有更多的工作要做(一个例子是 here)。在你的情况下,因为你提前知道单词的数量,你可以只使用一个缓冲通道并接收固定次数以避免声明关闭通道。
也建议使用 waitgroup 但你仍然想用 channel 来做,下面我提到了 channel 的简单使用
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan string)
words := []string{"foo", "bar", "baz"}
go printWordrs(words, c)
for j := range c {
fmt.Println(j)
}
}
func printWordrs(words []string, c chan string) {
defer close(c)
for _, word := range words {
time.Sleep(1 * time.Second)
c <- word
}
}
我经常使用通道从可能产生错误的 goroutines 中收集错误消息。这是一个简单的例子:
func couldGoWrong() (err error) {
errorChannel := make(chan error, 3)
// start a go routine
go func() (err error) {
defer func() { errorChannel <- err }()
for c := 0; c < 10; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// start another go routine
go func() (err error) {
defer func() { errorChannel <- err }()
for c := 10; c < 100; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// start yet another go routine
go func() (err error) {
defer func() { errorChannel <- err }()
for c := 100; c < 1000; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// synchronize go routines and collect errors here
for c := 0; c < cap(errorChannel); c++ {
err = <-errorChannel
if err != nil {
return
}
}
return
}
对于您的简单示例(表示作业已完成),WaitGroup
是显而易见的选择。 Go 编译器非常友善,不会责怪您使用通道来简单地发出完成任务的信号,但一些代码审查者会这样做。
- "WaitGroup 等待一组 goroutine 完成。
主协程调用
Add(n)
设置 等待的协程。然后每个协程 运行并在完成时调用Done()
。同时, 等待可用于阻塞,直到所有 goroutine 完成。"
words := []string{"foo", "bar", "baz"}
var wg sync.WaitGroup
for _, word := range words {
wg.Add(1)
go func(word string) {
defer wg.Done()
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
}(word)
}
wg.Wait()
可能性仅受您的想象力限制:
- 频道可以缓冲:
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, len(words))
for _, word := range words {
go func(word string) {
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
done <- struct{}{} // not blocking
}(word)
}
for range words {
<-done
}
- 信道可以无缓冲,您可以只使用信号信道(例如
chan struct{}
):
words := []string{"foo", "bar", "baz"}
done := make(chan struct{})
for _, word := range words {
go func(word string) {
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
done <- struct{}{} // blocking
}(word)
}
for range words {
<-done
}
- 您可以使用 缓冲 通道容量限制并发作业的数量:
t0 := time.Now()
var wg sync.WaitGroup
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, 1) // set the number of concurrent job here
for _, word := range words {
wg.Add(1)
go func(word string) {
done <- struct{}{}
time.Sleep(100 * time.Millisecond) // job
fmt.Println(word, time.Since(t0))
<-done
wg.Done()
}(word)
}
wg.Wait()
- 您可以使用频道发送消息:
done := make(chan string)
go func() {
for _, word := range []string{"foo", "bar", "baz"} {
done <- word
}
close(done)
}()
for word := range done {
fmt.Println(word)
}
基准:
go test -benchmem -bench . -args -n 0
# BenchmarkEvenWaitgroup-8 1827517 652 ns/op 0 B/op 0 allocs/op
# BenchmarkEvenChannel-8 1000000 2373 ns/op 520 B/op 1 allocs/op
go test -benchmem -bench .
# BenchmarkEvenWaitgroup-8 1770260 678 ns/op 0 B/op 0 allocs/op
# BenchmarkEvenChannel-8 1560124 1249 ns/op 158 B/op 0 allocs/op
代码(main_test.go
):
package main
import (
"flag"
"fmt"
"os"
"sync"
"testing"
)
func BenchmarkEvenWaitgroup(b *testing.B) {
evenWaitgroup(b.N)
}
func BenchmarkEvenChannel(b *testing.B) {
evenChannel(b.N)
}
func evenWaitgroup(n int) {
if n%2 == 1 { // make it even:
n++
}
for i := 0; i < n; i++ {
wg.Add(1)
go func(n int) {
select {
case ch <- n: // tx if channel is empty
case i := <-ch: // rx if channel is not empty
// fmt.Println(n, i)
_ = i
}
wg.Done()
}(i)
}
wg.Wait()
}
func evenChannel(n int) {
if n%2 == 1 { // make it even:
n++
}
for i := 0; i < n; i++ {
go func(n int) {
select {
case ch <- n: // tx if channel is empty
case i := <-ch: // rx if channel is not empty
// fmt.Println(n, i)
_ = i
}
done <- struct{}{}
}(i)
}
for i := 0; i < n; i++ {
<-done
}
}
func TestMain(m *testing.M) {
var n int // We use TestMain to set up the done channel.
flag.IntVar(&n, "n", 1_000_000, "chan cap")
flag.Parse()
done = make(chan struct{}, n)
fmt.Println("n=", n)
os.Exit(m.Run())
}
var (
done chan struct{}
ch = make(chan int)
wg sync.WaitGroup
)