如何从按特定顺序执行的 N 个 goroutine 中收集值?

How to collect values from N goroutines executed in a specific order?

下面是 Stuff 类型的结构。它有三个整数。一个 Number,它的 Double 和它的 Power。让我们假设计算给定整数列表的双精度和幂是一项昂贵的计算。

type Stuff struct {
    Number int
    Double int
    Power  int
}

func main() {
    nums := []int{2, 3, 4} // given numbers
    stuff := []Stuff{}     // struct of stuff with transformed ints

    double := make(chan int)
    power := make(chan int)

    for _, i := range nums {
        go doubleNumber(i, double)
        go powerNumber(i, power)
    }

    // How do I get the values back in the right order?

    fmt.Println(stuff)
}

func doubleNumber(i int, c chan int) {
    c <- i + i
}

func powerNumber(i int, c chan int) {
    c <- i * i
}

fmt.Println(stuff) 的结果应该与初始化的结果相同:

stuff := []Stuff{
    {Number: 2, Double: 4, Power: 4}
    {Number: 3, Double: 6, Power: 9}
    {Number: 4, Double: 8, Power: 16}
}

我知道我可以使用 <- double<- power 从通道中收集值,但我不知道什么双倍/幂属于什么数字。

Goroutines 运行 并发、独立,因此如果没有显式同步,您无法预测执行和完成顺序。因此,您无法将 returned 数字与输入数字配对。

您可以 return 更多数据(例如输入数字和输出,例如包装在结构中),或将指针传递给工作函数(作为新的 goroutines 启动),例如*Stuff 并让 goroutines 将计算数据填充到 Stuff 本身。

返回更多数据

我将使用频道类型 chan Pair,其中 Pair 是:

type Pair struct{ Number, Result int }

因此计算将如下所示:

func doubleNumber(i int, c chan Pair) { c <- Pair{i, i + i} }

func powerNumber(i int, c chan Pair) { c <- Pair{i, i * i} }

我会使用 map[int]*Stuff 因为可收集的数据来自多个渠道(doublepower),我想轻松找到合适的 Stuff快速(需要指针,因此我也可以“在地图中”修改它)。

所以主要功能:

nums := []int{2, 3, 4} // given numbers
stuffs := map[int]*Stuff{}

double := make(chan Pair)
power := make(chan Pair)

for _, i := range nums {
    go doubleNumber(i, double)
    go powerNumber(i, power)
}

// How do I get the values back in the right order?
for i := 0; i < len(nums)*2; i++ {
    getStuff := func(number int) *Stuff {
        s := stuffs[number]
        if s == nil {
            s = &Stuff{Number: number}
            stuffs[number] = s
        }
        return s
    }

    select {
    case p := <-double:
        getStuff(p.Number).Double = p.Result
    case p := <-power:
        getStuff(p.Number).Power = p.Result
    }
}

for _, v := range nums {
    fmt.Printf("%+v\n", stuffs[v])
}

输出(在 Go Playground 上尝试):

&{Number:2 Double:4 Power:4}
&{Number:3 Double:6 Power:9}
&{Number:4 Double:8 Power:16}

使用指针

由于现在我们正在传递 *Stuff 值,我们可以在 Stuff 本身中“预填充”输入数字。

但必须注意,您只能 read/write 正确同步的值。最简单的方法是等待所有“工人”goroutines 完成他们的工作。

var wg = &sync.WaitGroup{}

func main() {
    nums := []int{2, 3, 4} // given numbers

    stuffs := make([]Stuff, len(nums))
    for i, n := range nums {
        stuffs[i].Number = n
        wg.Add(2)
        go doubleNumber(&stuffs[i])
        go powerNumber(&stuffs[i])
    }
    wg.Wait()
    fmt.Printf("%+v", stuffs)
}

func doubleNumber(s *Stuff) {
    defer wg.Done()
    s.Double = s.Number + s.Number
}

func powerNumber(s *Stuff) {
    defer wg.Done()
    s.Power = s.Number * s.Number
}

输出(在 Go Playground 上尝试):

[{Number:2 Double:4 Power:4} {Number:3 Double:6 Power:9} {Number:4 Double:8 Power:16}]

同时写入不同的切片元素

另请注意,由于您可以同时写入不同的数组或切片元素(有关详细信息,请参阅 ), you can write the results directly in a slice without channels. See 如何完成此操作。

就个人而言,我会使用 chan Stuff 将结果传回,然后启动 goroutines 计算完整的 Stuff 并将其传回。如果您需要同时计算单个 Stuff 的各个部分,您可以使用专用通道从每个 goroutine 生成 goroutine。收集完所有结果后,您就可以(可选)使用累积值对切片进行排序。

下面是我的意思的例子(原则上你可以使用 sync.WaitGroup 来协调事情,但如果输入计数已知,严格来说你不需要它)。

type Stuff struct {
  number int64
  double int64
  square int64
}

// Compute a Stuff with individual computations in-line, send it out
func computeStuff(n int64, out chan<- Stuff) {
   rv := Stuff{number: n}
   rv.double = n * 2
   rv.square = n * n
   out <- rv
}

// Compute a Stuff with individual computations concurrent
func computeStuffConcurrent(n int64, out chan<- Stuff) {
  rv := Stuff{number: n}
  dc := make(chan int64)
  sc := make(chan int64)
  defer close(dc)
  defer close(sc)
  go double(n, dc)
  go square(n, sc)
  rv.double = <-dc
  rv.square = <-sc
  out <- rv
}

func double(n int64, result chan<- int) {
   result <- n * 2
}

func square(n int64, result chan<- int) {
  result <- n * n
}

func main() {
  inputs := []int64{1, 2, 3}
  results := []Stuff{}
  resultChannel := make(chan Stuff)

  for _, input := range inputs {
    go computeStuff(input, resultChannel) 
    // Or the concurrent version, if the extra performance is needed
  }

  for c := 0; c < len(inputs); c++ {
    results = append(results, <- resultChannel)
  }
  // We now have all results, sort them if you need them sorted
}