在 Go 中实现工作人员池
Implementing a job worker pool in Go
由于 Go 没有泛型,所有预制解决方案都使用我不太喜欢的类型转换。我也想自己实现并尝试了以下代码。但是,有时它不会等待所有的 goroutines,我是否过早地关闭了 jobs 频道?我没有什么可以从他们那里拿来的。我可能也使用了一个伪输出通道并等待从它们那里获取确切的数量,但是我相信下面的代码也应该有效。我错过了什么?
func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
go jobWorker(w, jobs, wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
在 goroutine 外部调用 wg.Add 并将指针传递给等待组。
如果从 goroutine 内部调用 Add,则主 goroutine 有可能在 goroutine 有机会 运行 之前调用 Wait。如果尚未调用 Add,则 Wait 将立即 return。
将指针传递给 goroutine。否则,goroutines 使用它们自己的等待组副本。
func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
wg.Add(1)
go jobWorker(w, jobs, &wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
您需要将指针传递给等待组,否则每个作业都会收到自己的副本。
func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
go jobWorker(w, jobs, &wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
在这里查看不同之处:without pointer, with pointer。
由于 Go 没有泛型,所有预制解决方案都使用我不太喜欢的类型转换。我也想自己实现并尝试了以下代码。但是,有时它不会等待所有的 goroutines,我是否过早地关闭了 jobs 频道?我没有什么可以从他们那里拿来的。我可能也使用了一个伪输出通道并等待从它们那里获取确切的数量,但是我相信下面的代码也应该有效。我错过了什么?
func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
go jobWorker(w, jobs, wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
在 goroutine 外部调用 wg.Add 并将指针传递给等待组。
如果从 goroutine 内部调用 Add,则主 goroutine 有可能在 goroutine 有机会 运行 之前调用 Wait。如果尚未调用 Add,则 Wait 将立即 return。
将指针传递给 goroutine。否则,goroutines 使用它们自己的等待组副本。
func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
wg.Add(1)
go jobWorker(w, jobs, &wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
您需要将指针传递给等待组,否则每个作业都会收到自己的副本。
func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
go jobWorker(w, jobs, &wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
在这里查看不同之处:without pointer, with pointer。