在 Golang 中并发地将一个 csv 文件按分区列拆分,写入多个 csv 文件

Concurrently write multiple csv files from one, splitting on a partition column in Golang

我的目标是读取一个或多个格式相同的 csv 文件,并根据 csv 数据中的分区列把各行写入不同的文件。假设最后一列是分区列,数据未排序,并且同一个分区可能出现在多个文件中。一个文件的示例:

fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02

如果这种方法闻起来像可怕的 XY 问题,我很乐意进行调整。

到目前为止我尝试过的:

这显然(目前)行不通,因为我不知道如何根据某一行上的分区值把该行发送给正确的 worker。

我已经把分区值作为 id string 传给了每个 worker,但我不知道如何选出应该接收某一行的那个 worker:是应该为每个 worker 创建一个单独的 chan []string,并用 select 发送到对应的 channel;还是应该用一个结构体持有所有 worker,并提供某种池和路由功能。

TLDR;我不知道如何根据某个分类 string 值有条件地把数据发送到指定的 goroutine 或 channel;不同取值的数量可以是任意的,但可能不超过 24 个分区值。

我要说明一下:我注意到这类问题常常被投反对票,所以如果你觉得这个问题没有建设性或不够完整而要投反对票,请评论说明原因,这样我可以避免重蹈覆辙。

提前感谢您的帮助!

Playground

片段:

  package main

    import (
        "encoding/csv"
        "fmt"
        "log"
        "strings"
        "time"
    )

    // main reads every record from csvFile1, starts one worker goroutine the
    // first time each partition value (column index 6) is seen, and sends every
    // record over a single shared channel.
    //
    // NOTE(review): all workers receive from the same lineChan, so a record is
    // handed to whichever worker happens to be receiving, not necessarily the
    // one started for its partition. main also returns right after closing the
    // channel without waiting for the workers.
    func main() {

        // Load the whole input into memory; abort on any read or parse error.
        r := csv.NewReader(csvFile1)
        lines, err := r.ReadAll()
        if err != nil {
            log.Fatalf("error reading all lines: %v", err)
        }

        // Single unbuffered channel shared by every worker.
        lineChan := make(chan []string)

        // Partition values for which a worker has already been started.
        var seenPartitions []string

        for _, line := range lines {

            // The partition value is the seventh field of the record.
            hour := line[6]
            if !stringInSlice(hour, seenPartitions) {
                seenPartitions = append(seenPartitions, hour)
                go worker(hour, lineChan)
            }
            // Open question: how to send to the worker/channel that owns this
            // partition? As written, any receiving worker may get the line.
            lineChan <- line

        }
        // Closing the channel ends the range loop in every worker.
        close(lineChan)
    }

    // worker consumes records from lineChan until the channel is closed. For
    // each record it prints a start message, sleeps for one second as a
    // stand-in for the real work (writing the record to a file), and then
    // prints a finish message. id only labels the output.
    func worker(id string, lineChan <-chan []string) {
        for {
            job, open := <-lineChan
            if !open {
                // Channel closed and drained: nothing more to do.
                return
            }
            fmt.Println("worker", id, "started  job", job)
            time.Sleep(time.Second)
            fmt.Println("worker", id, "finished job", job)
        }
    }

    // stringInSlice reports whether str is equal to at least one element of
    // list. It returns false for a nil or empty list.
    func stringInSlice(str string, list []string) bool {
        found := false
        for i := 0; i < len(list) && !found; i++ {
            found = list[i] == str
        }
        return found
    }

    // DUMMY
// csvFile1 is in-memory sample input standing in for a real CSV file. It holds
// 18 unsorted records of seven comma-separated fields; the last field is the
// partition value. The literal begins with a newline. No line carries trailing
// whitespace, because a trailing space would become part of the partition
// value and produce a separate, spurious partition.
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)

先给出同步版本,不使用任何 Go 并发魔法(并发版本见下文)。

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "strings"
)

// main streams records from csvFile1 one at a time, groups them in memory by
// partition via process, and prints every group once the input is exhausted.
// Any read error other than io.EOF terminates the program through log.Fatal.
func main() {
    r := csv.NewReader(csvFile1)
    partitions := make(map[string][][]string)

    for {
        rec, err := r.Read()
        if err == io.EOF {
            // End of input: every record has been grouped, so emit the result.
            save_partitions(partitions)
            return
        }
        if err != nil {
            log.Fatal(err)
        }

        process(rec, partitions)
    }
}

// save_partitions prints each partition key to standard output followed by the
// records stored under it, one per line. It only prints; nothing is written to
// disk. Partitions appear in map iteration order, which Go leaves unspecified.
func save_partitions(partitions map[string][][]string) {
    for key := range partitions {
        fmt.Println(key)
        rows := partitions[key]
        for i := 0; i < len(rows); i++ {
            fmt.Println(rows[i])
        }
    }
}

// process files rec under the partition named by its last field, creating the
// partition entry on first use. It mutates partitions in place. Instead of
// buffering in memory, this is also the place where a record could be
// written or appended directly to a file.
func process(rec []string, partitions map[string][][]string) {
    key := rec[len(rec)-1]
    // append on a missing key starts from a nil slice, so no existence check
    // is needed.
    partitions[key] = append(partitions[key], rec)
}

// csvFile1 is in-memory sample input standing in for a real CSV file: 18
// records of seven comma-separated fields, the last field being the partition
// value ("01" through "04"). The literal begins with a newline.
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)

https://play.golang.org/p/--iqZGzxCF

和并发版本:

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "strings"
    "sync"
)

var (
    // workers maps a partition value to the channel that feeds the worker
    // goroutine for that partition. It is only touched from the main
    // goroutine (process and savePartitions), so it needs no lock.
    workers = map[string]chan []string{}

    // wg lets main block until every worker goroutine has finished.
    wg sync.WaitGroup

    // mu serialises the workers' printing so that the output of different
    // partitions does not interleave; it plays no part in the routing logic.
    mu sync.Mutex
)

func main() {

    // wait for all workers to finish up before exit
    defer wg.Wait()

    r := csv.NewReader(csvFile1)

    for {
        rec, err := r.Read()
        if err != nil {
            if err == io.EOF {
                savePartitions()
                return
            }
            log.Fatal(err) // sorry for the panic
        }
        process(rec)
    }

}

// process routes rec to the worker goroutine that owns its partition, which is
// the value of the record's last field. The first record seen for a partition
// creates that partition's channel and starts its worker.
func process(rec []string) {
    part := rec[len(rec)-1]

    c, ok := workers[part]
    if !ok {
        // First record for this partition: create its channel and worker.
        c = make(chan []string)
        workers[part] = c

        // Register the worker with wg before the goroutine is started, so
        // the counter never depends on when the goroutine gets scheduled.
        wg.Add(1)
        go worker(c)
    }

    // Unbuffered send: blocks until the partition's worker takes the record.
    c <- rec
}

// worker collects every record received on c and prints them once c has been
// closed. The caller must have called wg.Add(1) for this worker before
// starting it; worker calls wg.Done when it returns.
func worker(c chan []string) {
    defer wg.Done()

    // Records are accumulated in memory here; they could instead be written
    // to a per-partition file as they arrive.
    var part [][]string
    for rec := range c {
        part = append(part, rec)
    }

    // The channel was closed (end of input): dump the partition. mu keeps the
    // output of different workers from interleaving and would not be needed
    // if each worker wrote to its own file.
    mu.Lock()
    defer mu.Unlock()
    for _, p := range part {
        fmt.Printf("%+v\n", p)
    }
}

// savePartitions closes every worker channel. Despite its name it writes
// nothing itself: closing the channel is what tells each worker that its
// input is complete.
func savePartitions() {
    for part := range workers {
        close(workers[part])
    }
}

// csvFile1 is in-memory sample input standing in for a real CSV file: 18
// records of seven comma-separated fields, the last field being the partition
// value ("01" through "04"). The literal begins with a newline.
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)

https://play.golang.org/p/oBTPosy0yT

玩得开心!