在顺序执行之前等待通道中的 N 个项目
Wait for N items in channel before executing sequentially
所以我很新去!但是我对我想尝试的事情有了这个想法。
我想要一个从通道接受字符串的 go 例程,但只有在它收到 N 个字符串后才应该对它们执行。
我四处寻找类似的问题或案例,但我只发现了一些想法是并行执行多个例程并等待汇总结果。
我考虑过创建数组并将其传递给长度足够的例程的想法。但是我想保持一定的关注点分离并在接收端控制它。
我的问题是。
- 出于某种原因,这是一种不好的做法吗?
有没有更好的方法,是什么?
func main() {
ch := make(chan string)
go func() {
tasks := []string{}
for {
tasks = append(tasks,<- ch)
if len(tasks) < 3 {
fmt.Println("Queue still to small")
}
if len(tasks) > 3 {
for i := 0; i < len(tasks); i++ {
fmt.Println(tasks[i])
}
}
}
}()
ch <- "Msg 1"
time.Sleep(time.Second)
ch <- "Msg 2"
time.Sleep(time.Second)
ch <- "Msg 3"
time.Sleep(time.Second)
ch <- "Msg 4"
time.Sleep(time.Second)
}
编辑更简单更准确的示例。
根据一些评论,您正在寻找的似乎是某种形式的批处理。
批处理有几种情况,您可能希望将批处理一起发送:
- 批量大小足够
- 已经过了足够的时间,应冲洗部分批次
您给出的示例没有考虑第二种情况。如果您只是因为停止加载而从不冲洗,这可能会导致一些尴尬的行为。
因此,我建议要么查看一个库(例如,cloudfoundry/go-batching),要么简单地使用频道、一个计时器和一个 select 语句。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
tasks := []string{}
timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience
for {
select {
case <-timer.C:
fmt.Println("Flush partial batch due to time")
flush(tasks)
tasks = nil
timer.Reset(time.Second)
case data := <-ch:
tasks = append(tasks, data)
// Reset the timer for each data point so that we only flush
// partial batches when we stop receiving data.
if !timer.Stop() {
<-timer.C
}
timer.Reset(time.Second)
// Guard clause to for batch size
if len(tasks) < 3 {
fmt.Println("Queue still too small")
continue
}
flush(tasks)
tasks = nil // reset tasks
}
}
}()
ch <- "Msg 1"
time.Sleep(time.Second)
ch <- "Msg 2"
time.Sleep(time.Second)
ch <- "Msg 3"
time.Sleep(time.Second)
ch <- "Msg 4"
time.Sleep(time.Second)
}
func flush(tasks []string) {
// Guard against emtpy flushes
if len(tasks) == 0 {
return
}
fmt.Println("Flush")
for _, t := range tasks {
fmt.Println(t)
}
}
我明白了批处理结果的用处。但它确实需要定制解决方案。有很多方法可以解决这个问题——我尝试使用 Sync.WaitGroup
但它变得很乱。似乎使用 sync.Mutex
来锁定批处理功能是最好的方法。但是,当 mutex 是最好的答案时,imo 应该触发对设计的重新检查,因为 imo,它应该是最后一个选项。
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
)
func main() {
ctx, canc := context.WithCancel(context.Background())
acc := NewAccumulator(4, ctx)
go func() {
for i := 0; i < 10; i++ {
acc.Write("hi")
}
canc()
}()
read := acc.ReadChan()
for batch := range read {
fmt.Println(batch)
}
fmt.Println("done")
}
type Accumulator struct {
count int64
size int
in chan string
out chan []string
ctx context.Context
doneFlag int64
mu sync.Mutex
}
func NewAccumulator(size int, parentCtx context.Context) *Accumulator {
a := &Accumulator{
size: size,
in: make(chan string, size),
out: make(chan []string, 1),
ctx: parentCtx,
}
go func() {
<-a.ctx.Done()
atomic.AddInt64(&a.doneFlag, 1)
close(a.in)
a.mu.Lock()
a.batch()
a.mu.Unlock()
close(a.out)
}()
return a
}
func (a *Accumulator) Write(s string) {
if atomic.LoadInt64(&a.doneFlag) > 0 {
panic("write to closed accumulator")
}
a.in <- s
atomic.AddInt64(&a.count, 1)
a.mu.Lock()
if atomic.LoadInt64(&a.count) == int64(a.size) {
a.batch()
}
a.mu.Unlock()
}
func (a *Accumulator) batch() {
batch := make([]string, 0)
for i := 0; i < a.size; i++ {
msg := <-a.in
if msg != "" {
batch = append(batch, msg)
}
}
fmt.Println("batching", batch)
a.out <- batch
atomic.StoreInt64(&a.count, 0)
}
func (a *Accumulator) ReadChan() <-chan []string {
return a.out
}
最好只有一个切片来累积字符串,当该切片达到一定大小时,然后开始一些处理。
所以我很新去!但是我对我想尝试的事情有了这个想法。
我想要一个从通道接受字符串的 go 例程,但只有在它收到 N 个字符串后才应该对它们执行。
我四处寻找类似的问题或案例,但我只发现了一些想法是并行执行多个例程并等待汇总结果。
我考虑过创建数组并将其传递给长度足够的例程的想法。但是我想保持一定的关注点分离并在接收端控制它。
我的问题是。
- 出于某种原因,这是一种不好的做法吗?
有没有更好的方法,是什么?
func main() { ch := make(chan string) go func() { tasks := []string{} for { tasks = append(tasks,<- ch) if len(tasks) < 3 { fmt.Println("Queue still to small") } if len(tasks) > 3 { for i := 0; i < len(tasks); i++ { fmt.Println(tasks[i]) } } } }() ch <- "Msg 1" time.Sleep(time.Second) ch <- "Msg 2" time.Sleep(time.Second) ch <- "Msg 3" time.Sleep(time.Second) ch <- "Msg 4" time.Sleep(time.Second) }
编辑更简单更准确的示例。
根据一些评论,您正在寻找的似乎是某种形式的批处理。
批处理有几种情况,您可能希望将批处理一起发送:
- 批量大小足够
- 已经过了足够的时间,应冲洗部分批次
您给出的示例没有考虑第二种情况。如果您只是因为停止加载而从不冲洗,这可能会导致一些尴尬的行为。
因此,我建议要么查看一个库(例如,cloudfoundry/go-batching),要么简单地使用频道、一个计时器和一个 select 语句。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
tasks := []string{}
timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience
for {
select {
case <-timer.C:
fmt.Println("Flush partial batch due to time")
flush(tasks)
tasks = nil
timer.Reset(time.Second)
case data := <-ch:
tasks = append(tasks, data)
// Reset the timer for each data point so that we only flush
// partial batches when we stop receiving data.
if !timer.Stop() {
<-timer.C
}
timer.Reset(time.Second)
// Guard clause to for batch size
if len(tasks) < 3 {
fmt.Println("Queue still too small")
continue
}
flush(tasks)
tasks = nil // reset tasks
}
}
}()
ch <- "Msg 1"
time.Sleep(time.Second)
ch <- "Msg 2"
time.Sleep(time.Second)
ch <- "Msg 3"
time.Sleep(time.Second)
ch <- "Msg 4"
time.Sleep(time.Second)
}
func flush(tasks []string) {
// Guard against emtpy flushes
if len(tasks) == 0 {
return
}
fmt.Println("Flush")
for _, t := range tasks {
fmt.Println(t)
}
}
我明白了批处理结果的用处。但它确实需要定制解决方案。有很多方法可以解决这个问题——我尝试使用 Sync.WaitGroup
但它变得很乱。似乎使用 sync.Mutex
来锁定批处理功能是最好的方法。但是,当 mutex 是最好的答案时,imo 应该触发对设计的重新检查,因为 imo,它应该是最后一个选项。
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
)
func main() {
ctx, canc := context.WithCancel(context.Background())
acc := NewAccumulator(4, ctx)
go func() {
for i := 0; i < 10; i++ {
acc.Write("hi")
}
canc()
}()
read := acc.ReadChan()
for batch := range read {
fmt.Println(batch)
}
fmt.Println("done")
}
type Accumulator struct {
count int64
size int
in chan string
out chan []string
ctx context.Context
doneFlag int64
mu sync.Mutex
}
func NewAccumulator(size int, parentCtx context.Context) *Accumulator {
a := &Accumulator{
size: size,
in: make(chan string, size),
out: make(chan []string, 1),
ctx: parentCtx,
}
go func() {
<-a.ctx.Done()
atomic.AddInt64(&a.doneFlag, 1)
close(a.in)
a.mu.Lock()
a.batch()
a.mu.Unlock()
close(a.out)
}()
return a
}
func (a *Accumulator) Write(s string) {
if atomic.LoadInt64(&a.doneFlag) > 0 {
panic("write to closed accumulator")
}
a.in <- s
atomic.AddInt64(&a.count, 1)
a.mu.Lock()
if atomic.LoadInt64(&a.count) == int64(a.size) {
a.batch()
}
a.mu.Unlock()
}
func (a *Accumulator) batch() {
batch := make([]string, 0)
for i := 0; i < a.size; i++ {
msg := <-a.in
if msg != "" {
batch = append(batch, msg)
}
}
fmt.Println("batching", batch)
a.out <- batch
atomic.StoreInt64(&a.count, 0)
}
func (a *Accumulator) ReadChan() <-chan []string {
return a.out
}
最好只有一个切片来累积字符串,当该切片达到一定大小时,然后开始一些处理。