如何在循环中创建频道?

How to create channels in loop?

我正在学习 go 中的并发及其工作原理。

我想做什么?

我试过的代码

        package main

    import (
        "fmt"
        "github.com/pkg/errors"
        "time"
    )

    type subject struct {
        Name string
        Class string
        StartDate time.Time
        EndDate time.Time
    }

    type workerData struct {
        Subject string
        Class string
        Result string
        Error error
    }

    func main () {

        // Creating test data
        var subjects []subject
        st,_ := time.Parse("01/02/2016","01/01/2015")
        et,_ := time.Parse("01/02/2016","01/01/2016")
        s1 := subject{Name:"Math", Class:"3", StartDate:st,EndDate:et }
        s2 := subject{Name:"Geo", Class:"3", StartDate:st,EndDate:et }
        s3 := subject{Name:"Bio", Class:"3", StartDate:st,EndDate:et }
        s4 := subject{Name:"Phy", Class:"3", StartDate:st,EndDate:et }
        s5 := subject{Name:"Art", Class:"3", StartDate:st,EndDate:et }
        subjects = append(subjects, s1)
        subjects = append(subjects, s2)
        subjects = append(subjects, s3)
        subjects = append(subjects, s4)
        subjects = append(subjects, s5)
        c := make(chan workerData) // I am sure this is not how I should be creating channel

        for i := 0 ; i< len(subjects) ; i++ {
            go worker(c)
        }

        for _, v := range subjects {
            // Setting required data in channel
            data := workerData{Subject:v.Name, Class:v.Class}

            // set the data and start the routine
            c <- data // I think this will update data for all the routines ? SO how should create separate channel for each routine

        }

        // I want to wait till all the routines set the data in channel and return the data from workers.
        for {
            select {
                case data := <- c :
                    fmt.Println(data)
            }
        }
    }

    func worker (c chan workerData) {
        data := <- c
        // This can be any processing
        time.Sleep(100 * time.Millisecond)
        if data.Subject != "Math" {
            data.Result = "Pass"
        } else {
            data.Error = errors.New("Subject not found")
        }
        fmt.Println(data.Subject)
        // returning processed data and error to channel
        c <- data
        // Rightfully this closes channel and here after I get error send on Closed channel.
        close(c)
    }

Playgorund Link - https://play.golang.org/p/hs1-B1UR98r

我面临的问题

我不确定如何为每个数据项创建不同的通道。我目前正在做的方式将更新所有例程的通道数据。我想知道有没有办法为循环中的每个数据项创建不同的通道并将其传递给 go rutine。然后在主例程中等待所有通道的例程返回结果。

任何指示/帮助都会很棒?如果有任何困惑,请随时发表评论。

"// I think this will update data for all the routines ?"

通道(为了简化)不是存储数据的数据结构。

它是一个通过不同 goroutines发送接收数据的结构。

因此,请注意您的工作函数正在每个 goroutine 实例中的同一通道上进行发送和接收。如果您只有一个这样的工作人员实例,这将 死锁 (https://golang.org/doc/articles/race_detector.html).

在您 post 编写的代码版本中,对于初学者来说这似乎可行,因为您有许多工作人员相互交换作品。但是对于一个正确的程序来说是错误的。

因此,如果一个 worker 不能读取和写入同一个通道,那么它必须使用一个特定的可写通道来将其结果发送到其他一些例程。

// I want to wait till all the routines set the data in channel and return the data from workers.

这是同步机制的一部分,可确保推送器等待所有工作人员完成其工作,然后再继续进行。 (本博客post说说https://medium.com/golangspec/synchronized-goroutines-part-i-4fbcdd64a4ec

// Rightfully this closes channel and here after I get error send on Closed channel.

注意你有 n 个并行执行的 worker 例程。这个 worker 中第一个到达其函数末尾的将关闭通道,使其对其他 worker 不可写,并向 main 发出错误的结束信号。

通常在编写器端使用关闭语句来指示没有更多数据进入通道。表示它已经结束。读者使用此信号退出通道的读等待操作。

举个例子,让我们回顾一下这个循环

    for {
        select {
            case data := <- c :
                fmt.Println(data)
        }
    }

很糟糕,真的很糟糕。

  1. 这是一个没有退出语句的无限循环
  2. select 是多余的并且不包含退出语句​​,请记住在通道上读取是阻塞操作。
  3. 这是对语言提供的标准模式的错误重写,通道上的范围循环

通道上的范围循环写得非常简单

    for data := range c {
        fmt.Println(data)
    }

这个模式有一个很大的优势,它会自动检测一个关闭的通道来退出循环!让您只循环处理相关数据。它也更加简洁。

此外,您的 worker 很尴尬,因为它在退出前只读取和写入一个元素。 产生 go routines 很便宜,但不是免费的。您应该始终评估异步处理成本与其实际工作负载之间的权衡。

总的来说,您的代码应该更接近此处演示的内容 https://gobyexample.com/worker-pools