通过缓冲通道 (Golang) 限制并发执行进程的数量
Throttle number of concurrent executing processes via buffered channels (Golang)
意图:
我正在寻找一种 运行 os 级 shell 并行命令的方法,但要注意不要破坏 CPU,我想知道是否缓冲通道适合这种用例。
已实施:
创建一系列具有模拟 运行 持续时间的 Job
。将这些作业发送到一个队列,该队列将 dispatch
通过缓冲通道将它们 run
发送到 EXEC_THROTTLE
。
观察:
这个 'works'(在编译和 运行s 的范围内),但我想知道缓冲区是否按规定工作(参见:'Intent')以限制并行进程数 运行ning。
免责声明:
现在,我知道新手往往会过度使用频道,但我觉得这种洞察力的请求是诚实的,因为我至少已经克制地使用了 sync.WaitGroup
。原谅有点玩具的例子,但所有的见解将不胜感激。
package main
import (
// "os/exec"
"log"
"math/rand"
"strconv"
"sync"
"time"
)
const (
EXEC_THROTTLE = 2
)
type JobsManifest []Job
type Job struct {
cmd string
result string
runtime int // Simulate long-running task
}
func (j JobsManifest) queueJobs(logChan chan<- string, runChan chan Job, wg *sync.WaitGroup) {
go dispatch(logChan, runChan)
for _, job := range j {
wg.Add(1)
runChan <- job
}
}
func dispatch(logChan chan<- string, runChan chan Job) {
for j := range runChan {
go run(j, logChan)
}
}
func run(j Job, logChan chan<- string) {
time.Sleep(time.Second * time.Duration(j.runtime))
j.result = strconv.Itoa(rand.Intn(10)) // j.result = os.Exec("/bin/bash", "-c", j.cmd).Output()
logChan <- j.result
log.Printf(" ran: %s\n", j.cmd)
}
func logger(logChan <-chan string, wg *sync.WaitGroup) {
for {
res := <-logChan
log.Printf("logged: %s\n", res)
wg.Done()
}
}
func main() {
jobs := []Job{
Job{
cmd: "ps -p $(pgrep vim) | tail -n 1 | awk '{print }'",
runtime: 1,
},
Job{
cmd: "wc -l /var/log/foo.log | awk '{print }'",
runtime: 2,
},
Job{
cmd: "ls -l ~/go/src/github.com/ | wc -l | awk '{print }'",
runtime: 3,
},
Job{
cmd: "find /var/log/ -regextype posix-extended -regex '.*[0-9]{10}'",
runtime: 4,
},
}
var wg sync.WaitGroup
logChan := make(chan string)
runChan := make(chan Job, EXEC_THROTTLE)
go logger(logChan, &wg)
start := time.Now()
JobsManifest(jobs).queueJobs(logChan, runChan, &wg)
wg.Wait()
log.Printf("finish: %s\n", time.Since(start))
}
如果我没理解错的话,你的意思是建立一个机制来确保在任何时候最多有 EXEC_THROTTLE
个作业是 运行。如果那是您的意图,则代码不起作用。
这是因为当你开始一个工作时,你已经消耗了通道 - 允许开始另一个工作,但没有工作完成。您可以通过添加一个计数器来调试它(您需要原子添加或互斥锁)。
您可以通过简单地启动一组带有无缓冲通道的 goroutine 并在执行作业时阻塞来完成这项工作:
func Run(j Job) r Result {
//Run your job here
}
func Dispatch(ch chan Job) {
for j:=range ch {
wg.Add(1)
Run(j)
wg.Done()
}
}
func main() {
ch := make(chan Job)
for i:=0; i<EXEC_THROTTLE; i++ {
go Dispatch(ch)
}
//call dispatch according to the queue here.
}
之所以有效,是因为只要有一个 goroutine 在使用通道,就意味着至少有一个 goroutine 不是 运行 并且最多有 EXEC_THROTTLE-1
个作业 运行 所以它是很高兴再执行一次,它确实如此。
我经常用这个。 https://github.com/dustinevan/go-utils
package async
import (
"context"
"github.com/pkg/errors"
)
type Semaphore struct {
buf chan struct{}
ctx context.Context
cancel context.CancelFunc
}
func NewSemaphore(max int, parentCtx context.Context) *Semaphore {
s := &Semaphore{
buf: make(chan struct{}, max),
ctx: parentCtx,
}
go func() {
<-s.ctx.Done()
close(s.buf)
drainStruct(s.buf)
}()
return s
}
var CLOSED = errors.New("the semaphore has been closed")
func (s *Semaphore) Acquire() error {
select {
case <-s.ctx.Done():
return CLOSED
case s.buf <- struct{}{}:
return nil
}
}
func (s *Semaphore) Release() {
<-s.buf
}
你会像这样使用它:
func main() {
sem := async.NewSemaphore(10, context.Background())
...
var wg sync.Waitgroup
for _, job := range jobs {
go func() {
wg.Add(1)
err := sem.Acquire()
if err != nil {
// handle err,
}
defer sem.Release()
defer wg.Done()
job()
}
wg.Wait()
}
您还可以使用缓冲通道来限制并发数:
concurrencyLimit := 2 // Number of simultaneous jobs.
semaphore := make(chan struct{}, concurrencyLimit)
for job := range jobs {
job := job // Pin loop variable.
semaphore <- struct{}{} // Acquire semaphore slot.
go func() {
defer func() {
<-semaphore // Release semaphore slot.
}()
do(job) // Do the job.
}()
}
// Wait for goroutines to finish by acquiring all slots.
for i := 0; i < cap(semaphore); i++ {
semaphore <- struct{}{}
}
将 processItem 函数替换为需要执行的作业。
以下将按正确顺序执行作业。最多 EXEC_CONCURRENT 项将同时执行。
package main
import (
"fmt"
"sync"
"time"
)
func processItem(i int, done chan int, wg *sync.WaitGroup) {
fmt.Printf("Async Start: %d\n", i)
time.Sleep(100 * time.Millisecond * time.Duration(i))
fmt.Printf("Async Complete: %d\n", i)
done <- 1
wg.Done()
}
func popItemFromBufferChannelWhenItemDoneExecuting(items chan int, done chan int) {
_ = <- done
_ = <-items
}
func main() {
EXEC_CONCURRENT := 3
items := make(chan int, EXEC_CONCURRENT)
done := make(chan int)
var wg sync.WaitGroup
for i:= 1; i < 11; i++ {
items <- i
wg.Add(1)
go processItem(i, done, &wg)
go popItemFromBufferChannelWhenItemDoneExecuting(items, done)
}
wg.Wait()
}
下面将以随机顺序执行作业。最多 EXEC_CONCURRENT 项将同时执行。
package main
import (
"fmt"
"sync"
"time"
)
func processItem(i int, items chan int, wg *sync.WaitGroup) {
items <- i
fmt.Printf("Async Start: %d\n", i)
time.Sleep(100 * time.Millisecond * time.Duration(i))
fmt.Printf("Async Complete: %d\n", i)
_ = <- items
wg.Done()
}
func main() {
EXEC_CONCURRENT := 3
items := make(chan int, EXEC_CONCURRENT)
var wg sync.WaitGroup
for i:= 1; i < 11; i++ {
wg.Add(1)
go processItem(i, items, &wg)
}
wg.Wait()
}
您可以根据自己的需求选择。
意图:
我正在寻找一种 运行 os 级 shell 并行命令的方法,但要注意不要破坏 CPU,我想知道是否缓冲通道适合这种用例。
已实施:
创建一系列具有模拟 运行 持续时间的 Job
。将这些作业发送到一个队列,该队列将 dispatch
通过缓冲通道将它们 run
发送到 EXEC_THROTTLE
。
观察:
这个 'works'(在编译和 运行s 的范围内),但我想知道缓冲区是否按规定工作(参见:'Intent')以限制并行进程数 运行ning。
免责声明:
现在,我知道新手往往会过度使用频道,但我觉得这种洞察力的请求是诚实的,因为我至少已经克制地使用了 sync.WaitGroup
。原谅有点玩具的例子,但所有的见解将不胜感激。
package main
import (
// "os/exec"
"log"
"math/rand"
"strconv"
"sync"
"time"
)
const (
EXEC_THROTTLE = 2
)
type JobsManifest []Job
type Job struct {
cmd string
result string
runtime int // Simulate long-running task
}
func (j JobsManifest) queueJobs(logChan chan<- string, runChan chan Job, wg *sync.WaitGroup) {
go dispatch(logChan, runChan)
for _, job := range j {
wg.Add(1)
runChan <- job
}
}
func dispatch(logChan chan<- string, runChan chan Job) {
for j := range runChan {
go run(j, logChan)
}
}
func run(j Job, logChan chan<- string) {
time.Sleep(time.Second * time.Duration(j.runtime))
j.result = strconv.Itoa(rand.Intn(10)) // j.result = os.Exec("/bin/bash", "-c", j.cmd).Output()
logChan <- j.result
log.Printf(" ran: %s\n", j.cmd)
}
func logger(logChan <-chan string, wg *sync.WaitGroup) {
for {
res := <-logChan
log.Printf("logged: %s\n", res)
wg.Done()
}
}
func main() {
jobs := []Job{
Job{
cmd: "ps -p $(pgrep vim) | tail -n 1 | awk '{print }'",
runtime: 1,
},
Job{
cmd: "wc -l /var/log/foo.log | awk '{print }'",
runtime: 2,
},
Job{
cmd: "ls -l ~/go/src/github.com/ | wc -l | awk '{print }'",
runtime: 3,
},
Job{
cmd: "find /var/log/ -regextype posix-extended -regex '.*[0-9]{10}'",
runtime: 4,
},
}
var wg sync.WaitGroup
logChan := make(chan string)
runChan := make(chan Job, EXEC_THROTTLE)
go logger(logChan, &wg)
start := time.Now()
JobsManifest(jobs).queueJobs(logChan, runChan, &wg)
wg.Wait()
log.Printf("finish: %s\n", time.Since(start))
}
如果我没理解错的话,你的意思是建立一个机制来确保在任何时候最多有 EXEC_THROTTLE
个作业是 运行。如果那是您的意图,则代码不起作用。
这是因为当你开始一个工作时,你已经消耗了通道 - 允许开始另一个工作,但没有工作完成。您可以通过添加一个计数器来调试它(您需要原子添加或互斥锁)。
您可以通过简单地启动一组带有无缓冲通道的 goroutine 并在执行作业时阻塞来完成这项工作:
func Run(j Job) r Result {
//Run your job here
}
func Dispatch(ch chan Job) {
for j:=range ch {
wg.Add(1)
Run(j)
wg.Done()
}
}
func main() {
ch := make(chan Job)
for i:=0; i<EXEC_THROTTLE; i++ {
go Dispatch(ch)
}
//call dispatch according to the queue here.
}
之所以有效,是因为只要有一个 goroutine 在使用通道,就意味着至少有一个 goroutine 不是 运行 并且最多有 EXEC_THROTTLE-1
个作业 运行 所以它是很高兴再执行一次,它确实如此。
我经常用这个。 https://github.com/dustinevan/go-utils
package async
import (
"context"
"github.com/pkg/errors"
)
type Semaphore struct {
buf chan struct{}
ctx context.Context
cancel context.CancelFunc
}
func NewSemaphore(max int, parentCtx context.Context) *Semaphore {
s := &Semaphore{
buf: make(chan struct{}, max),
ctx: parentCtx,
}
go func() {
<-s.ctx.Done()
close(s.buf)
drainStruct(s.buf)
}()
return s
}
var CLOSED = errors.New("the semaphore has been closed")
func (s *Semaphore) Acquire() error {
select {
case <-s.ctx.Done():
return CLOSED
case s.buf <- struct{}{}:
return nil
}
}
func (s *Semaphore) Release() {
<-s.buf
}
你会像这样使用它:
func main() {
sem := async.NewSemaphore(10, context.Background())
...
var wg sync.Waitgroup
for _, job := range jobs {
go func() {
wg.Add(1)
err := sem.Acquire()
if err != nil {
// handle err,
}
defer sem.Release()
defer wg.Done()
job()
}
wg.Wait()
}
您还可以使用缓冲通道来限制并发数:
concurrencyLimit := 2 // Number of simultaneous jobs.
semaphore := make(chan struct{}, concurrencyLimit)
for job := range jobs {
job := job // Pin loop variable.
semaphore <- struct{}{} // Acquire semaphore slot.
go func() {
defer func() {
<-semaphore // Release semaphore slot.
}()
do(job) // Do the job.
}()
}
// Wait for goroutines to finish by acquiring all slots.
for i := 0; i < cap(semaphore); i++ {
semaphore <- struct{}{}
}
将 processItem 函数替换为需要执行的作业。
以下将按正确顺序执行作业。最多 EXEC_CONCURRENT 项将同时执行。
package main
import (
"fmt"
"sync"
"time"
)
func processItem(i int, done chan int, wg *sync.WaitGroup) {
fmt.Printf("Async Start: %d\n", i)
time.Sleep(100 * time.Millisecond * time.Duration(i))
fmt.Printf("Async Complete: %d\n", i)
done <- 1
wg.Done()
}
func popItemFromBufferChannelWhenItemDoneExecuting(items chan int, done chan int) {
_ = <- done
_ = <-items
}
func main() {
EXEC_CONCURRENT := 3
items := make(chan int, EXEC_CONCURRENT)
done := make(chan int)
var wg sync.WaitGroup
for i:= 1; i < 11; i++ {
items <- i
wg.Add(1)
go processItem(i, done, &wg)
go popItemFromBufferChannelWhenItemDoneExecuting(items, done)
}
wg.Wait()
}
下面将以随机顺序执行作业。最多 EXEC_CONCURRENT 项将同时执行。
package main
import (
"fmt"
"sync"
"time"
)
func processItem(i int, items chan int, wg *sync.WaitGroup) {
items <- i
fmt.Printf("Async Start: %d\n", i)
time.Sleep(100 * time.Millisecond * time.Duration(i))
fmt.Printf("Async Complete: %d\n", i)
_ = <- items
wg.Done()
}
func main() {
EXEC_CONCURRENT := 3
items := make(chan int, EXEC_CONCURRENT)
var wg sync.WaitGroup
for i:= 1; i < 11; i++ {
wg.Add(1)
go processItem(i, items, &wg)
}
wg.Wait()
}
您可以根据自己的需求选择。