如何使用通道收集来自各种 goroutines 的响应
How to use channels to gather response from various goroutines
我是 Golang 的新手,我有一个任务是使用 WaitGroup
和 Mutex
实现的,我想将其转换为使用 Channels
。
任务的一个非常简短的描述是这样的:根据需要丢弃尽可能多的 go 例程来处理结果,并在主 go 例程中等待并收集所有结果。
我使用WaitGroup
和Mutex
的实现如下:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func process(input int, wg *sync.WaitGroup, result *[]int, lock *sync.Mutex) *[]int {
defer wg.Done()
defer lock.Unlock()
rand.Seed(time.Now().UnixNano())
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
lock.Lock()
*result = append(*result, input * 10)
return result
}
func main() {
var wg sync.WaitGroup
var result []int
var lock sync.Mutex
for i := range []int{1,2,3,4,5} {
wg.Add(1)
go process(i, &wg, &result, &lock)
}
}
如何将使用 Mutex
的内存同步替换为使用 Channels
的内存同步?
我的主要问题是我不确定如何确定正在处理最终任务的最终 go 例程,因此让那个例程成为关闭 channel
的例程。这个想法是,通过关闭 channel
,主要的 go 例程可以遍历 channel
,检索结果,当它看到 channel
已关闭时,它会继续。
也可能是在这种情况下关闭通道的方法是错误的,因此我在这里问的原因。
更有经验的 go 程序员如何使用 channels
解决这个问题?
我更改了您的代码以使用该频道。还有很多其他方法可以使用该频道。
package main
import (
"fmt"
"math/rand"
"time"
)
func process(input int, out chan<- int) {
rand.Seed(time.Now().UnixNano())
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
out <- input * 10
}
func main() {
var result []int
resultChan := make(chan int)
items := []int{1, 2, 3, 4, 5}
for _, v := range items {
go process(v, resultChan)
}
for i := 0; i < len(items); i++ {
res, _ := <-resultChan
result = append(result, res)
}
close(resultChan)
fmt.Println(result)
}
更新:(评论的回答)
如果物品数量未知,您需要向主程序发送信号以完成。否则 "deadlock",你可以创建一个通道来通知 main 函数完成。另外,您可以使用 sync.waiteGroup
.
对于Goroutine中的panic,可以使用defer和recover来处理错误。您可以创建一个错误通道矿石,您可以使用 x/sync/errgroup
.
解决办法太多了。这取决于你的问题。所以没有特定的方法来使用 goroutine、channel 和...
这是一个使用 WaitGroup
而不是等待固定数量的结果的解决方案。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func process(input int, wg *sync.WaitGroup, resultChan chan<- int) {
defer wg.Done()
rand.Seed(time.Now().UnixNano())
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
resultChan <- input * 10
}
func main() {
var wg sync.WaitGroup
resultChan := make(chan int)
for i := range []int{1,2,3,4,5} {
wg.Add(1)
go process(i, &wg, resultChan)
}
go func() {
wg.Wait()
close(resultChan)
}()
var result []int
for r := range resultChan {
result = append(result, r)
}
fmt.Println(result)
}
这是一个示例片段,我在其中使用一部分通道而不是等待组来执行分叉连接:
package main
import (
"fmt"
"os"
)
type cStruct struct {
resultChan chan int
errChan chan error
}
func process(i int) (v int, err error) {
v = i
return
}
func spawn(i int) cStruct {
r := make(chan int)
e := make(chan error)
go func(i int) {
defer close(r)
defer close(e)
v, err := process(i)
if err != nil {
e <- err
return
}
r <- v
return
}(i)
return cStruct{
r,
e,
}
}
func main() {
//have a slice of channelStruct
var cStructs []cStruct
nums := []int{1, 2, 3, 4, 5}
for _, v := range nums {
cStruct := spawn(v)
cStructs = append(cStructs, cStruct)
}
//All the routines have been spawned, now iterate over the slice:
var results []int
for _, c := range cStructs {
rChan, errChan := c.resultChan, c.errChan
select {
case r := <-rChan:
{
results = append(results, r)
}
case err := <-errChan:
{
if err != nil {
os.Exit(1)
return
}
}
}
}
//All the work should be done by now, iterating over the results
for _, result := range results {
fmt.Println("Aggregated result:", result)
}
}
我是 Golang 的新手,我有一个任务是使用 WaitGroup
和 Mutex
实现的,我想将其转换为使用 Channels
。
任务的一个非常简短的描述是这样的:根据需要丢弃尽可能多的 go 例程来处理结果,并在主 go 例程中等待并收集所有结果。
我使用WaitGroup
和Mutex
的实现如下:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func process(input int, wg *sync.WaitGroup, result *[]int, lock *sync.Mutex) *[]int {
defer wg.Done()
defer lock.Unlock()
rand.Seed(time.Now().UnixNano())
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
lock.Lock()
*result = append(*result, input * 10)
return result
}
func main() {
var wg sync.WaitGroup
var result []int
var lock sync.Mutex
for i := range []int{1,2,3,4,5} {
wg.Add(1)
go process(i, &wg, &result, &lock)
}
}
如何将使用 Mutex
的内存同步替换为使用 Channels
的内存同步?
我的主要问题是我不确定如何确定正在处理最终任务的最终 go 例程,因此让那个例程成为关闭 channel
的例程。这个想法是,通过关闭 channel
,主要的 go 例程可以遍历 channel
,检索结果,当它看到 channel
已关闭时,它会继续。
也可能是在这种情况下关闭通道的方法是错误的,因此我在这里问的原因。
更有经验的 go 程序员如何使用 channels
解决这个问题?
我更改了您的代码以使用该频道。还有很多其他方法可以使用该频道。
package main
import (
"fmt"
"math/rand"
"time"
)
func process(input int, out chan<- int) {
rand.Seed(time.Now().UnixNano())
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
out <- input * 10
}
func main() {
var result []int
resultChan := make(chan int)
items := []int{1, 2, 3, 4, 5}
for _, v := range items {
go process(v, resultChan)
}
for i := 0; i < len(items); i++ {
res, _ := <-resultChan
result = append(result, res)
}
close(resultChan)
fmt.Println(result)
}
更新:(评论的回答)
如果物品数量未知,您需要向主程序发送信号以完成。否则 "deadlock",你可以创建一个通道来通知 main 函数完成。另外,您可以使用 sync.waiteGroup
.
对于Goroutine中的panic,可以使用defer和recover来处理错误。您可以创建一个错误通道矿石,您可以使用 x/sync/errgroup
.
解决办法太多了。这取决于你的问题。所以没有特定的方法来使用 goroutine、channel 和...
这是一个使用 WaitGroup
而不是等待固定数量的结果的解决方案。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func process(input int, wg *sync.WaitGroup, resultChan chan<- int) {
defer wg.Done()
rand.Seed(time.Now().UnixNano())
n := rand.Intn(5)
time.Sleep(time.Duration(n) * time.Second)
resultChan <- input * 10
}
func main() {
var wg sync.WaitGroup
resultChan := make(chan int)
for i := range []int{1,2,3,4,5} {
wg.Add(1)
go process(i, &wg, resultChan)
}
go func() {
wg.Wait()
close(resultChan)
}()
var result []int
for r := range resultChan {
result = append(result, r)
}
fmt.Println(result)
}
这是一个示例片段,我在其中使用一部分通道而不是等待组来执行分叉连接:
package main
import (
"fmt"
"os"
)
type cStruct struct {
resultChan chan int
errChan chan error
}
func process(i int) (v int, err error) {
v = i
return
}
func spawn(i int) cStruct {
r := make(chan int)
e := make(chan error)
go func(i int) {
defer close(r)
defer close(e)
v, err := process(i)
if err != nil {
e <- err
return
}
r <- v
return
}(i)
return cStruct{
r,
e,
}
}
func main() {
//have a slice of channelStruct
var cStructs []cStruct
nums := []int{1, 2, 3, 4, 5}
for _, v := range nums {
cStruct := spawn(v)
cStructs = append(cStructs, cStruct)
}
//All the routines have been spawned, now iterate over the slice:
var results []int
for _, c := range cStructs {
rChan, errChan := c.resultChan, c.errChan
select {
case r := <-rChan:
{
results = append(results, r)
}
case err := <-errChan:
{
if err != nil {
os.Exit(1)
return
}
}
}
}
//All the work should be done by now, iterating over the results
for _, result := range results {
fmt.Println("Aggregated result:", result)
}
}