使用 goroutines 丢失数据

Losing data with the goroutines

我正在编写 AWS lambda 代码来查询 RDS table,将其转换为 JSON,然后 return。但是我没有看到 JSON 中的所有记录,而不是 SQL 查询 return 编辑的记录。假设我正在查询 table 中的 1500 条记录,但每次 JSON 中只有 1496 到 1500 条记录(少 0-5 条记录)。我怀疑我搞砸了 sync.WaitGroup.

下面是SQL服务器查询

SELECT TOP 1500 * FROM IMBookingApp.dbo.BA_Contact__c
WHERE ContactId > 0

下面是我的代码

// Convert the rows object to slice of objects for every row
func parseRow(rows *sql.Rows, totalColumns int) []string {

    receiver := make([]string, totalColumns)

    is := make([]interface{}, len(receiver))
    for i := range is {
        is[i] = &receiver[i]
    }

    err := rows.Scan(is...)

    if err != nil {
        fmt.Println("Error reading rows: " + err.Error())
    }

    TotalRecordsInParseRowfunction++
    return receiver
}

// Query the given table and return JSON response
func queryTable(conn *sql.DB, query string) (string, error) {

    // Query Table
    rows, err := conn.Query(query)
    if err != nil {
        fmt.Println("DATABASE ERROR:", err)
        return "", errors.New("DATABASE ERROR:" + err.Error())
    }

    println("Rows:", rows)
    defer rows.Close()

    // Get the column names
    columns, err := rows.Columns()
    // fmt.Println("columns", columns)
    if err != nil {
        fmt.Println("DATABASE ERROR:", err)
        return "", errors.New("DATABASE ERROR:" + err.Error())
    }

    totalColumns := len(columns)
    var resp []map[string]string // Declare the type of final response which will be used to create JSON
    var waitgroup sync.WaitGroup

    // Iterate over all the rows returned
    for rows.Next() {

        waitgroup.Add(1)
        TotalRecordsCount++
        row := parseRow(rows, totalColumns)

        go func() {
            // Create a map of the row
            respRow := map[string]string{} // Declare the type of each row of response
            for count := range row {
                respRow[columns[count]] = row[count]
            }
            // fmt.Println("\n\nrespRow", respRow)

            resp = append(resp, respRow)
            TotalRecordsAppendedCount++
            waitgroup.Done()
        }()
    }
    waitgroup.Wait()

    // If no rows are returned
    if len(resp) == 0 {
        fmt.Println("MESSAGE: No records are available")
        return "", errors.New("MESSAGE: No records are available")
    }

    // Create JSON
    respJSON, _ := json.Marshal(resp)
    fmt.Println("Response", string(respJSON))

    fmt.Println("\n--------------Summary---------------")
    fmt.Println("TotalRecordsInParseRowfunction", TotalRecordsInParseRowfunction)
    fmt.Println("TotalRecordsCount", TotalRecordsCount)
    fmt.Println("TotalRecordsAppendedCount", TotalRecordsAppendedCount)
    fmt.Println("Object Length", len(resp))

    return string(respJSON), nil // Return JSON

}

下面是我得到的输出摘要

--------------Summary---------------
TotalRecordsInParseRowfunction 1500
TotalRecordsCount 1500
TotalRecordsAppendedCount 1500
Object Length 1496

您的代码很活泼。多个 goroutine 正在写入 resp 而没有任何互斥,因此您丢失了数据。

您可以围绕它添加一个互斥锁-解锁。但是,您在 goroutine 中的代码不保证有自己的 goroutine,因为它是一个简单的 map 添加。在 goroutine 中处理该代码会容易得多,并且可能 运行 没有 goroutine 调度开销的情况下会快得多。除非你打算在那个 goroutine 中有更多的逻辑,否则我建议你删除它。

这里有一些关于可能发生的事情的更多信息:首先,在当前版本的 go 中,一个 goroutine 只会在 goroutine 调用某些库函数时让步给其他 goroutine。查看代码,您的 goroutines 不太可能产生。由于您已经观察到数据丢失(这意味着存在竞争条件),因此您可能拥有多个内核。

比赛在这里:

resp = append(resp, respRow)

没有互斥,一个 goroutine 可能会查看 resp,发现它可以写入它的第 n 个元素。另一个 goroutine(运行ning 在一个单独的核心上)可以做同样的事情,并成功写入。但是第一个 goroutine 仍然认为元素是空的,所以覆盖它,并更新 resp。发生这种情况时,您将丢失一个元素。

如果你在这段代码中添加互斥,你实际上会强制所有 goroutines 按顺序 运行 因为它们实际上没有做任何其他事情。此外,由于 goroutine 执行顺序是随机的,你最终会得到一个随机排序的 resp。简而言之,这是您应该按顺序执行代码的实例之一。