在 Go 中实现工作人员池

Implementing a job worker pool in Go

由于 Go 没有泛型,所有预制解决方案都使用我不太喜欢的类型转换。我也想自己实现并尝试了以下代码。但是,有时它不会等待所有的 goroutines,我是否过早地关闭了 jobs 频道?我没有什么可以从他们那里拿来的。我可能也使用了一个伪输出通道并等待从它们那里获取确切的数量,但是我相信下面的代码也应该有效。我错过了什么?

func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) {
    wg.Add(1)
    defer wg.Done()

    for job := range jobs {
        item := ParseItem(job)
        item.SaveItem()
        MarkJobCompleted(item.ID)
        log.Println("Saved", item.Title)
    }
}

// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {

    jobs := make(chan string)

    list := GetJobs()
    // Start workers
    var wg sync.WaitGroup
    for w := 0; w < 10; w++ {
        go jobWorker(w, jobs, wg)
    }

    for _, url := range list {
        jobs <- url
    }

    close(jobs)
    wg.Wait()
}

在 goroutine 外部调用 wg.Add 并将指针传递给等待组。

如果从 goroutine 内部调用 Add,则主 goroutine 有可能在 goroutine 有机会 运行 之前调用 Wait。如果尚未调用 Add,则 Wait 将立即 return。

将指针传递给 goroutine。否则,goroutines 使用它们自己的等待组副本。

func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {

    defer wg.Done()

    for job := range jobs {
        item := ParseItem(job)
        item.SaveItem()
        MarkJobCompleted(item.ID)
        log.Println("Saved", item.Title)
    }
}

// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {

    jobs := make(chan string)

    list := GetJobs()
    // Start workers
    var wg sync.WaitGroup
    for w := 0; w < 10; w++ {
        wg.Add(1)
        go jobWorker(w, jobs, &wg)
    }

    for _, url := range list {
        jobs <- url
    }

    close(jobs)
    wg.Wait()
}

您需要将指针传递给等待组,否则每个作业都会收到自己的副本。

func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
    wg.Add(1)
    defer wg.Done()

    for job := range jobs {
        item := ParseItem(job)
        item.SaveItem()
        MarkJobCompleted(item.ID)
        log.Println("Saved", item.Title)
    }
}

// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {

    jobs := make(chan string)

    list := GetJobs()
    // Start workers
    var wg sync.WaitGroup
    for w := 0; w < 10; w++ {
        go jobWorker(w, jobs, &wg)
    }

    for _, url := range list {
        jobs <- url
    }

    close(jobs)
    wg.Wait()
}

在这里查看不同之处:without pointer, with pointer