为什么使用 goroutines for 循环会导致数据丢失
Why does for loop with goroutines result in missing data
好的,我有两段代码。首先是一个简单的 for 循环,效果很好
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
elasticsearch "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/mitchellh/mapstructure"
)
type Esindices struct {
Health string `json:"health"`
Status string `json:"status"`
Index string `json:"index"`
Uuid string `json:"uuid"`
Pri string `json:"pri"`
Rep string `json:"rep"`
DocsCount string `json:"docs.count"`
DocsDeleted string `json:"docs.deleted"`
StoreSize string `json:"store.size"`
PriStoreSize string `json:"pri.store.size"`
}
func main() {
var r []map[string]interface{}
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("Error creating client: %s", err)
}
req := esapi.CatIndicesRequest{
Format: "json",
Pretty: false,
}
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("Error parsing the response body: %s", err)
}
indexSlice := make([]*Esindices, len(r))
for i, element := range r {
result := &Esindices{}
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &result,
TagName: "json",
}
decoder, _ := mapstructure.NewDecoder(cfg)
decoder.Decode(element)
indexSlice[i] = result
}
thisisjson, err := json.MarshalIndent(indexSlice, "", " ")
if err != nil {
log.Fatal("Can't encode to JSON", err)
}
fmt.Fprintf(os.Stdout, "%s", thisisjson)
其中大部分是不言自明的,但只是为了澄清我正在使用 Elasticsearch 客户端和 api.cat.indices API 来获取本地 Elasticsearch 安装中所有索引的列表,并且然后将它们存储为 map[string]interface{}
的数组,然后遍历它以将它们添加到结果结构的一部分。这很好,实际上,但我想注意性能,虽然我不能改善请求本身的延迟,但我当然可以改善循环的性能,至少我认为我应该能够。
所以当我尝试下面的方法时,我得到了奇怪的结果。
var wg sync.WaitGroup
defer wg.Wait()
for i, element := range r {
wg.Add(1)
go func(i int, element map[string]interface{}) {
defer wg.Done()
result := Esindices{}
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &result,
TagName: "json",
}
decoder, _ := mapstructure.NewDecoder(cfg)
decoder.Decode(element)
indexSlice[i] = result
}(i, element)
}
问题是,具体来说,切片中元素的键的某些值是空的。这让我觉得代码正在尝试添加到切片中,但即使没有完成它也会通过。
想法?
在 for 循环的末尾使用 而不是 defer wg.Wait
,使用 wg.Wait
。在 for 循环完成后,您正在使用由 goroutines 在 for-loop 中构建的数据,而不是在使用该数据之前等待所有 goroutines 完成。
当您使用 defer wg.Wait
时,等待发生在函数的末尾。使用数据的 for 循环对不完整的数据进行操作,因为 goroutines 仍然是 运行。
在for循环结束时使用wg.Wait
时,首先等待所有goroutines结束,然后使用它们产生的数据。
好的,我有两段代码。首先是一个简单的 for 循环,效果很好
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
elasticsearch "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/mitchellh/mapstructure"
)
type Esindices struct {
Health string `json:"health"`
Status string `json:"status"`
Index string `json:"index"`
Uuid string `json:"uuid"`
Pri string `json:"pri"`
Rep string `json:"rep"`
DocsCount string `json:"docs.count"`
DocsDeleted string `json:"docs.deleted"`
StoreSize string `json:"store.size"`
PriStoreSize string `json:"pri.store.size"`
}
func main() {
var r []map[string]interface{}
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("Error creating client: %s", err)
}
req := esapi.CatIndicesRequest{
Format: "json",
Pretty: false,
}
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("Error parsing the response body: %s", err)
}
indexSlice := make([]*Esindices, len(r))
for i, element := range r {
result := &Esindices{}
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &result,
TagName: "json",
}
decoder, _ := mapstructure.NewDecoder(cfg)
decoder.Decode(element)
indexSlice[i] = result
}
thisisjson, err := json.MarshalIndent(indexSlice, "", " ")
if err != nil {
log.Fatal("Can't encode to JSON", err)
}
fmt.Fprintf(os.Stdout, "%s", thisisjson)
其中大部分是不言自明的,但只是为了澄清我正在使用 Elasticsearch 客户端和 api.cat.indices API 来获取本地 Elasticsearch 安装中所有索引的列表,并且然后将它们存储为 map[string]interface{}
的数组,然后遍历它以将它们添加到结果结构的一部分。这很好,实际上,但我想注意性能,虽然我不能改善请求本身的延迟,但我当然可以改善循环的性能,至少我认为我应该能够。
所以当我尝试下面的方法时,我得到了奇怪的结果。
var wg sync.WaitGroup
defer wg.Wait()
for i, element := range r {
wg.Add(1)
go func(i int, element map[string]interface{}) {
defer wg.Done()
result := Esindices{}
cfg := &mapstructure.DecoderConfig{
Metadata: nil,
Result: &result,
TagName: "json",
}
decoder, _ := mapstructure.NewDecoder(cfg)
decoder.Decode(element)
indexSlice[i] = result
}(i, element)
}
问题是,具体来说,切片中元素的键的某些值是空的。这让我觉得代码正在尝试添加到切片中,但即使没有完成它也会通过。
想法?
而不是 defer wg.Wait
,使用 wg.Wait
。在 for 循环完成后,您正在使用由 goroutines 在 for-loop 中构建的数据,而不是在使用该数据之前等待所有 goroutines 完成。
当您使用 defer wg.Wait
时,等待发生在函数的末尾。使用数据的 for 循环对不完整的数据进行操作,因为 goroutines 仍然是 运行。
在for循环结束时使用wg.Wait
时,首先等待所有goroutines结束,然后使用它们产生的数据。