Go: run loop in parallel with timeout

I need to run these requests in parallel rather than one after the other, and still honor each request's timeout. How can I do that?

This is the specific code I need to run in parallel. The trick is to also use the timeouts, i.e. respect each one and collect the responses once all the calls have finished.

    for _, test := range testers {
        checker := NewTap(test.name, test.url, test.timeout)
        res, err := checker.Check()
        if err != nil {
            fmt.Println(err)
        }
        fmt.Println(res.name)
        fmt.Println(res.res.StatusCode)

    }

Here is the full (working) code: https://play.golang.org/p/cXnJJ6PW_CF

package main

import (
    "fmt"
    "net/http"
    "time"
)

type HT interface {
    Name() string
    Check() (*testerResponse, error)
}

type testerResponse struct {
    name string
    res  http.Response
}

type Tap struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func NewTap(name, url string, timeout time.Duration) *Tap {
    return &Tap{
        url:    url,
        name:   name,
        client: &http.Client{Timeout: timeout},
    }
}

func (p *Tap) Check() (*testerResponse, error) {
    response := &testerResponse{}
    req, err := http.NewRequest("GET", p.url, nil)
    if err != nil {
        return nil, err
    }
    res, err := p.client.Do(req)
    if err != nil {
        // check the request error before dereferencing res, which is nil on failure
        return response, err
    }
    response.name = p.name
    response.res = *res
    return response, nil
}

func (p *Tap) Name() string {
    return p.name
}

func main() {

    var checkers []HT

    testers := []Tap{
        {
            name:    "first call",
            url:     "http://whosebug.com",
            timeout: time.Second * 20,
        },
        {
            name:    "second call",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
    }

    for _, test := range testers {
        checker := NewTap(test.name, test.url, test.timeout)
        res, err := checker.Check()
        if err != nil {
            fmt.Println(err)
        }
        fmt.Println(res.name)
        fmt.Println(res.res.StatusCode)

        checkers = append(checkers, checker)

    }
}

Parallelism can be achieved in Go in several ways. Here is a naive approach using a wait group, a mutex, and unbounded goroutines, which is not recommended; I think channels are the preferred way to do this kind of parallel processing.

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

type HT interface {
    Name() string
    Check() (*testerResponse, error)
}

type testerResponse struct {
    name string
    res  http.Response
}

type Tap struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func NewTap(name, url string, timeout time.Duration) *Tap {
    return &Tap{
        url:  url,
        name: name,
        client: &http.Client{
            Timeout: timeout,
        },
    }
}

func (p *Tap) Check() (*testerResponse, error) {
    response := &testerResponse{}
    req, err := http.NewRequest("GET", p.url, nil)
    if err != nil {
        return nil, err
    }
    res, e := p.client.Do(req)
    if e != nil {
        return response, e
    }
    response.name = p.name
    response.res = *res
    res.Body.Close() // close the body to release the underlying connection

    return response, nil
}

func (p *Tap) Name() string {
    return p.name
}

func main() {

    var checkers []HT
    wg := sync.WaitGroup{}
    locker := sync.Mutex{}

    testers := []Tap{
        {
            name:    "first call",
            url:     "http://google.com",
            timeout: time.Second * 20,
        },
        {
            name:    "second call",
            url:     "http://www.example.com",
            timeout: time.Millisecond * 100,
        },
    }

    for _, test := range testers {
        wg.Add(1)
        go func(tst Tap) {
            defer wg.Done()
            checker := NewTap(tst.name, tst.url, tst.timeout)
            res, err := checker.Check()
            if err != nil {
                fmt.Println(err)
                return // on error the response is empty, so skip the prints below
            }
            fmt.Println(res.name)
            fmt.Println(res.res.StatusCode)
            locker.Lock()
            defer locker.Unlock()
            checkers = append(checkers, checker)
        }(test)
    }

    wg.Wait()
}
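As a side note (not part of the approach above), the same fan-out can be expressed with golang.org/x/sync/errgroup, which rolls the WaitGroup and error propagation into a single type. A minimal sketch, assuming that dependency is acceptable:

// Sketch only: replaces the WaitGroup loop above with an errgroup.
var g errgroup.Group
for _, test := range testers {
    tst := test // capture the loop variable for the goroutine
    g.Go(func() error {
        checker := NewTap(tst.name, tst.url, tst.timeout)
        res, err := checker.Check()
        if err != nil {
            return err
        }
        fmt.Println(res.name, res.res.StatusCode)
        return nil
    })
}
if err := g.Wait(); err != nil {
    fmt.Println(err) // Wait returns the first non-nil error, if any
}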

A popular concurrency pattern in Go is to use a worker pool.

A basic worker pool uses two channels: one to put jobs on, and one to read results from. In this case, our jobs channel will be of type Tap, and our results channel will be of type testerResponse.

Workers

A worker picks up a job from the jobs channel and puts the result on the results channel.

// worker defines our worker func. As long as there is a job in the
// "queue", we keep picking up the "next" job
func worker(jobs <-chan Tap, results chan<- testerResponse) {
    for n := range jobs {
        results <- n.Check()
    }
}

Jobs

To add jobs, we iterate over our testers and put each one on the jobs channel.

// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- Tap, taps []Tap) {
    for _, t := range taps {
        jobs <- t
    }
}
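Note that makeJobs never closes the jobs channel, so each worker's range loop blocks forever once the jobs are drained. That is harmless here because main exits anyway, but a variant that closes the channel (a sketch, with a hypothetical name) lets the workers terminate cleanly:

// makeJobsAndClose is a hypothetical variant: closing the channel makes
// `for n := range jobs` in each worker return once every job is consumed.
func makeJobsAndClose(jobs chan<- Tap, taps []Tap) {
    for _, t := range taps {
        jobs <- t
    }
    close(jobs)
}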

Results

To read the results, we receive from the results channel once per job.

// getResults receives one result per job from the results channel
func getResults(tr <-chan testerResponse, taps []Tap) {
    for range taps {
        r := <-tr
        status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
        if r.err != nil {
            status = r.err.Error()
        }
        fmt.Println(status)
    }
}
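This works because getResults performs exactly one receive per job, so it stops after len(taps) results. If you would rather range over the results channel directly, a common pattern is to close it once every worker has exited; a sketch building on the makeJobsAndClose variant above:

// Sketch: close the results channel when all workers are done, which in
// turn requires the jobs channel to have been closed (see above).
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        worker(jobsPipe, resultsPipe)
    }()
}
go func() {
    wg.Wait()
    close(resultsPipe) // now `for r := range resultsPipe` terminates
}()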

Finally, our main func.

func main() {
    // Make buffered channels
    buffer := len(testers)
    jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`
    resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`

    // Create worker pool
    // Max workers default is 5
    // maxWorkers := 5
    // for i := 0; i < maxWorkers; i++ {
    //  go worker(jobsPipe, resultsPipe)
    // }

    // the loop above is the same as doing:
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    // ^^ this creates 5 workers..

    makeJobs(jobsPipe, testers)
    getResults(resultsPipe, testers)
}

Putting it all together

I've changed the timeout of 'second call' to one millisecond to show how the timeouts work.

package main

import (
    "fmt"
    "net/http"
    "time"
)

type HT interface {
    Name() string
    Check() (*testerResponse, error)
}

type testerResponse struct {
    err  error
    name string
    res  http.Response
    url  string
}

type Tap struct {
    url     string
    name    string
    timeout time.Duration
    client  *http.Client
}

func NewTap(name, url string, timeout time.Duration) *Tap {
    return &Tap{
        url:    url,
        name:   name,
        client: &http.Client{Timeout: timeout},
    }
}

func (p *Tap) Check() testerResponse {
    fmt.Printf("Fetching %s %s \n", p.name, p.url)
    // there's really no need for NewTap here
    nt := NewTap(p.name, p.url, p.timeout)
    res, err := nt.client.Get(p.url)
    if err != nil {
        return testerResponse{err: err}
    }

    // the response body must be closed to avoid leaking the connection
    res.Body.Close()
    return testerResponse{name: p.name, res: *res, url: p.url}
}

func (p *Tap) Name() string {
    return p.name
}

// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- Tap, taps []Tap) {
    for _, t := range taps {
        jobs <- t
    }
}

// getResults receives one result per job from the results
// channel and prints it
func getResults(tr <-chan testerResponse, taps []Tap) {
    for range taps {
        r := <-tr
        status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
        if r.err != nil {
            status = r.err.Error()
        }
        fmt.Print(status)
    }
}

// worker defines our worker func. As long as there is a job in the
// "queue", we keep picking up the "next" job
func worker(jobs <-chan Tap, results chan<- testerResponse) {
    for n := range jobs {
        results <- n.Check()
    }
}

var (
    testers = []Tap{
        {
            name:    "1",
            url:     "http://google.com",
            timeout: time.Second * 20,
        },
        {
            name:    "2",
            url:     "http://www.yahoo.com",
            timeout: time.Second * 10,
        },
        {
            name:    "3",
            url:     "http://whosebug.com",
            timeout: time.Second * 20,
        },
        {
            name:    "4",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "5",
            url:     "http://whosebug.com",
            timeout: time.Second * 20,
        },
        {
            name:    "6",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "7",
            url:     "http://whosebug.com",
            timeout: time.Second * 20,
        },
        {
            name:    "8",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "9",
            url:     "http://whosebug.com",
            timeout: time.Second * 20,
        },
        {
            name:    "10",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "11",
            url:     "http://whosebug.com",
            timeout: time.Second * 20,
        },
        {
            name:    "12",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
        {
            name:    "13",
            url:     "http://whosebug.com",
            timeout: time.Second * 20,
        },
        {
            name:    "14",
            url:     "http://www.example.com",
            timeout: time.Second * 10,
        },
    }
)

func main() {
    // Make buffered channels
    buffer := len(testers)
    jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`
    resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`

    // Create worker pool
    // Max workers default is 5
    // maxWorkers := 5
    // for i := 0; i < maxWorkers; i++ {
    //  go worker(jobsPipe, resultsPipe)
    // }

    // the loop above is the same as doing:
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    go worker(jobsPipe, resultsPipe)
    // ^^ this creates 5 workers..

    makeJobs(jobsPipe, testers)
    getResults(resultsPipe, testers)
}

Output:

// Fetching http://whosebug.com 
// Fetching http://www.example.com 
// Get "http://www.example.com": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
// 'first call' to 'http://whosebug.com' was fetched with status '200'