io.Pipe() causes WaitGroup to get stuck

I am working with a huge data file, roughly 100 GB. Each line of that big file is a piece of JSON data, which I want to read, compress, and store in an in-memory database.

var wg sync.WaitGroup
for {
    line, err := reader.ReadString('\n')
    if err != nil {
        break
    }
    go func(index int) {
        wg.Add(1)
        pr, pw := io.Pipe()
        zw := lzw.NewWriter(pw, lzw.LSB, 8)
        _, err := io.Copy(zw, strings.NewReader(line))
        pw.Close()
        zw.Close()
        if err != nil {
            fmt.Println(err.Error())
        }
        b, err := io.ReadAll(pr)
        if err != nil {
            fmt.Println(err.Error())
        }
        client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(b), time.Hour*1000)
        pr.Close()
        wg.Done()
    }(index)
    if index%10000 == 0 {
        fmt.Println(index)
        wg.Wait()
    }
    index += 1
}

However, this code stops after processing the first 10000 lines. When I move wg.Add(1) down below zw.Close(), it goes on to process the rest of the lines (but becomes unstable). Without lzw and io.Pipe(), when I store the exact values uncompressed, everything works fine.

I am not sure whether I am not using WaitGroup correctly, or whether there is something about io.Pipe() that I do not know yet.

TLDR:
1- Removing pr, pw := io.Pipe() makes the code simpler, because it is redundant here (see the note after the snippet below on why the pipe blocks).
Try this:

line, err := reader.ReadString('\n')
if err == io.EOF {
    wg.Wait()
    break
}
if err != nil {
    log.Fatal(err)
}
wg.Add(1)
go func(index int) {
    var buf bytes.Buffer
    { // lexical scoping (static scoping)
        zw := lzw.NewWriter(&buf, lzw.LSB, 8)
        n, err := zw.Write([]byte(line)) // n, err := io.Copy(zw, strings.NewReader(line))
        if err != nil {
            log.Fatal(err)
        }
        if int(n) != len(line) {
            log.Fatal(n, len(line))
        }
        // It is the caller's responsibility to call Close on the WriteCloser when finished writing.
        if err = zw.Close(); err != nil {
            log.Fatal(err)
        }
    }
    ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond)
    client.Set(ctx, fmt.Sprintf("%d", index), base64.StdEncoding.EncodeToString(buf.Bytes()), 1000*time.Hour)

    cancelFunc()
    wg.Done()
}(index)

if index%tenThousand == 0 {
    wg.Wait()
}
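
For context on why the original code hangs: io.Pipe has no internal buffer, so each write into the PipeWriter blocks until a read from the PipeReader consumes it. In the question's goroutine the compressed bytes are written into pw (during io.Copy, or at the latest when zw.Close() flushes) before anything reads from pr, so that write blocks forever, wg.Done() is never reached, and wg.Wait() hangs. A minimal sketch of the pattern the pipe requires - writer in its own goroutine - using a made-up input line:

package main

import (
    "fmt"
    "io"
    "log"
    "strings"
)

func main() {
    pr, pw := io.Pipe()
    // The writer must run concurrently with the reader: every write into pw
    // blocks until a read from pr consumes it, so writing and then reading
    // from the same goroutine (as in the original code) never returns.
    go func() {
        _, err := io.Copy(pw, strings.NewReader("a line of JSON data\n"))
        pw.CloseWithError(err) // close the write side; a nil err means plain EOF for the reader
    }()
    b, err := io.ReadAll(pr)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("read %d bytes through the pipe\n", len(b))
}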

2- You need to put wg.Add(1) before go func(index int) { (see the sketch after this fragment for why the order matters):

    wg.Add(1)
    go func(index int) {
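
If wg.Add(1) runs inside the goroutine, the main loop can reach wg.Wait() before the scheduler has run that goroutine at all, so the counter is still zero and Wait returns too early (or Add races with Wait). A minimal, self-contained sketch of the correct ordering, with a hypothetical work function standing in for the compression and the client.Set call:

package main

import (
    "fmt"
    "sync"
)

// work is a placeholder for the real per-line job (compress + client.Set).
func work(i int) { fmt.Println("worker", i, "done") }

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        // Increment the counter before starting the goroutine. If Add(1)
        // ran inside the goroutine, Wait() below could observe a zero
        // counter and return before any worker had even been scheduled.
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            work(i)
        }(i)
    }
    wg.Wait()
}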

3- wg.Wait() 逻辑:

if index%10000 == 0 {
    fmt.Println(index)
    wg.Wait()
}

What happens on the last iteration if index%10000 != 0? So here, when err == io.EOF, you need to wg.Wait() so that all goroutines can join:

if err == io.EOF {
    wg.Wait()
    fmt.Println("\n**** All done **** index =", index)
    break
}

4- You can use lexical scoping (static scoping) to limit the scope of some variables and make the code easier to manage - and to know when to Close the lzw.NewWriter:

{ // lexical scoping (static scoping)
    zw := lzw.NewWriter(&buf, lzw.LSB, 8) // write straight into buf; an unflushed bufio.Writer here would drop data
    n, err := io.Copy(zw, strings.NewReader(line))
    if err != nil {
        log.Fatal(err)
    }
    if int(n) != len(line) {
        log.Fatal(n, len(line))
    }
    // It is the caller's responsibility to call Close on the WriteCloser when finished writing.
    if err = zw.Close(); err != nil {
        log.Fatal(err)
    }
}

5- Always check errors, for example:

if err = zw.Close(); err != nil {
    log.Fatal(err)
}

Here is a working version close to your code - try it just to experiment with the concurrency logic and see what happens (not recommended, since it has redundant goroutines and io.Pipe), but it works:

package main

import (
    "bufio"
    "compress/lzw"
    "context"
    "encoding/base64"
    "fmt"
    "io"
    "log"
    "strings"
    "sync"
    "time"
)

func main() {
    index := 0
    client := &myClient{}
    reader := bufio.NewReader(file)
    // your code:
    var wg sync.WaitGroup
    for {
        index++
        line, err := reader.ReadString('\n')
        if err != nil {
            msg <- fmt.Sprint(index, " Done not waiting with err: ", err, time.Now())
            wg.Wait() // break waiting // if index%tenThousand != 0
            break
        }
        wg.Add(1)
        go func(i int) {
            msg <- fmt.Sprint(i, " Enter running ... ", time.Now())
            asyncReader, asyncWriter := io.Pipe() // make it async to read and write
            zipWriter := lzw.NewWriter(asyncWriter, lzw.LSB, 8)
            go func() { // async
                _, err := io.Copy(zipWriter, strings.NewReader(line))
                if err != nil {
                    log.Fatal(err)
                }
                _ = zipWriter.Close()
                _ = asyncWriter.Close() // for io.ReadAll
            }()
            b, err := io.ReadAll(asyncReader)
            if err != nil {
                log.Fatal(err)
            }
            client.Set(context.Background(), fmt.Sprintf("%d", i), base64.StdEncoding.EncodeToString(b), time.Hour*1000)
            asyncReader.Close()
            time.Sleep(1 * time.Second)
            msg <- fmt.Sprint(i, " Exit running ... ", time.Now())
            wg.Done()
        }(index)

        msg <- fmt.Sprint(index, " ", index%tenThousand == 0, " after go call")
        if index%tenThousand == 0 {
            wg.Wait()
            msg <- fmt.Sprint("..", index, " Done waiting after go call. ", time.Now())
        }
    }
    msg <- "Bye forever."

    wg.Wait()
    close(msg)
    wgMsg.Wait()
}

// just for the Go Playground:
const tenThousand = 2

type myClient struct {
}

func (p *myClient) Set(ctx context.Context, a, b string, t time.Duration) {
    // fmt.Println("a =", a, ", b =", b, ", t =", t)
    if ctx.Err() != nil {
        fmt.Println(ctx.Err())
    }
}

var file, myw = io.Pipe()

func init() {
    go func() {
        for i := 1; i <= tenThousand+1; i++ {
            fmt.Fprintf(myw, "%d text to compress aaaaaaaaaaaaaa\n", i)
        }
        myw.Close()
    }()
    wgMsg.Add(1)
    go func() {
        defer wgMsg.Done()
        for s := range msg {
            fmt.Println(s)
        }
    }()
}

var msg = make(chan string, 100)
var wgMsg sync.WaitGroup

Output:

1 false after go call
2 true after go call
1 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.000000001
2 Enter running ... 2009-11-10 23:00:00 +0000 UTC m=+0.000000001
1 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
2 Exit running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
..2 Done waiting after go call. 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
3 false after go call
3 Enter running ... 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
4 Done not waiting with err: EOF 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
3 Exit running ... 2009-11-10 23:00:02 +0000 UTC m=+2.000000001
Bye forever.
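
As an aside, instead of pausing every 10000 lines with wg.Wait(), you can cap the number of in-flight goroutines with a buffered channel used as a semaphore. This is only a sketch of that alternative pattern, with a placeholder compressAndStore standing in for the lzw compression and the client.Set call, and a hard-coded slice instead of the 100 GB file:

package main

import (
    "fmt"
    "sync"
)

// compressAndStore is a placeholder for the real work (lzw compression + client.Set).
func compressAndStore(index int, line string) {
    fmt.Println(index, len(line))
}

func main() {
    lines := []string{"first line", "second line", "third line"}

    var wg sync.WaitGroup
    sem := make(chan struct{}, 4) // at most 4 goroutines in flight at once

    for index, line := range lines {
        sem <- struct{}{} // blocks while 4 workers are already running
        wg.Add(1)
        go func(index int, line string) {
            defer wg.Done()
            defer func() { <-sem }() // free the slot when this worker finishes
            compressAndStore(index, line)
        }(index, line)
    }
    wg.Wait()
}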