使用 goroutines 和 context 创建可取消的 worker

Creating cancelable workers with goroutines and context

我正在尝试了解如何正确使用 goroutines 以及通道和上下文,以创建可取消的后台工作程序。

我熟悉使用在显式调用时可以取消的上下文,将它附加到 worker goroutine 应该可以让我停止 worker。

但我不知道如何使用它来实现这个目标。

下面的示例说明了一个从通道 'urls' 获取数据的 worker goroutine,它还带有一个可取消的上下文。

//worker.go
func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
    fmt.Printf("Worker %d is starting\n", id)
    select {
    // placeholder for a channel writing the data from the URL
    case url := <-urls:
        fmt.Printf("Worker :%d received url :%s\n", id, url)
    // checking if the process is cancelled
    case <-ctx.Done():
        fmt.Printf("Worker :%d exitting..\n", id)
    }
    fmt.Printf("Worker :%d done..\n", id)
    wg.Done()
}

这对我不起作用有两个原因,

  1. 对于无缓冲的通道,在没有 goroutines 读取的情况下写入它会阻塞它,所以一旦有更多数据添加到 urls 通道,发送方就会阻塞。
  2. 它returns立即,一旦两个通道中的任何一个returns。

我还尝试将 select 包装在一个无限循环中,但在上下文引发错误后添加了一个中断。

func Worker(id int, client *http.Client, urls chan string, ctx context.Context, wg *sync.WaitGroup) {
    fmt.Printf("Worker %d is starting\n", id)
    for {
        select {
        // placeholder for a channel writing the data from the URL
        case url := <-urls:
            fmt.Printf("Worker :%d received url :%s\n", id, url)
        // checking if the process is cancelled
        case <-ctx.Done():
            fmt.Printf("Worker :%d exitting..\n", id)
            break // raises error :ineffective break statement. Did you mean to break out of the outer loop? (SA4011)go-staticcheck
        }
    }
    fmt.Printf("Worker :%d done..\n", id) // code is unreachable
    wg.Done()
}

实现这样的东西的正确方法是什么?

PS:任何有关设计此类工作进程的资源/参考资料也会有所帮助。

您可以用 return 替换中断,代码将起作用。

但是,更好的方法是:

  1. Workers 在 for / range 循环中消费通道
  2. 生产者应负责检测取消并关闭通道。 for 循环将停止级联

我专门为此做了一个Go包。您可以在这里找到它:https://github.com/MicahParks/ctxerrpool

这是项目 README.md 中的示例:

package main

import (
    "bytes"
    "context"
    "log"
    "net/http"
    "os"
    "time"

    "github.com/MicahParks/ctxerrpool"
)

func main() {

    // Create an error handler that logs all errors.
    var errorHandler ctxerrpool.ErrorHandler
    errorHandler = func(pool ctxerrpool.Pool, err error) {
        log.Printf("An error occurred. Error: \"%s\".\n", err.Error())
    }

    // Create a worker pool with 4 workers.
    pool := ctxerrpool.New(4, errorHandler)

    // Create some variables to inherit through a closure.
    httpClient := &http.Client{}
    u := "https://golang.org"
    logger := log.New(os.Stdout, "status codes: ", 0)

    // Create the worker function.
    var work ctxerrpool.Work
    work = func(ctx context.Context) (err error) {

        // Create the HTTP request.
        var req *http.Request
        if req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, bytes.NewReader(nil)); err != nil {
            return err
        }

        // Do the HTTP request.
        var resp *http.Response
        if resp, err = httpClient.Do(req); err != nil {
            return err
        }

        // Log the status code.
        logger.Println(resp.StatusCode)

        return nil
    }

    // Do the work 16 times.
    for i := 0; i < 16; i++ {

        // Create a context for the work.
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        // Send the work to the pool.
        pool.AddWorkItem(ctx, work)
    }

    // Wait for the pool to finish.
    pool.Wait()
}