使用 goroutines 丢失数据
Losing data with the goroutines
我正在编写 AWS lambda 代码来查询 RDS table,将其转换为 JSON,然后 return。但是我没有看到 JSON 中的所有记录,而不是 SQL 查询 return 编辑的记录。假设我正在查询 table 中的 1500 条记录,但每次 JSON 中只有 1496 到 1500 条记录(少 0-5 条记录)。我怀疑我搞砸了 sync.WaitGroup
.
下面是SQL服务器查询
SELECT TOP 1500 * FROM IMBookingApp.dbo.BA_Contact__c
WHERE ContactId > 0
下面是我的代码
// Convert the rows object to slice of objects for every row
func parseRow(rows *sql.Rows, totalColumns int) []string {
receiver := make([]string, totalColumns)
is := make([]interface{}, len(receiver))
for i := range is {
is[i] = &receiver[i]
}
err := rows.Scan(is...)
if err != nil {
fmt.Println("Error reading rows: " + err.Error())
}
TotalRecordsInParseRowfunction++
return receiver
}
// Query the given table and return JSON response
func queryTable(conn *sql.DB, query string) (string, error) {
// Query Table
rows, err := conn.Query(query)
if err != nil {
fmt.Println("DATABASE ERROR:", err)
return "", errors.New("DATABASE ERROR:" + err.Error())
}
println("Rows:", rows)
defer rows.Close()
// Get the column names
columns, err := rows.Columns()
// fmt.Println("columns", columns)
if err != nil {
fmt.Println("DATABASE ERROR:", err)
return "", errors.New("DATABASE ERROR:" + err.Error())
}
totalColumns := len(columns)
var resp []map[string]string // Declare the type of final response which will be used to create JSON
var waitgroup sync.WaitGroup
// Iterate over all the rows returned
for rows.Next() {
waitgroup.Add(1)
TotalRecordsCount++
row := parseRow(rows, totalColumns)
go func() {
// Create a map of the row
respRow := map[string]string{} // Declare the type of each row of response
for count := range row {
respRow[columns[count]] = row[count]
}
// fmt.Println("\n\nrespRow", respRow)
resp = append(resp, respRow)
TotalRecordsAppendedCount++
waitgroup.Done()
}()
}
waitgroup.Wait()
// If no rows are returned
if len(resp) == 0 {
fmt.Println("MESSAGE: No records are available")
return "", errors.New("MESSAGE: No records are available")
}
// Create JSON
respJSON, _ := json.Marshal(resp)
fmt.Println("Response", string(respJSON))
fmt.Println("\n--------------Summary---------------")
fmt.Println("TotalRecordsInParseRowfunction", TotalRecordsInParseRowfunction)
fmt.Println("TotalRecordsCount", TotalRecordsCount)
fmt.Println("TotalRecordsAppendedCount", TotalRecordsAppendedCount)
fmt.Println("Object Length", len(resp))
return string(respJSON), nil // Return JSON
}
下面是我得到的输出摘要
--------------Summary---------------
TotalRecordsInParseRowfunction 1500
TotalRecordsCount 1500
TotalRecordsAppendedCount 1500
Object Length 1496
您的代码很活泼。多个 goroutine 正在写入 resp
而没有任何互斥,因此您丢失了数据。
您可以围绕它添加一个互斥锁-解锁。但是,您在 goroutine 中的代码不保证有自己的 goroutine,因为它是一个简单的 map 添加。在 goroutine 中处理该代码会容易得多,并且可能 运行 没有 goroutine 调度开销的情况下会快得多。除非你打算在那个 goroutine 中有更多的逻辑,否则我建议你删除它。
这里有一些关于可能发生的事情的更多信息:首先,在当前版本的 go 中,一个 goroutine 只会在 goroutine 调用某些库函数时让步给其他 goroutine。查看代码,您的 goroutines 不太可能产生。由于您已经观察到数据丢失(这意味着存在竞争条件),因此您可能拥有多个内核。
比赛在这里:
resp = append(resp, respRow)
没有互斥,一个 goroutine 可能会查看 resp
,发现它可以写入它的第 n
个元素。另一个 goroutine(运行ning 在一个单独的核心上)可以做同样的事情,并成功写入。但是第一个 goroutine 仍然认为元素是空的,所以覆盖它,并更新 resp
。发生这种情况时,您将丢失一个元素。
如果你在这段代码中添加互斥,你实际上会强制所有 goroutines 按顺序 运行 因为它们实际上没有做任何其他事情。此外,由于 goroutine 执行顺序是随机的,你最终会得到一个随机排序的 resp
。简而言之,这是您应该按顺序执行代码的实例之一。
我正在编写 AWS lambda 代码来查询 RDS table,将其转换为 JSON,然后 return。但是我没有看到 JSON 中的所有记录,而不是 SQL 查询 return 编辑的记录。假设我正在查询 table 中的 1500 条记录,但每次 JSON 中只有 1496 到 1500 条记录(少 0-5 条记录)。我怀疑我搞砸了 sync.WaitGroup
.
下面是SQL服务器查询
SELECT TOP 1500 * FROM IMBookingApp.dbo.BA_Contact__c
WHERE ContactId > 0
下面是我的代码
// Convert the rows object to slice of objects for every row
func parseRow(rows *sql.Rows, totalColumns int) []string {
receiver := make([]string, totalColumns)
is := make([]interface{}, len(receiver))
for i := range is {
is[i] = &receiver[i]
}
err := rows.Scan(is...)
if err != nil {
fmt.Println("Error reading rows: " + err.Error())
}
TotalRecordsInParseRowfunction++
return receiver
}
// Query the given table and return JSON response
func queryTable(conn *sql.DB, query string) (string, error) {
// Query Table
rows, err := conn.Query(query)
if err != nil {
fmt.Println("DATABASE ERROR:", err)
return "", errors.New("DATABASE ERROR:" + err.Error())
}
println("Rows:", rows)
defer rows.Close()
// Get the column names
columns, err := rows.Columns()
// fmt.Println("columns", columns)
if err != nil {
fmt.Println("DATABASE ERROR:", err)
return "", errors.New("DATABASE ERROR:" + err.Error())
}
totalColumns := len(columns)
var resp []map[string]string // Declare the type of final response which will be used to create JSON
var waitgroup sync.WaitGroup
// Iterate over all the rows returned
for rows.Next() {
waitgroup.Add(1)
TotalRecordsCount++
row := parseRow(rows, totalColumns)
go func() {
// Create a map of the row
respRow := map[string]string{} // Declare the type of each row of response
for count := range row {
respRow[columns[count]] = row[count]
}
// fmt.Println("\n\nrespRow", respRow)
resp = append(resp, respRow)
TotalRecordsAppendedCount++
waitgroup.Done()
}()
}
waitgroup.Wait()
// If no rows are returned
if len(resp) == 0 {
fmt.Println("MESSAGE: No records are available")
return "", errors.New("MESSAGE: No records are available")
}
// Create JSON
respJSON, _ := json.Marshal(resp)
fmt.Println("Response", string(respJSON))
fmt.Println("\n--------------Summary---------------")
fmt.Println("TotalRecordsInParseRowfunction", TotalRecordsInParseRowfunction)
fmt.Println("TotalRecordsCount", TotalRecordsCount)
fmt.Println("TotalRecordsAppendedCount", TotalRecordsAppendedCount)
fmt.Println("Object Length", len(resp))
return string(respJSON), nil // Return JSON
}
下面是我得到的输出摘要
--------------Summary---------------
TotalRecordsInParseRowfunction 1500
TotalRecordsCount 1500
TotalRecordsAppendedCount 1500
Object Length 1496
您的代码很活泼。多个 goroutine 正在写入 resp
而没有任何互斥,因此您丢失了数据。
您可以围绕它添加一个互斥锁-解锁。但是,您在 goroutine 中的代码不保证有自己的 goroutine,因为它是一个简单的 map 添加。在 goroutine 中处理该代码会容易得多,并且可能 运行 没有 goroutine 调度开销的情况下会快得多。除非你打算在那个 goroutine 中有更多的逻辑,否则我建议你删除它。
这里有一些关于可能发生的事情的更多信息:首先,在当前版本的 go 中,一个 goroutine 只会在 goroutine 调用某些库函数时让步给其他 goroutine。查看代码,您的 goroutines 不太可能产生。由于您已经观察到数据丢失(这意味着存在竞争条件),因此您可能拥有多个内核。
比赛在这里:
resp = append(resp, respRow)
没有互斥,一个 goroutine 可能会查看 resp
,发现它可以写入它的第 n
个元素。另一个 goroutine(运行ning 在一个单独的核心上)可以做同样的事情,并成功写入。但是第一个 goroutine 仍然认为元素是空的,所以覆盖它,并更新 resp
。发生这种情况时,您将丢失一个元素。
如果你在这段代码中添加互斥,你实际上会强制所有 goroutines 按顺序 运行 因为它们实际上没有做任何其他事情。此外,由于 goroutine 执行顺序是随机的,你最终会得到一个随机排序的 resp
。简而言之,这是您应该按顺序执行代码的实例之一。