同时从一个文件写入多个 csv 文件,在 Golang 的分区列上拆分
Concurrently write multiple csv files from one, splitting on a partition column in Golang
我的objective是读取一个或多个共享通用格式的csv文件,并根据csv数据中的分区列写入单独的文件。请允许最后一列是分区,数据未排序,并且可以在多个文件中找到给定的分区。一个文件的示例:
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02
如果这种方法闻起来像可怕的 XY 问题,我很乐意进行调整。
到目前为止我尝试过的:
- 读入数据集并遍历每一行
- 如果分区有
已经看到,衍生出一个新的工作例程(这将包含一个 file/csv
作家)。将行发送到
chan []string
.
- 由于每个 worker 都是一个文件编写器,它应该只能通过其输入通道接收恰好一个分区的行。
这显然行不通(目前),因为我不知道如何根据在给定行上看到的分区值将行发送给正确的工作人员。
我已经为每个分区值给每个工作人员一个 id string
,但我不知道如何 select 那个工作人员要发送给,如果我应该创建一个单独的 chan []string
对于每个工作人员并使用 select
发送到该频道,或者如果一个结构应该为每个工作人员提供某种池和路由功能。
TLDR;我不知道如何根据某些分类 string
值有条件地将数据发送到给定的 go 例程或通道,其中唯一的数量可以是任意的,但可能不超过 24 个唯一的分区值。
我会警告说我注意到像这样的问题确实被否决了,所以如果你觉得这是反建设性的或不完整的足以否决投票,请评论为什么这样我可以避免重蹈覆辙
提前感谢您的帮助!
片段:
package main
import (
"encoding/csv"
"fmt"
"log"
"strings"
"time"
)
func main() {
// CSV
r := csv.NewReader(csvFile1)
lines, err := r.ReadAll()
if err != nil {
log.Fatalf("error reading all lines: %v", err)
}
// CHANNELS
lineChan := make(chan []string)
// TRACKER
var seenPartitions []string
for _, line := range lines {
hour := line[6]
if !stringInSlice(hour, seenPartitions) {
seenPartitions = append(seenPartitions, hour)
go worker(hour, lineChan)
}
// How to send to the correct worker/channel?
lineChan <- line
}
close(lineChan)
}
func worker(id string, lineChan <-chan []string) {
for j := range lineChan {
fmt.Println("worker", id, "started job", j)
// Write to a new file here and wait for input over the channel
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
}
}
func stringInSlice(str string, list []string) bool {
for _, v := range list {
if v == str {
return true
}
}
return false
}
// DUMMY
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)
同步版本没有先去并发魔术(见下面的并发版本)。
package main
import (
"encoding/csv"
"fmt"
"io"
"log"
"strings"
)
func main() {
// CSV
r := csv.NewReader(csvFile1)
partitions := make(map[string][][]string)
for {
rec, err := r.Read()
if err != nil {
if err == io.EOF {
err = nil
save_partitions(partitions)
return
}
log.Fatal(err)
}
process(rec, partitions)
}
}
// prints only
func save_partitions(partitions map[string][][]string) {
for part, recs := range partitions {
fmt.Println(part)
for _, rec := range recs {
fmt.Println(rec)
}
}
}
// this can also write/append directly to a file
func process(rec []string, partitions map[string][][]string) {
l := len(rec)
part := rec[l-1]
if p, ok := partitions[part]; ok {
partitions[part] = append(p, rec)
} else {
partitions[part] = [][]string{rec}
}
}
// DUMMY
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)
https://play.golang.org/p/--iqZGzxCF
和并发版本:
package main
import (
"encoding/csv"
"fmt"
"io"
"log"
"strings"
"sync"
)
var (
// list of channels to communicate with workers
// workers accessed synchronousely no mutex required
workers = make(map[string]chan []string)
// wg is to make sure all workers done before exiting main
wg = sync.WaitGroup{}
// mu used only for sequential printing, not relevant for program logic
mu = sync.Mutex{}
)
func main() {
// wait for all workers to finish up before exit
defer wg.Wait()
r := csv.NewReader(csvFile1)
for {
rec, err := r.Read()
if err != nil {
if err == io.EOF {
savePartitions()
return
}
log.Fatal(err) // sorry for the panic
}
process(rec)
}
}
func process(rec []string) {
l := len(rec)
part := rec[l-1]
if c, ok := workers[part]; ok {
// send rec to worker
c <- rec
} else {
// if no worker for the partition
// make a chan
nc := make(chan []string)
workers[part] = nc
// start worker with this chan
go worker(nc)
// send rec to worker via chan
nc <- rec
}
}
func worker(c chan []string) {
// wg.Done signals to main worker completion
wg.Add(1)
defer wg.Done()
part := [][]string{}
for {
// wait for a rec or close(chan)
rec, ok := <-c
if ok {
// save the rec
// instead of accumulation in memory
// this can be saved to file directly
part = append(part, rec)
} else {
// channel closed on EOF
// dump partition
// locks ensures sequential printing
// not a required for independent files
mu.Lock()
for _, p := range part {
fmt.Printf("%+v\n", p)
}
mu.Unlock()
return
}
}
}
// simply signals to workers to stop
func savePartitions() {
for _, c := range workers {
// signal to all workers to exit
close(c)
}
}
// DUMMY
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)
https://play.golang.org/p/oBTPosy0yT
玩得开心!
我的objective是读取一个或多个共享通用格式的csv文件,并根据csv数据中的分区列写入单独的文件。请允许最后一列是分区,数据未排序,并且可以在多个文件中找到给定的分区。一个文件的示例:
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,04
22df9,abc,def,2017,11,06,03
1d243,abc,def,2017,11,06,02
如果这种方法闻起来像可怕的 XY 问题,我很乐意进行调整。
到目前为止我尝试过的:
- 读入数据集并遍历每一行
- 如果分区有
已经看到,衍生出一个新的工作例程(这将包含一个 file/csv
作家)。将行发送到
chan []string
. - 由于每个 worker 都是一个文件编写器,它应该只能通过其输入通道接收恰好一个分区的行。
这显然行不通(目前),因为我不知道如何根据在给定行上看到的分区值将行发送给正确的工作人员。
我已经为每个分区值给每个工作人员一个 id string
,但我不知道如何 select 那个工作人员要发送给,如果我应该创建一个单独的 chan []string
对于每个工作人员并使用 select
发送到该频道,或者如果一个结构应该为每个工作人员提供某种池和路由功能。
TLDR;我不知道如何根据某些分类 string
值有条件地将数据发送到给定的 go 例程或通道,其中唯一的数量可以是任意的,但可能不超过 24 个唯一的分区值。
我会警告说我注意到像这样的问题确实被否决了,所以如果你觉得这是反建设性的或不完整的足以否决投票,请评论为什么这样我可以避免重蹈覆辙
提前感谢您的帮助!
片段:
package main
import (
"encoding/csv"
"fmt"
"log"
"strings"
"time"
)
func main() {
// CSV
r := csv.NewReader(csvFile1)
lines, err := r.ReadAll()
if err != nil {
log.Fatalf("error reading all lines: %v", err)
}
// CHANNELS
lineChan := make(chan []string)
// TRACKER
var seenPartitions []string
for _, line := range lines {
hour := line[6]
if !stringInSlice(hour, seenPartitions) {
seenPartitions = append(seenPartitions, hour)
go worker(hour, lineChan)
}
// How to send to the correct worker/channel?
lineChan <- line
}
close(lineChan)
}
func worker(id string, lineChan <-chan []string) {
for j := range lineChan {
fmt.Println("worker", id, "started job", j)
// Write to a new file here and wait for input over the channel
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
}
}
func stringInSlice(str string, list []string) bool {
for _, v := range list {
if v == str {
return true
}
}
return false
}
// DUMMY
var csvFile1 = strings.NewReader(`
12fy3,abc,def,2017,11,06,04
fsdio,abc,def,2017,11,06,01
11213,abc,def,2017,11,06,02
1sdf9,abc,def,2017,11,06,01
2123r,abc,def,2017,11,06,03
1v2t3,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1d243,abc,def,2017,11,06,01
1da23,abc,def,2017,11,06,04
a1523,abc,def,2017,11,06,01
12453,abc,def,2017,11,06,04`)
同步版本没有先去并发魔术(见下面的并发版本)。
package main
import (
"encoding/csv"
"fmt"
"io"
"log"
"strings"
)
func main() {
// CSV
r := csv.NewReader(csvFile1)
partitions := make(map[string][][]string)
for {
rec, err := r.Read()
if err != nil {
if err == io.EOF {
err = nil
save_partitions(partitions)
return
}
log.Fatal(err)
}
process(rec, partitions)
}
}
// prints only
func save_partitions(partitions map[string][][]string) {
for part, recs := range partitions {
fmt.Println(part)
for _, rec := range recs {
fmt.Println(rec)
}
}
}
// this can also write/append directly to a file
func process(rec []string, partitions map[string][][]string) {
l := len(rec)
part := rec[l-1]
if p, ok := partitions[part]; ok {
partitions[part] = append(p, rec)
} else {
partitions[part] = [][]string{rec}
}
}
// DUMMY
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)
https://play.golang.org/p/--iqZGzxCF
和并发版本:
package main
import (
"encoding/csv"
"fmt"
"io"
"log"
"strings"
"sync"
)
var (
// list of channels to communicate with workers
// workers accessed synchronousely no mutex required
workers = make(map[string]chan []string)
// wg is to make sure all workers done before exiting main
wg = sync.WaitGroup{}
// mu used only for sequential printing, not relevant for program logic
mu = sync.Mutex{}
)
func main() {
// wait for all workers to finish up before exit
defer wg.Wait()
r := csv.NewReader(csvFile1)
for {
rec, err := r.Read()
if err != nil {
if err == io.EOF {
savePartitions()
return
}
log.Fatal(err) // sorry for the panic
}
process(rec)
}
}
func process(rec []string) {
l := len(rec)
part := rec[l-1]
if c, ok := workers[part]; ok {
// send rec to worker
c <- rec
} else {
// if no worker for the partition
// make a chan
nc := make(chan []string)
workers[part] = nc
// start worker with this chan
go worker(nc)
// send rec to worker via chan
nc <- rec
}
}
func worker(c chan []string) {
// wg.Done signals to main worker completion
wg.Add(1)
defer wg.Done()
part := [][]string{}
for {
// wait for a rec or close(chan)
rec, ok := <-c
if ok {
// save the rec
// instead of accumulation in memory
// this can be saved to file directly
part = append(part, rec)
} else {
// channel closed on EOF
// dump partition
// locks ensures sequential printing
// not a required for independent files
mu.Lock()
for _, p := range part {
fmt.Printf("%+v\n", p)
}
mu.Unlock()
return
}
}
}
// simply signals to workers to stop
func savePartitions() {
for _, c := range workers {
// signal to all workers to exit
close(c)
}
}
// DUMMY
var csvFile1 = strings.NewReader(`
fsdio,abc,def,2017,11,06,01
1sdf9,abc,def,2017,11,06,01
1d243,abc,def,2017,11,06,01
1v2t3,abc,def,2017,11,06,01
a1523,abc,def,2017,11,06,01
1r2r3,abc,def,2017,11,06,02
11213,abc,def,2017,11,06,02
g1253,abc,def,2017,11,06,02
d1e23,abc,def,2017,11,06,02
a1d23,abc,def,2017,11,06,02
12jj3,abc,def,2017,11,06,03
t1r23,abc,def,2017,11,06,03
2123r,abc,def,2017,11,06,03
22123,abc,def,2017,11,06,03
14d23,abc,def,2017,11,06,04
1da23,abc,def,2017,11,06,04
12fy3,abc,def,2017,11,06,04
12453,abc,def,2017,11,06,04`)
https://play.golang.org/p/oBTPosy0yT
玩得开心!