Go 运行 循环与超时并行
Go run loop in parallel with timeout
我需要并行运行这些请求,而不是让它们一个接一个地执行并各自等待超时。请问这该如何实现?
这是我在并行中需要运行的具体代码,这里的技巧也是使用超时,即根据超时并在所有完成后获取响应。
for _, test := range testers {
checker := NewTap(test.name, test.url, test.timeout)
res, err := checker.Check()
if err != nil {
fmt.Println(err)
}
fmt.Println(res.name)
fmt.Println(res.res.StatusCode)
}
这是全部代码(工作代码)
https://play.golang.org/p/cXnJJ6PW_CF
package main
import (
`fmt`
`net/http`
`time`
)
// HT describes a named health-check tester: something that can report
// its name and perform a single check returning a response or an error.
type HT interface {
	Name() string
	Check() (*testerResponse, error)
}

// testerResponse pairs a tap's name with the raw HTTP response it got.
type testerResponse struct {
	name string        // name of the tap that produced this response
	res  http.Response // copy of the HTTP response (body not read or closed here)
}

// Tap is one health-check target: a URL checked with a per-tap timeout.
type Tap struct {
	url     string
	name    string
	timeout time.Duration // request timeout applied to the client
	client  *http.Client  // set by NewTap; nil on bare Tap literals
}
// NewTap returns a Tap that checks url under the given name, using an
// HTTP client whose requests are bounded by timeout.
func NewTap(name, url string, timeout time.Duration) *Tap {
	client := &http.Client{Timeout: timeout}
	tap := Tap{name: name, url: url, client: client}
	return &tap
}
// Check performs a single GET request against p.url using the tap's
// timeout-configured client. On success it returns the tap's name
// together with the response; on failure it returns the error.
func (p *Tap) Check() (*testerResponse, error) {
	req, err := http.NewRequest("GET", p.url, nil)
	if err != nil {
		return nil, err
	}
	res, err := p.client.Do(req)
	// Check the request error BEFORE dereferencing res: on failure
	// (e.g. a timeout) res is nil and *res would panic. The original
	// also tested the wrong variable (the NewRequest error) here.
	if err != nil {
		return &testerResponse{name: p.name}, err
	}
	// NOTE(review): the response body is never read or closed here;
	// closing res.Body would let the transport reuse the connection.
	return &testerResponse{name: p.name, res: *res}, nil
}
// Name returns the tap's configured name (satisfies the HT interface).
func (p *Tap) Name() string {
	return p.name
}
// main builds a Tap for every configured tester and checks each one
// sequentially, printing the result (or the error) as it completes.
func main() {
	var checkers []HT

	testers := []Tap{
		{
			name:    "first call",
			url:     "http://whosebug.com",
			timeout: time.Second * 20,
		},
		{
			name:    "second call",
			url:     "http://www.example.com",
			timeout: time.Second * 10,
		},
	}

	for _, test := range testers {
		checker := NewTap(test.name, test.url, test.timeout)
		res, err := checker.Check()
		if err != nil {
			fmt.Println(err)
			// Skip this result on failure: res may be nil or hold only
			// zero values, so printing it would mislead (or panic).
			continue
		}
		fmt.Println(res.name)
		fmt.Println(res.res.StatusCode)
		checkers = append(checkers, checker)
	}
}
并行可以在 Golang 中以不同的方式完成。
下面是一种不推荐的幼稚做法:使用等待组(WaitGroup)、互斥锁(Mutex)以及不限数量的 goroutine。
我认为使用通道是进行并行处理的首选方法。
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
// HT describes a named health-check tester: something that can report
// its name and perform a single check returning a response or an error.
type HT interface {
	Name() string
	Check() (*testerResponse, error)
}

// testerResponse pairs a tap's name with the raw HTTP response it got.
type testerResponse struct {
	name string        // name of the tap that produced this response
	res  http.Response // copy of the HTTP response (body not read or closed here)
}

// Tap is one health-check target: a URL checked with a per-tap timeout.
type Tap struct {
	url     string
	name    string
	timeout time.Duration // request timeout applied to the client
	client  *http.Client  // set by NewTap; nil on bare Tap literals
}
// NewTap constructs a Tap for url, identified by name, whose HTTP
// requests time out after the supplied duration.
func NewTap(name, url string, timeout time.Duration) *Tap {
	t := &Tap{url: url, name: name}
	t.client = &http.Client{Timeout: timeout}
	return t
}
// Check issues a GET request to the tap's URL and returns the tap's
// name together with the HTTP response, or an error on failure.
func (p *Tap) Check() (*testerResponse, error) {
	req, reqErr := http.NewRequest("GET", p.url, nil)
	if reqErr != nil {
		return nil, reqErr
	}

	out := &testerResponse{}
	res, doErr := p.client.Do(req)
	if doErr != nil {
		// Return the (still empty) response value alongside the error,
		// matching what callers of this version expect.
		return out, doErr
	}

	out.name = p.name
	out.res = *res
	return out, nil
}
// Name returns the tap's configured name (satisfies the HT interface).
func (p *Tap) Name() string {
	return p.name
}
// main runs every tester concurrently: one goroutine per tester, a
// WaitGroup to await completion, and a mutex guarding the shared slice.
func main() {
	var checkers []HT
	wg := sync.WaitGroup{}
	locker := sync.Mutex{}

	testers := []Tap{
		{
			name:    "first call",
			url:     "http://google.com",
			timeout: time.Second * 20,
		},
		{
			name:    "second call",
			url:     "http://www.example.com",
			timeout: time.Millisecond * 100,
		},
	}

	for _, test := range testers {
		wg.Add(1)
		// Hand the loop variable to the goroutine as an argument so each
		// goroutine works on its own copy (required before Go 1.22).
		go func(tst Tap) {
			defer wg.Done()
			checker := NewTap(tst.name, tst.url, tst.timeout)
			res, err := checker.Check()
			if err != nil {
				fmt.Println(err)
				// Stop here on failure: res holds only zero values, and a
				// failed checker should not be collected.
				return
			}
			fmt.Println(res.name)
			fmt.Println(res.res.StatusCode)

			// checkers is shared across goroutines; serialize the append.
			locker.Lock()
			defer locker.Unlock()
			checkers = append(checkers, checker)
		}(test)
	}
	wg.Wait()
}
Go 中流行的并发模式是使用工作池。
一个基本的工作线程池使用两个通道;一个用于放置作业,另一个用于读取结果。在这种情况下,我们的职位渠道将是 Tap
类型,而我们的结果通道将是 testerResponse 类型。
工人
从工作频道获取工作并将结果放在结果频道。
// worker drains the jobs channel, running each Tap's Check and pushing
// the result onto the results channel; it exits when jobs is closed.
// NOTE(review): this snippet assumes the single-value Check() of the
// final program below (returning testerResponse), not the earlier
// (*testerResponse, error) signature.
func worker(jobs <-chan Tap, results chan<- testerResponse) {
	for n := range jobs {
		results <- n.Check()
	}
}
职位
要添加工作,我们需要迭代我们的 testers
并将它们放在我们的工作频道上。
// makeJobs pushes every tap onto the jobs channel.
func makeJobs(jobs chan<- Tap, taps []Tap) {
	for i := range taps {
		jobs <- taps[i]
	}
}
结果
为了读取结果,我们需要迭代它们。
// getResults receives exactly one result per tap from tr and prints a
// status line (or the error message) for each.
func getResults(tr <-chan testerResponse, taps []Tap) {
	for range taps {
		r := <-tr
		status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
		if r.err != nil {
			// Use the error text directly; passing it to Sprintf as a
			// format string would misinterpret any '%' it contains.
			status = r.err.Error()
		}
		fmt.Println(status)
	}
}
最后,我们的主要功能。
// main wires up buffered job/result channels, starts a small worker
// pool, enqueues every tester, and then collects the results.
func main() {
	// Buffer both channels to the number of jobs so producers never block.
	buffer := len(testers)
	jobsPipe := make(chan Tap, buffer)               // jobs are of type Tap
	resultsPipe := make(chan testerResponse, buffer) // results are of type testerResponse

	// Start a fixed-size worker pool with a loop rather than repeating
	// `go worker(...)` five times by hand.
	const maxWorkers = 5
	for i := 0; i < maxWorkers; i++ {
		go worker(jobsPipe, resultsPipe)
	}

	makeJobs(jobsPipe, testers)
	getResults(resultsPipe, testers)
}
综合起来
我将 'second call' 的超时更改为一毫秒以显示超时的工作原理。
package main
import (
"fmt"
"net/http"
"time"
)
// HT describes a named health-check tester.
// NOTE(review): Tap.Check below returns testerResponse (no error), so
// Tap does not satisfy this interface in this program; HT is unused here.
type HT interface {
	Name() string
	Check() (*testerResponse, error)
}

// testerResponse is the outcome of one check: either err is set, or
// name/res/url describe a completed fetch.
type testerResponse struct {
	err  error         // non-nil when the fetch failed (e.g. timeout)
	name string        // name of the tap that produced this result
	res  http.Response // copy of the HTTP response (body already closed)
	url  string        // URL that was fetched
}

// Tap is one health-check target with a per-tap request timeout.
type Tap struct {
	url     string
	name    string
	timeout time.Duration // applied to the HTTP client built in NewTap
	client  *http.Client  // nil for Tap literals; populated by NewTap
}
// NewTap builds a Tap whose HTTP client enforces the given timeout.
func NewTap(name, url string, timeout time.Duration) *Tap {
	return &Tap{
		name:   name,
		url:    url,
		client: &http.Client{Timeout: timeout},
	}
}
// Check fetches the tap's URL and returns a testerResponse carrying
// either the response or the error.
func (p *Tap) Check() testerResponse {
	fmt.Printf("Fetching %s %s \n", p.name, p.url)
	// Rebuild the tap via NewTap: Taps arriving through the jobs channel
	// were created as bare literals, so p.client is nil — NewTap supplies
	// a client configured with this tap's timeout.
	nt := NewTap(p.name, p.url, p.timeout)
	res, err := nt.client.Get(p.url)
	if err != nil {
		return testerResponse{err: err}
	}
	// Close the body so the underlying connection can be reused; the
	// body's content is not needed here.
	res.Body.Close()
	return testerResponse{name: p.name, res: *res, url: p.url}
}
// Name returns the tap's configured name.
func (p *Tap) Name() string {
	return p.name
}
// makeJobs pushes every tap onto the jobs channel, then closes it so
// the workers' range loops terminate instead of leaking goroutines.
func makeJobs(jobs chan<- Tap, taps []Tap) {
	for _, t := range taps {
		jobs <- t
	}
	// Only the sender closes the channel; receivers see the close as
	// end-of-stream.
	close(jobs)
}
// getResults receives exactly one result per tap from tr and prints a
// status line (or the error message) for each.
func getResults(tr <-chan testerResponse, taps []Tap) {
	for range taps {
		r := <-tr
		status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
		if r.err != nil {
			// Take the error text as-is: feeding it to Sprintf/Printf as a
			// format string would mangle any '%' verbs it contains.
			status = r.err.Error() + "\n"
		}
		fmt.Print(status)
	}
}
// worker drains the jobs channel, running each Tap's Check and pushing
// the result onto the results channel; it exits when jobs is closed.
func worker(jobs <-chan Tap, results chan<- testerResponse) {
	for n := range jobs {
		results <- n.Check()
	}
}
// testers is the fixed set of health-check targets; give an entry a very
// short timeout to demonstrate the timeout path.
var testers = []Tap{
	{name: "1", url: "http://google.com", timeout: time.Second * 20},
	{name: "2", url: "http://www.yahoo.com", timeout: time.Second * 10},
	{name: "3", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "4", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "5", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "6", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "7", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "8", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "9", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "10", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "11", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "12", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "13", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "14", url: "http://www.example.com", timeout: time.Second * 10},
}

// main creates buffered job/result channels sized to the number of
// testers, starts a bounded worker pool, enqueues all jobs, and waits
// for every result to be printed.
func main() {
	// Buffer both channels to the number of jobs so producers never block.
	buffer := len(testers)
	jobsPipe := make(chan Tap, buffer)               // jobs are of type Tap
	resultsPipe := make(chan testerResponse, buffer) // results are of type testerResponse

	// Start a fixed-size worker pool with a loop rather than repeating
	// `go worker(...)` five times by hand.
	const maxWorkers = 5
	for i := 0; i < maxWorkers; i++ {
		go worker(jobsPipe, resultsPipe)
	}

	makeJobs(jobsPipe, testers)
	getResults(resultsPipe, testers)
}
输出:
// Fetching http://whosebug.com
// Fetching http://www.example.com
// Get "http://www.example.com": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
// 'first call' to 'http://whosebug.com' was fetched with status '200'
我需要并行运行这些请求,而不是让它们一个接一个地执行并各自等待超时。请问这该如何实现?
这是我在并行中需要运行的具体代码,这里的技巧也是使用超时,即根据超时并在所有完成后获取响应。
for _, test := range testers {
checker := NewTap(test.name, test.url, test.timeout)
res, err := checker.Check()
if err != nil {
fmt.Println(err)
}
fmt.Println(res.name)
fmt.Println(res.res.StatusCode)
}
这是全部代码(工作代码) https://play.golang.org/p/cXnJJ6PW_CF
package main
import (
`fmt`
`net/http`
`time`
)
// HT describes a named health-check tester: something that can report
// its name and perform a single check returning a response or an error.
type HT interface {
	Name() string
	Check() (*testerResponse, error)
}

// testerResponse pairs a tap's name with the raw HTTP response it got.
type testerResponse struct {
	name string        // name of the tap that produced this response
	res  http.Response // copy of the HTTP response (body not read or closed here)
}

// Tap is one health-check target: a URL checked with a per-tap timeout.
type Tap struct {
	url     string
	name    string
	timeout time.Duration // request timeout applied to the client
	client  *http.Client  // set by NewTap; nil on bare Tap literals
}
// NewTap returns a Tap that checks url under the given name, using an
// HTTP client whose requests are bounded by timeout.
func NewTap(name, url string, timeout time.Duration) *Tap {
	client := &http.Client{Timeout: timeout}
	tap := Tap{name: name, url: url, client: client}
	return &tap
}
// Check performs a single GET request against p.url using the tap's
// timeout-configured client. On success it returns the tap's name
// together with the response; on failure it returns the error.
func (p *Tap) Check() (*testerResponse, error) {
	req, err := http.NewRequest("GET", p.url, nil)
	if err != nil {
		return nil, err
	}
	res, err := p.client.Do(req)
	// Check the request error BEFORE dereferencing res: on failure
	// (e.g. a timeout) res is nil and *res would panic. The original
	// also tested the wrong variable (the NewRequest error) here.
	if err != nil {
		return &testerResponse{name: p.name}, err
	}
	// NOTE(review): the response body is never read or closed here;
	// closing res.Body would let the transport reuse the connection.
	return &testerResponse{name: p.name, res: *res}, nil
}
// Name returns the tap's configured name (satisfies the HT interface).
func (p *Tap) Name() string {
	return p.name
}
// main builds a Tap for every configured tester and checks each one
// sequentially, printing the result (or the error) as it completes.
func main() {
	var checkers []HT

	testers := []Tap{
		{
			name:    "first call",
			url:     "http://whosebug.com",
			timeout: time.Second * 20,
		},
		{
			name:    "second call",
			url:     "http://www.example.com",
			timeout: time.Second * 10,
		},
	}

	for _, test := range testers {
		checker := NewTap(test.name, test.url, test.timeout)
		res, err := checker.Check()
		if err != nil {
			fmt.Println(err)
			// Skip this result on failure: res may be nil or hold only
			// zero values, so printing it would mislead (or panic).
			continue
		}
		fmt.Println(res.name)
		fmt.Println(res.res.StatusCode)
		checkers = append(checkers, checker)
	}
}
并行可以在 Golang 中以不同的方式完成。 这是不推荐使用等待组、互斥锁和无限制 go 例程的幼稚方法。 我认为使用通道是进行并行处理的首选方法。
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
// HT describes a named health-check tester: something that can report
// its name and perform a single check returning a response or an error.
type HT interface {
	Name() string
	Check() (*testerResponse, error)
}

// testerResponse pairs a tap's name with the raw HTTP response it got.
type testerResponse struct {
	name string        // name of the tap that produced this response
	res  http.Response // copy of the HTTP response (body not read or closed here)
}

// Tap is one health-check target: a URL checked with a per-tap timeout.
type Tap struct {
	url     string
	name    string
	timeout time.Duration // request timeout applied to the client
	client  *http.Client  // set by NewTap; nil on bare Tap literals
}
// NewTap constructs a Tap for url, identified by name, whose HTTP
// requests time out after the supplied duration.
func NewTap(name, url string, timeout time.Duration) *Tap {
	t := &Tap{url: url, name: name}
	t.client = &http.Client{Timeout: timeout}
	return t
}
// Check issues a GET request to the tap's URL and returns the tap's
// name together with the HTTP response, or an error on failure.
func (p *Tap) Check() (*testerResponse, error) {
	req, reqErr := http.NewRequest("GET", p.url, nil)
	if reqErr != nil {
		return nil, reqErr
	}

	out := &testerResponse{}
	res, doErr := p.client.Do(req)
	if doErr != nil {
		// Return the (still empty) response value alongside the error,
		// matching what callers of this version expect.
		return out, doErr
	}

	out.name = p.name
	out.res = *res
	return out, nil
}
// Name returns the tap's configured name (satisfies the HT interface).
func (p *Tap) Name() string {
	return p.name
}
// main runs every tester concurrently: one goroutine per tester, a
// WaitGroup to await completion, and a mutex guarding the shared slice.
func main() {
	var checkers []HT
	wg := sync.WaitGroup{}
	locker := sync.Mutex{}

	testers := []Tap{
		{
			name:    "first call",
			url:     "http://google.com",
			timeout: time.Second * 20,
		},
		{
			name:    "second call",
			url:     "http://www.example.com",
			timeout: time.Millisecond * 100,
		},
	}

	for _, test := range testers {
		wg.Add(1)
		// Hand the loop variable to the goroutine as an argument so each
		// goroutine works on its own copy (required before Go 1.22).
		go func(tst Tap) {
			defer wg.Done()
			checker := NewTap(tst.name, tst.url, tst.timeout)
			res, err := checker.Check()
			if err != nil {
				fmt.Println(err)
				// Stop here on failure: res holds only zero values, and a
				// failed checker should not be collected.
				return
			}
			fmt.Println(res.name)
			fmt.Println(res.res.StatusCode)

			// checkers is shared across goroutines; serialize the append.
			locker.Lock()
			defer locker.Unlock()
			checkers = append(checkers, checker)
		}(test)
	}
	wg.Wait()
}
Go 中流行的并发模式是使用工作池。
一个基本的工作线程池使用两个通道;一个用于放置作业,另一个用于读取结果。在这种情况下,我们的职位渠道将是 Tap
类型,而我们的结果通道将是 testerResponse 类型。
工人
从工作频道获取工作并将结果放在结果频道。
// worker drains the jobs channel, running each Tap's Check and pushing
// the result onto the results channel; it exits when jobs is closed.
// NOTE(review): this snippet assumes the single-value Check() of the
// final program below (returning testerResponse), not the earlier
// (*testerResponse, error) signature.
func worker(jobs <-chan Tap, results chan<- testerResponse) {
	for n := range jobs {
		results <- n.Check()
	}
}
职位
要添加工作,我们需要迭代我们的 testers
并将它们放在我们的工作频道上。
// makeJobs pushes every tap onto the jobs channel.
func makeJobs(jobs chan<- Tap, taps []Tap) {
	for i := range taps {
		jobs <- taps[i]
	}
}
结果
为了读取结果,我们需要迭代它们。
// getResults receives exactly one result per tap from tr and prints a
// status line (or the error message) for each.
func getResults(tr <-chan testerResponse, taps []Tap) {
	for range taps {
		r := <-tr
		status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
		if r.err != nil {
			// Use the error text directly; passing it to Sprintf as a
			// format string would misinterpret any '%' it contains.
			status = r.err.Error()
		}
		fmt.Println(status)
	}
}
最后,我们的主要功能。
// main wires up buffered job/result channels, starts a small worker
// pool, enqueues every tester, and then collects the results.
func main() {
	// Buffer both channels to the number of jobs so producers never block.
	buffer := len(testers)
	jobsPipe := make(chan Tap, buffer)               // jobs are of type Tap
	resultsPipe := make(chan testerResponse, buffer) // results are of type testerResponse

	// Start a fixed-size worker pool with a loop rather than repeating
	// `go worker(...)` five times by hand.
	const maxWorkers = 5
	for i := 0; i < maxWorkers; i++ {
		go worker(jobsPipe, resultsPipe)
	}

	makeJobs(jobsPipe, testers)
	getResults(resultsPipe, testers)
}
综合起来
我将 'second call' 的超时更改为一毫秒以显示超时的工作原理。
package main
import (
"fmt"
"net/http"
"time"
)
// HT describes a named health-check tester.
// NOTE(review): Tap.Check below returns testerResponse (no error), so
// Tap does not satisfy this interface in this program; HT is unused here.
type HT interface {
	Name() string
	Check() (*testerResponse, error)
}

// testerResponse is the outcome of one check: either err is set, or
// name/res/url describe a completed fetch.
type testerResponse struct {
	err  error         // non-nil when the fetch failed (e.g. timeout)
	name string        // name of the tap that produced this result
	res  http.Response // copy of the HTTP response (body already closed)
	url  string        // URL that was fetched
}

// Tap is one health-check target with a per-tap request timeout.
type Tap struct {
	url     string
	name    string
	timeout time.Duration // applied to the HTTP client built in NewTap
	client  *http.Client  // nil for Tap literals; populated by NewTap
}
// NewTap builds a Tap whose HTTP client enforces the given timeout.
func NewTap(name, url string, timeout time.Duration) *Tap {
	return &Tap{
		name:   name,
		url:    url,
		client: &http.Client{Timeout: timeout},
	}
}
// Check fetches the tap's URL and returns a testerResponse carrying
// either the response or the error.
func (p *Tap) Check() testerResponse {
	fmt.Printf("Fetching %s %s \n", p.name, p.url)
	// Rebuild the tap via NewTap: Taps arriving through the jobs channel
	// were created as bare literals, so p.client is nil — NewTap supplies
	// a client configured with this tap's timeout.
	nt := NewTap(p.name, p.url, p.timeout)
	res, err := nt.client.Get(p.url)
	if err != nil {
		return testerResponse{err: err}
	}
	// Close the body so the underlying connection can be reused; the
	// body's content is not needed here.
	res.Body.Close()
	return testerResponse{name: p.name, res: *res, url: p.url}
}
// Name returns the tap's configured name.
func (p *Tap) Name() string {
	return p.name
}
// makeJobs pushes every tap onto the jobs channel, then closes it so
// the workers' range loops terminate instead of leaking goroutines.
func makeJobs(jobs chan<- Tap, taps []Tap) {
	for _, t := range taps {
		jobs <- t
	}
	// Only the sender closes the channel; receivers see the close as
	// end-of-stream.
	close(jobs)
}
// getResults receives exactly one result per tap from tr and prints a
// status line (or the error message) for each.
func getResults(tr <-chan testerResponse, taps []Tap) {
	for range taps {
		r := <-tr
		status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'\n", r.name, r.url, r.res.StatusCode)
		if r.err != nil {
			// Take the error text as-is: feeding it to Sprintf/Printf as a
			// format string would mangle any '%' verbs it contains.
			status = r.err.Error() + "\n"
		}
		fmt.Print(status)
	}
}
// worker drains the jobs channel, running each Tap's Check and pushing
// the result onto the results channel; it exits when jobs is closed.
func worker(jobs <-chan Tap, results chan<- testerResponse) {
	for n := range jobs {
		results <- n.Check()
	}
}
// testers is the fixed set of health-check targets; give an entry a very
// short timeout to demonstrate the timeout path.
var testers = []Tap{
	{name: "1", url: "http://google.com", timeout: time.Second * 20},
	{name: "2", url: "http://www.yahoo.com", timeout: time.Second * 10},
	{name: "3", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "4", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "5", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "6", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "7", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "8", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "9", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "10", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "11", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "12", url: "http://www.example.com", timeout: time.Second * 10},
	{name: "13", url: "http://whosebug.com", timeout: time.Second * 20},
	{name: "14", url: "http://www.example.com", timeout: time.Second * 10},
}

// main creates buffered job/result channels sized to the number of
// testers, starts a bounded worker pool, enqueues all jobs, and waits
// for every result to be printed.
func main() {
	// Buffer both channels to the number of jobs so producers never block.
	buffer := len(testers)
	jobsPipe := make(chan Tap, buffer)               // jobs are of type Tap
	resultsPipe := make(chan testerResponse, buffer) // results are of type testerResponse

	// Start a fixed-size worker pool with a loop rather than repeating
	// `go worker(...)` five times by hand.
	const maxWorkers = 5
	for i := 0; i < maxWorkers; i++ {
		go worker(jobsPipe, resultsPipe)
	}

	makeJobs(jobsPipe, testers)
	getResults(resultsPipe, testers)
}
输出:
// Fetching http://whosebug.com
// Fetching http://www.example.com
// Get "http://www.example.com": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
// 'first call' to 'http://whosebug.com' was fetched with status '200'