使用 goroutines 和 context 创建可取消的 worker
Creating cancelable workers with goroutines and context
我正在尝试了解如何正确使用 goroutines 以及通道和上下文,以创建可取消的后台工作程序。
我熟悉使用在显式调用时可以取消的上下文,将它附加到 worker goroutine 应该可以让我停止 worker。
但我不知道如何使用它来实现这个目标。
下面的示例说明了一个从通道 'urls' 获取数据的 worker goroutine,它还带有一个可取消的上下文。
//worker.go
func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
fmt.Printf("Worker %d is starting\n", id)
select {
// placeholder for a channel writing the data from the URL
case url := <-urls:
fmt.Printf("Worker :%d received url :%s\n", id, url)
// checking if the process is cancelled
case <-ctx.Done():
fmt.Printf("Worker :%d exitting..\n", id)
}
fmt.Printf("Worker :%d done..\n", id)
wg.Done()
}
这对我不起作用有两个原因,
- 对于无缓冲的通道,在没有 goroutines 读取的情况下写入它会阻塞它,所以一旦有更多数据添加到 urls 通道,发送方就会阻塞。
- 它returns立即,一旦两个通道中的任何一个returns。
我还尝试将 select 包装在一个无限循环中,但在上下文引发错误后添加了一个中断。
func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
fmt.Printf("Worker %d is starting\n", id)
for {
select {
// placeholder for a channel writing the data from the URL
case url := <-urls:
fmt.Printf("Worker :%d received url :%s\n", id, url)
// checking if the process is cancelled
case <-ctx.Done():
fmt.Printf("Worker :%d exitting..\n", id)
break // raises error :ineffective break statement. Did you mean to break out of the outer loop? (SA4011)go-staticcheck
}
}
fmt.Printf("Worker :%d done..\n", id) // code is unreachable
wg.Done()
}
实现这样的东西的正确方法是什么?
PS:任何有关设计此类工作进程的资源/参考资料也会有所帮助。
您可以用 return 替换中断,代码将起作用。
但是,更好的方法是:
- Workers 在 for / range 循环中消费通道
- 生产者应负责检测取消并关闭通道。 for 循环将停止级联
我专门为此做了一个Go包。您可以在这里找到它:https://github.com/MicahParks/ctxerrpool
这是项目 README.md
中的示例:
package main
import (
"bytes"
"context"
"log"
"net/http"
"os"
"time"
"github.com/MicahParks/ctxerrpool"
)
func main() {
// Create an error handler that logs all errors.
var errorHandler ctxerrpool.ErrorHandler
errorHandler = func(pool ctxerrpool.Pool, err error) {
log.Printf("An error occurred. Error: \"%s\".\n", err.Error())
}
// Create a worker pool with 4 workers.
pool := ctxerrpool.New(4, errorHandler)
// Create some variables to inherit through a closure.
httpClient := &http.Client{}
u := "https://golang.org"
logger := log.New(os.Stdout, "status codes: ", 0)
// Create the worker function.
var work ctxerrpool.Work
work = func(ctx context.Context) (err error) {
// Create the HTTP request.
var req *http.Request
if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {
return err
}
// Do the HTTP request.
var resp *http.Response
if resp, err = httpClient.Do(req); err != nil {
return err
}
// Log the status code.
logger.Println(resp.StatusCode)
return nil
}
// Do the work 16 times.
for i := 0; i < 16; i++ {
// Create a context for the work.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Send the work to the pool.
pool.AddWorkItem(ctx, work)
}
// Wait for the pool to finish.
pool.Wait()
}
我正在尝试了解如何正确使用 goroutines 以及通道和上下文,以创建可取消的后台工作程序。
我熟悉使用在显式调用时可以取消的上下文,将它附加到 worker goroutine 应该可以让我停止 worker。
但我不知道如何使用它来实现这个目标。
下面的示例说明了一个从通道 'urls' 获取数据的 worker goroutine,它还带有一个可取消的上下文。
//worker.go
func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
fmt.Printf("Worker %d is starting\n", id)
select {
// placeholder for a channel writing the data from the URL
case url := <-urls:
fmt.Printf("Worker :%d received url :%s\n", id, url)
// checking if the process is cancelled
case <-ctx.Done():
fmt.Printf("Worker :%d exitting..\n", id)
}
fmt.Printf("Worker :%d done..\n", id)
wg.Done()
}
这对我不起作用有两个原因,
- 对于无缓冲的通道,在没有 goroutines 读取的情况下写入它会阻塞它,所以一旦有更多数据添加到 urls 通道,发送方就会阻塞。
- 它returns立即,一旦两个通道中的任何一个returns。
我还尝试将 select 包装在一个无限循环中,但在上下文引发错误后添加了一个中断。
func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
fmt.Printf("Worker %d is starting\n", id)
for {
select {
// placeholder for a channel writing the data from the URL
case url := <-urls:
fmt.Printf("Worker :%d received url :%s\n", id, url)
// checking if the process is cancelled
case <-ctx.Done():
fmt.Printf("Worker :%d exitting..\n", id)
break // raises error :ineffective break statement. Did you mean to break out of the outer loop? (SA4011)go-staticcheck
}
}
fmt.Printf("Worker :%d done..\n", id) // code is unreachable
wg.Done()
}
实现这样的东西的正确方法是什么?
PS:任何有关设计此类工作进程的资源/参考资料也会有所帮助。
您可以用 return 替换中断,代码将起作用。
但是,更好的方法是:
- Workers 在 for / range 循环中消费通道
- 生产者应负责检测取消并关闭通道。 for 循环将停止级联
我专门为此做了一个Go包。您可以在这里找到它:https://github.com/MicahParks/ctxerrpool
这是项目 README.md
中的示例:
package main
import (
"bytes"
"context"
"log"
"net/http"
"os"
"time"
"github.com/MicahParks/ctxerrpool"
)
func main() {
// Create an error handler that logs all errors.
var errorHandler ctxerrpool.ErrorHandler
errorHandler = func(pool ctxerrpool.Pool, err error) {
log.Printf("An error occurred. Error: \"%s\".\n", err.Error())
}
// Create a worker pool with 4 workers.
pool := ctxerrpool.New(4, errorHandler)
// Create some variables to inherit through a closure.
httpClient := &http.Client{}
u := "https://golang.org"
logger := log.New(os.Stdout, "status codes: ", 0)
// Create the worker function.
var work ctxerrpool.Work
work = func(ctx context.Context) (err error) {
// Create the HTTP request.
var req *http.Request
if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {
return err
}
// Do the HTTP request.
var resp *http.Response
if resp, err = httpClient.Do(req); err != nil {
return err
}
// Log the status code.
logger.Println(resp.StatusCode)
return nil
}
// Do the work 16 times.
for i := 0; i < 16; i++ {
// Create a context for the work.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Send the work to the pool.
pool.AddWorkItem(ctx, work)
}
// Wait for the pool to finish.
pool.Wait()
}