如何使用通道收集来自各种 goroutines 的响应

How to use channels to gather response from various goroutines

我是 Golang 的新手,我有一个任务是使用 WaitGroupMutex 实现的,我想将其转换为使用 Channels

任务的一个非常简短的描述是这样的:根据需要丢弃尽可能多的 go 例程来处理结果,并在主 go 例程中等待并收集所有结果。

我使用WaitGroupMutex的实现如下:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func process(input int, wg *sync.WaitGroup, result *[]int, lock *sync.Mutex) *[]int {
    defer wg.Done()
    defer lock.Unlock()

    rand.Seed(time.Now().UnixNano())
    n := rand.Intn(5)
    time.Sleep(time.Duration(n) * time.Second)
    lock.Lock()
    *result = append(*result, input * 10)

    return result
}

func main() {

    var wg sync.WaitGroup
    var result []int
    var lock sync.Mutex
    for i := range []int{1,2,3,4,5} {
        wg.Add(1)
        go process(i, &wg, &result, &lock)
    }
}

如何将使用 Mutex 的内存同步替换为使用 Channels 的内存同步?

我的主要问题是我不确定如何确定正在处理最终任务的最终 go 例程,因此让那个例程成为关闭 channel 的例程。这个想法是,通过关闭 channel,主要的 go 例程可以遍历 channel,检索结果,当它看到 channel 已关闭时,它会继续。

也可能是在这种情况下关闭通道的方法是错误的,因此我在这里问的原因。

更有经验的 go 程序员如何使用 channels 解决这个问题?

我更改了您的代码以使用该频道。还有很多其他方法可以使用该频道。

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func process(input int, out chan<- int) {
    rand.Seed(time.Now().UnixNano())
    n := rand.Intn(5)
    time.Sleep(time.Duration(n) * time.Second)
    out <- input * 10
}

func main() {
    var result []int
    resultChan := make(chan int)
    items := []int{1, 2, 3, 4, 5}

    for _, v := range items {
        go process(v, resultChan)
    }

    for i := 0; i < len(items); i++ {
        res, _ := <-resultChan
        result = append(result, res)
    }

    close(resultChan)
    fmt.Println(result)
}

更新:(评论的回答)

如果物品数量未知,您需要向主程序发送信号以完成。否则 "deadlock",你可以创建一个通道来通知 main 函数完成。另外,您可以使用 sync.waiteGroup.

对于Goroutine中的panic,可以使用defer和recover来处理错误。您可以创建一个错误通道矿石,您可以使用 x/sync/errgroup.

解决办法太多了。这取决于你的问题。所以没有特定的方法来使用 goroutine、channel 和...

这是一个使用 WaitGroup 而不是等待固定数量的结果的解决方案。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func process(input int, wg *sync.WaitGroup, resultChan chan<- int) {
    defer wg.Done()

    rand.Seed(time.Now().UnixNano())
    n := rand.Intn(5)
    time.Sleep(time.Duration(n) * time.Second)

    resultChan <- input * 10
}

func main() {
    var wg sync.WaitGroup

    resultChan := make(chan int)

    for i := range []int{1,2,3,4,5} {
        wg.Add(1)
        go process(i, &wg, resultChan)
    }

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    var result []int
    for r := range resultChan {
        result = append(result, r)
    }

    fmt.Println(result)
}

这是一个示例片段,我在其中使用一部分通道而不是等待组来执行分叉连接:

package main

import (
    "fmt"
    "os"
)

type cStruct struct {
    resultChan chan int
    errChan    chan error
}

func process(i int) (v int, err error) {
    v = i
    return
}

func spawn(i int) cStruct {
    r := make(chan int)
    e := make(chan error)
    go func(i int) {
        defer close(r)
        defer close(e)
        v, err := process(i)
        if err != nil {
            e <- err
            return
        }
        r <- v
        return
    }(i)
    return cStruct{
        r,
        e,
    }
}

func main() {
    //have a slice of channelStruct
    var cStructs []cStruct
    nums := []int{1, 2, 3, 4, 5}
    for _, v := range nums {
        cStruct := spawn(v)
        cStructs = append(cStructs, cStruct)
    }
    //All the routines have been spawned, now iterate over the slice:
    var results []int
    for _, c := range cStructs {
        rChan, errChan := c.resultChan, c.errChan
        select {
        case r := <-rChan:
            {
                results = append(results, r)
            }
        case err := <-errChan:
            {
                if err != nil {
                    os.Exit(1)
                    return
                }
            }
        }

    }
    //All the work should be done by now, iterating over the results
    for _, result := range results {
        fmt.Println("Aggregated result:", result)
    }
}