关闭和发送到通道之间的竞争条件
Race condition between close and send to channel
我正在尝试使用工作池构建通用管道库。我为源、管道和接收器创建了一个接口。你看,管道的工作是从输入通道接收数据,处理它,并将结果输出到通道。这是它的预期行为:
- 从输入通道接收数据。
- 将数据委托给可用的工作人员。
- worker将结果发送到输出通道。
- 所有工作人员完成后关闭输出通道。
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
return
}
func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
for j := range jobs {
func(j Job) {
defer wg.Done()
wg.Add(1)
res := doSomethingWith(j)
out <- res
}(j)
}
}
但是,运行 它可能会在不处理所有输入的情况下退出,或者会出现 send on closed channel
消息并出现恐慌。使用 -race
标志构建源代码会在 close(out)
和 out <- res
之间发出数据竞争警告。
以下是我认为可能会发生的情况。一旦一些工人完成了他们的工作,wg
的计数器会在一瞬间归零。因此,wg.Wait()
完成,程序继续进行 close(out)
。同时,作业通道尚未完成数据生成,这意味着一些工作人员仍在 运行 另一个 goroutine 中。由于 out
通道已经关闭,因此导致恐慌。
等待组应该放在别的地方吗?或者有没有更好的方法等待所有worker完成?
作业的完成速度可能与发送速度一样快。在这种情况下,即使有更多项目要处理,WaitGroup 也会在零附近浮动。
解决此问题的一个方法是在发送作业之前添加一个,并在发送所有作业后减少一个,有效地将发件人视为 'jobs' 之一。在这种情况下,我们最好在发件人中执行 wg.Add
:
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
wg.Add(1)
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
return
}
func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
for j := range jobs {
func(j Job) {
res := doSomethingWith(j)
out <- res
wg.Done()
}(j)
}
}
我在代码中注意到的一件事是为每个作业启动了一个 goroutine。同时每个作业循环处理 jobs
通道,直到 empty/closed。似乎没有必要两者都做。
不清楚为什么每个工作需要一个工人,但如果你这样做,你可以重组你的外循环设置(见下面未经测试的代码)。这种类型首先消除了对工作池的需求。
尽管如此,在 分拆任何工人之前,请始终wg.Add
。就在这里,您恰好分拆了 100 名工人:
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
因此您可以这样做:
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
wg.Add(100) // ADDED - count the 100 workers
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
请注意,您现在可以将 wg
本身向下移动到 goroutine 中,该 goroutine 会分离出 worker。如果您放弃让每个工作人员将工作分拆为新的 goroutines 的想法,这可以使事情变得更干净。但是如果每个 worker 都打算分拆出另一个 goroutine,那个 worker 本身也必须使用 wg.Add
,像这样:
for j := range jobs {
wg.Add(1) // ADDED - count the spun-off goroutines
func(j Job) {
res := doSomethingWith(j)
out <- res
wg.Done() // MOVED (for illustration only, can defer as before)
}(j)
}
wg.Done() // ADDED - our work in `p.work` is now done
也就是说,每个匿名函数都是通道的另一个用户,因此在创建新的 goroutine 之前增加通道用户数 (wg.Add(1)
)。当您读完输入通道 jobs
后,调用 wg.Done()
(可能通过较早的 defer
,但我在此处的末尾显示了它)。
思考这个问题的关键是wg
统计可以的活动goroutine的数量,此时,写入通道.只有当 no goroutines 打算再写时它才会变为零。 这样可以安全地关闭通道。
考虑使用更简单的(但未经测试):
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
out = make(chan interface{})
var wg sync.WaitGroup
go func() {
defer close(out)
for j := range in {
wg.Add(1)
go func(j Job) {
res := doSomethingWith(j)
out <- res
wg.Done()
}(j)
}
wg.Wait()
}()
return out
}
你现在有一个 goroutine 正在尽可能快地读取 in
通道,并在它运行时分拆作业。每个传入的工作你都会得到一个 goroutine,除非他们提前完成工作。没有池,每个工作只有一个工人(与您的代码相同,只是我们淘汰了没有做任何有用的池)。
或者,因为只有一定数量的 CPU 可用,像开始时一样分拆一些 goroutine,但是每个 goroutine 运行 一个 作业完成,并交付结果,然后返回阅读下一个作业:
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
out = make(chan interface{})
go func() {
defer close(out)
var wg sync.WaitGroup
ncpu := runtime.NumCPU() // or something fancier if you like
wg.Add(ncpu)
for i := 0; i < ncpu; i++ {
go func() {
defer wg.Done()
for j := range in {
out <- doSomethingWith(j)
}
}()
}
wg.Wait()
}
return out
}
通过使用 runtime.NumCPU()
,我们只获得与 运行 作业的 CPU 一样多的工人读取作业。那些是游泳池,他们一次只做一项工作。
如果输出通道读取器结构良好(即不会导致管道便秘),通常无需缓冲输出通道。如果不是,这里的缓冲深度会限制您可以 "work ahead" 处理结果的人的工作数量。根据这样做的有用程度来设置它 "working ahead"——不一定是 CPU 的数量,或预期的作业数量,或其他任何东西。
我正在尝试使用工作池构建通用管道库。我为源、管道和接收器创建了一个接口。你看,管道的工作是从输入通道接收数据,处理它,并将结果输出到通道。这是它的预期行为:
- 从输入通道接收数据。
- 将数据委托给可用的工作人员。
- worker将结果发送到输出通道。
- 所有工作人员完成后关闭输出通道。
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
return
}
func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
for j := range jobs {
func(j Job) {
defer wg.Done()
wg.Add(1)
res := doSomethingWith(j)
out <- res
}(j)
}
}
但是,运行 它可能会在不处理所有输入的情况下退出,或者会出现 send on closed channel
消息并出现恐慌。使用 -race
标志构建源代码会在 close(out)
和 out <- res
之间发出数据竞争警告。
以下是我认为可能会发生的情况。一旦一些工人完成了他们的工作,wg
的计数器会在一瞬间归零。因此,wg.Wait()
完成,程序继续进行 close(out)
。同时,作业通道尚未完成数据生成,这意味着一些工作人员仍在 运行 另一个 goroutine 中。由于 out
通道已经关闭,因此导致恐慌。
等待组应该放在别的地方吗?或者有没有更好的方法等待所有worker完成?
作业的完成速度可能与发送速度一样快。在这种情况下,即使有更多项目要处理,WaitGroup 也会在零附近浮动。
解决此问题的一个方法是在发送作业之前添加一个,并在发送所有作业后减少一个,有效地将发件人视为 'jobs' 之一。在这种情况下,我们最好在发件人中执行 wg.Add
:
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
wg.Add(1)
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
return
}
func (p *pipe) work(jobs <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
for j := range jobs {
func(j Job) {
res := doSomethingWith(j)
out <- res
wg.Done()
}(j)
}
}
我在代码中注意到的一件事是为每个作业启动了一个 goroutine。同时每个作业循环处理 jobs
通道,直到 empty/closed。似乎没有必要两者都做。
不清楚为什么每个工作需要一个工人,但如果你这样做,你可以重组你的外循环设置(见下面未经测试的代码)。这种类型首先消除了对工作池的需求。
尽管如此,在 分拆任何工人之前,请始终wg.Add
。就在这里,您恰好分拆了 100 名工人:
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
因此您可以这样做:
var wg sync.WaitGroup
out = make(chan interface{}, 100)
go func() {
wg.Add(100) // ADDED - count the 100 workers
for i := 1; i <= 100; i++ {
go p.work(in, out, &wg)
}
wg.Wait()
close(out)
}()
请注意,您现在可以将 wg
本身向下移动到 goroutine 中,该 goroutine 会分离出 worker。如果您放弃让每个工作人员将工作分拆为新的 goroutines 的想法,这可以使事情变得更干净。但是如果每个 worker 都打算分拆出另一个 goroutine,那个 worker 本身也必须使用 wg.Add
,像这样:
for j := range jobs {
wg.Add(1) // ADDED - count the spun-off goroutines
func(j Job) {
res := doSomethingWith(j)
out <- res
wg.Done() // MOVED (for illustration only, can defer as before)
}(j)
}
wg.Done() // ADDED - our work in `p.work` is now done
也就是说,每个匿名函数都是通道的另一个用户,因此在创建新的 goroutine 之前增加通道用户数 (wg.Add(1)
)。当您读完输入通道 jobs
后,调用 wg.Done()
(可能通过较早的 defer
,但我在此处的末尾显示了它)。
思考这个问题的关键是wg
统计可以的活动goroutine的数量,此时,写入通道.只有当 no goroutines 打算再写时它才会变为零。 这样可以安全地关闭通道。
考虑使用更简单的(但未经测试):
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
out = make(chan interface{})
var wg sync.WaitGroup
go func() {
defer close(out)
for j := range in {
wg.Add(1)
go func(j Job) {
res := doSomethingWith(j)
out <- res
wg.Done()
}(j)
}
wg.Wait()
}()
return out
}
你现在有一个 goroutine 正在尽可能快地读取 in
通道,并在它运行时分拆作业。每个传入的工作你都会得到一个 goroutine,除非他们提前完成工作。没有池,每个工作只有一个工人(与您的代码相同,只是我们淘汰了没有做任何有用的池)。
或者,因为只有一定数量的 CPU 可用,像开始时一样分拆一些 goroutine,但是每个 goroutine 运行 一个 作业完成,并交付结果,然后返回阅读下一个作业:
func (p *pipe) Process(in chan interface{}) (out chan interface{}) {
out = make(chan interface{})
go func() {
defer close(out)
var wg sync.WaitGroup
ncpu := runtime.NumCPU() // or something fancier if you like
wg.Add(ncpu)
for i := 0; i < ncpu; i++ {
go func() {
defer wg.Done()
for j := range in {
out <- doSomethingWith(j)
}
}()
}
wg.Wait()
}
return out
}
通过使用 runtime.NumCPU()
,我们只获得与 运行 作业的 CPU 一样多的工人读取作业。那些是游泳池,他们一次只做一项工作。
如果输出通道读取器结构良好(即不会导致管道便秘),通常无需缓冲输出通道。如果不是,这里的缓冲深度会限制您可以 "work ahead" 处理结果的人的工作数量。根据这样做的有用程度来设置它 "working ahead"——不一定是 CPU 的数量,或预期的作业数量,或其他任何东西。