go routines 和接收错误或成功的通道
go routines and a channel to receive error or success
我有一个函数,我想定义最大数量的 go 例程,我有一个列表,我遍历这个列表,然后通过通道向 go 例程发送消息,在这个 go 例程中,我将调用一个函数,该函数将获得答案或错误,当它不是错误时我想将 return 保存在一个切片中,当它是错误时我想停止 go 例程并进行调用。
但是我无法做到这样,当我遇到错误时,所有的 go 例程都会结束,我需要 err
的值
type response struct {
value string
}
func Testing() []response {
fakeValues := getFakeValues()
maxParallel := 25
wg := &sync.WaitGroup{}
wg.Add(maxParallel)
if len(fakeValues) < maxParallel {
maxParallel = len(fakeValues)
}
errReceive := make(chan error, 1)
defer close(errReceive)
response := make([]response, 0)
valuesChan := make(chan string, 1)
for i := 0; i < maxParallel; i++ {
go func(valuesChan <-chan string, errReceive chan error) {
for value := range valuesChan {
resp, err := getFakeResult(value)
if err != nil {
errReceive <- err
}
response = append(response, resp)
}
wg.Done()
}(valuesChan, errReceive)
}
for _, val := range fakeValues {
valuesChan <- val
}
close(valuesChan)
wg.Wait()
err := <-errReceive
if err != nil {
// make any thing
}
return response
}
func getFakeValues() []string {
return []string{"a", "b"}
}
func getFakeResult(val string) (response, error) {
if val == "a" {
return response{}, fmt.Errorf("ooh noh:%s", val)
}
return response{
value: val,
}, nil
}
您可以将上下文与取消一起使用,并使用它让 go 例程知道它们应该停止。
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := &sync.WaitGroup{}
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Println("context is done")
return
case <-time.After(time.Second):
fmt.Println("work")
}
}
}(ctx)
time.Sleep(time.Second * 5)
cancel()
wg.Wait()
https://go.dev/play/p/qe2oDppSnaF
这是一个示例,可以在您的用例上下文中更好地展示它。
type result struct {
err error
val int
}
rand.Seed(time.Now().UnixNano())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rchan := make(chan result, 5)
wg := &sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Println("context is done")
return
case <-time.After(time.Second):
n := rand.Intn(100)
if n > 90 {
rchan <- result{err: fmt.Errorf("error %d", n)}
} else {
rchan <- result{val: n}
}
}
}
}(ctx)
}
go func() {
wg.Wait()
close(rchan)
}()
for res := range rchan {
if res.err != nil {
fmt.Println(res.err)
cancel()
break
} else {
fmt.Println(res.val)
}
}
我有一个函数,我想定义最大数量的 go 例程,我有一个列表,我遍历这个列表,然后通过通道向 go 例程发送消息,在这个 go 例程中,我将调用一个函数,该函数将获得答案或错误,当它不是错误时我想将 return 保存在一个切片中,当它是错误时我想停止 go 例程并进行调用。 但是我无法做到这样,当我遇到错误时,所有的 go 例程都会结束,我需要 err
的值type response struct {
value string
}
func Testing() []response {
fakeValues := getFakeValues()
maxParallel := 25
wg := &sync.WaitGroup{}
wg.Add(maxParallel)
if len(fakeValues) < maxParallel {
maxParallel = len(fakeValues)
}
errReceive := make(chan error, 1)
defer close(errReceive)
response := make([]response, 0)
valuesChan := make(chan string, 1)
for i := 0; i < maxParallel; i++ {
go func(valuesChan <-chan string, errReceive chan error) {
for value := range valuesChan {
resp, err := getFakeResult(value)
if err != nil {
errReceive <- err
}
response = append(response, resp)
}
wg.Done()
}(valuesChan, errReceive)
}
for _, val := range fakeValues {
valuesChan <- val
}
close(valuesChan)
wg.Wait()
err := <-errReceive
if err != nil {
// make any thing
}
return response
}
func getFakeValues() []string {
return []string{"a", "b"}
}
func getFakeResult(val string) (response, error) {
if val == "a" {
return response{}, fmt.Errorf("ooh noh:%s", val)
}
return response{
value: val,
}, nil
}
您可以将上下文与取消一起使用,并使用它让 go 例程知道它们应该停止。
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := &sync.WaitGroup{}
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Println("context is done")
return
case <-time.After(time.Second):
fmt.Println("work")
}
}
}(ctx)
time.Sleep(time.Second * 5)
cancel()
wg.Wait()
https://go.dev/play/p/qe2oDppSnaF
这是一个示例,可以在您的用例上下文中更好地展示它。
type result struct {
err error
val int
}
rand.Seed(time.Now().UnixNano())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rchan := make(chan result, 5)
wg := &sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
for {
select {
case <-ctx.Done():
fmt.Println("context is done")
return
case <-time.After(time.Second):
n := rand.Intn(100)
if n > 90 {
rchan <- result{err: fmt.Errorf("error %d", n)}
} else {
rchan <- result{val: n}
}
}
}
}(ctx)
}
go func() {
wg.Wait()
close(rchan)
}()
for res := range rchan {
if res.err != nil {
fmt.Println(res.err)
cancel()
break
} else {
fmt.Println(res.val)
}
}