使用 sync/atomic 包进行同步的代码出现意外行为
Unexpected behavior in code using sync/atomic package for synchronization
下面是我在学习 Golang 中的 goroutines 时正在做的一个例子。在下面的代码中,我们生成了 30 个 goroutine,每个 goroutine 都访问一个名为 ordersProcessed 的共享变量。该示例表示收银员处理订单。一旦 ordersProcessed 超过 10,我们将打印出收银员无法再接受任何订单。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var (
wg sync.WaitGroup
ordersProcessed int64
)
// This does not work as expected
cashier := func(orderNum int) {
value := atomic.LoadInt64(&ordersProcessed)
fmt.Println("Value is ", value)
if value < 10 {
// Cashier is ready to serve!
fmt.Println("Proessing order", orderNum)
atomic.AddInt64(&ordersProcessed, 1)
} else {
// Cashier has reached the max capacity of processing orders.
fmt.Println("I am tired! I want to take rest!", orderNum)
}
wg.Done()
}
for i := 0; i < 30; i++ {
wg.Add(1)
go func(orderNum int) {
// Making an order
cashier(orderNum)
}(i)
}
wg.Wait()
}
我希望看到 10 个订单的已处理消息,但此后无法处理。但是,所有 30 个订单都得到处理。我已经使用 sync/atomic 包来同步对 ordersProcessed 变量的访问,但是它的值总是被每个 goroutine 读取为 0。但是,如果我将上面的代码更改为使用如下互斥锁,它会按预期工作:
package main
import (
"fmt"
"sync"
)
func main() {
var (
wg sync.WaitGroup
ordersProcessed int64
mutex sync.Mutex
)
// This works as expected
cashier := func(orderNum int) {
mutex.Lock()
if ordersProcessed < 10 {
// Cashier is ready to serve!
fmt.Println("Processing order", orderNum)
ordersProcessed++
} else {
// Cashier has reached the max capacity of processing orders.
fmt.Println("I am tired! I want to take rest!", orderNum)
}
mutex.Unlock()
wg.Done()
}
for i := 0; i < 30; i++ {
wg.Add(1)
go func(orderNum int) {
// Making an order
cashier(orderNum)
}(i)
}
wg.Wait()
}
有人可以告诉我我使用 sync/atomic 包同步访问 ordersProcessed 变量的方式有什么问题吗?
您使用了 sync/atomic 包,但您没有同步 goroutines。
当您启动 30 个 goroutine 时,每个 goroutine 都通过读取共享变量并递增它来启动。如果所有的 goroutines 读取变量,它们都将读取 0。这里的问题是你没有阻止其他 goroutines 在一个 goroutine 正在处理变量时修改变量。程序运行后,共享变量可以是 10 到 30 之间的任何值,具体取决于 goroutines 的交错方式。
你的第二个实现是正确的,它防止其他 goroutines 在其中一个正在处理共享变量时读取和修改共享变量。
下面是我在学习 Golang 中的 goroutines 时正在做的一个例子。在下面的代码中,我们生成了 30 个 goroutine,每个 goroutine 都访问一个名为 ordersProcessed 的共享变量。该示例表示收银员处理订单。一旦 ordersProcessed 超过 10,我们将打印出收银员无法再接受任何订单。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var (
wg sync.WaitGroup
ordersProcessed int64
)
// This does not work as expected
cashier := func(orderNum int) {
value := atomic.LoadInt64(&ordersProcessed)
fmt.Println("Value is ", value)
if value < 10 {
// Cashier is ready to serve!
fmt.Println("Proessing order", orderNum)
atomic.AddInt64(&ordersProcessed, 1)
} else {
// Cashier has reached the max capacity of processing orders.
fmt.Println("I am tired! I want to take rest!", orderNum)
}
wg.Done()
}
for i := 0; i < 30; i++ {
wg.Add(1)
go func(orderNum int) {
// Making an order
cashier(orderNum)
}(i)
}
wg.Wait()
}
我希望看到 10 个订单的已处理消息,但此后无法处理。但是,所有 30 个订单都得到处理。我已经使用 sync/atomic 包来同步对 ordersProcessed 变量的访问,但是它的值总是被每个 goroutine 读取为 0。但是,如果我将上面的代码更改为使用如下互斥锁,它会按预期工作:
package main
import (
"fmt"
"sync"
)
func main() {
var (
wg sync.WaitGroup
ordersProcessed int64
mutex sync.Mutex
)
// This works as expected
cashier := func(orderNum int) {
mutex.Lock()
if ordersProcessed < 10 {
// Cashier is ready to serve!
fmt.Println("Processing order", orderNum)
ordersProcessed++
} else {
// Cashier has reached the max capacity of processing orders.
fmt.Println("I am tired! I want to take rest!", orderNum)
}
mutex.Unlock()
wg.Done()
}
for i := 0; i < 30; i++ {
wg.Add(1)
go func(orderNum int) {
// Making an order
cashier(orderNum)
}(i)
}
wg.Wait()
}
有人可以告诉我我使用 sync/atomic 包同步访问 ordersProcessed 变量的方式有什么问题吗?
您使用了 sync/atomic 包,但您没有同步 goroutines。
当您启动 30 个 goroutine 时,每个 goroutine 都通过读取共享变量并递增它来启动。如果所有的 goroutines 读取变量,它们都将读取 0。这里的问题是你没有阻止其他 goroutines 在一个 goroutine 正在处理变量时修改变量。程序运行后,共享变量可以是 10 到 30 之间的任何值,具体取决于 goroutines 的交错方式。
你的第二个实现是正确的,它防止其他 goroutines 在其中一个正在处理共享变量时读取和修改共享变量。