对通道使用 rate.NewLimiter 速率限制器
Using rate.NewLimiter rate limiter with channels
我正在使用速率限制器来限制路由的请求数量
请求被发送到一个频道,我想限制每秒处理的数量,但我很难理解我是否设置正确,我没有收到错误,但是 我不确定我是否在使用限速器
这是添加到频道的内容:
type processItem struct {
itemString string
}
这是通道和限制器:
itemChannel := make(chan processItem, 5)
itemThrottler := rate.NewLimiter(4, 1) //4 a second, no more per second (1)
var waitGroup sync.WaitGroup
项目已添加到频道:
case "newItem":
waitGroup.Add(1)
itemToExec := new(processItem)
itemToExec.itemString = "item string"
itemChannel <- *itemToExec
然后使用go例程来处理添加到频道的所有内容:
go func() {
defer waitGroup.Done()
err := itemThrottler.Wait(context.Background())
if err != nil {
fmt.Printf("Error with limiter: %s", err)
return
}
for item := range itemChannel {
execItem(item.itemString) // the processing function
}
defer func() { <-itemChannel }()
}()
waitGroup.Wait()
谁能确认发生了以下情况:
- execItem 函数在频道的每个成员上 运行 每秒 4 次
我不明白“err := itemThrottler.Wait(context.Background())”在代码中的作用,它是如何被调用的?
... i'm unsure if i'm even using the rate limiter
是的,您正在使用限速器。您正在对代码的 case "newItem":
分支进行速率限制。
I don't understand what "err := itemThrottler.Wait(context.Background())" is doing in the code
itemThrottler.Wait(..)
只会错开请求(4/秒,即每 0.25 秒)- 如果超过速率,它不会 拒绝 请求。那么这是什么意思?如果您在 1 秒内收到过多的 1000 个请求:
- 4个请求将立即处理;但是
- 996 个请求将产生 996 个将阻塞的 go-routines 的积压
996 将以 4/s 的速率解除阻塞,因此待处理 go-routines 的积压将不会再清除 4 分钟(如果有更多请求进入,可能会更长)。积压的 go-routines 可能是你想要的,也可能不是你想要的。如果不是,您可能需要使用 Limiter.Allow - and if false
is returned, then refuse the request (i.e. don't create a go-routine) and return a 429 错误(如果这是一个 HTTP 请求)。
最后,如果这是一个 HTTP 请求,您应该在调用 Wait
时使用它的嵌入式上下文,例如
func (a *app) myHandler(w http.ResponseWriter, r *http.Request) {
// ...
err := a.ratelimiter(r.Context())
if err != nil {
// client http request most likely canceled (i.e. caller disconnected)
}
}
我正在使用速率限制器来限制路由的请求数量
请求被发送到一个频道,我想限制每秒处理的数量,但我很难理解我是否设置正确,我没有收到错误,但是 我不确定我是否在使用限速器
这是添加到频道的内容:
type processItem struct {
itemString string
}
这是通道和限制器:
itemChannel := make(chan processItem, 5)
itemThrottler := rate.NewLimiter(4, 1) //4 a second, no more per second (1)
var waitGroup sync.WaitGroup
项目已添加到频道:
case "newItem":
waitGroup.Add(1)
itemToExec := new(processItem)
itemToExec.itemString = "item string"
itemChannel <- *itemToExec
然后使用go例程来处理添加到频道的所有内容:
go func() {
defer waitGroup.Done()
err := itemThrottler.Wait(context.Background())
if err != nil {
fmt.Printf("Error with limiter: %s", err)
return
}
for item := range itemChannel {
execItem(item.itemString) // the processing function
}
defer func() { <-itemChannel }()
}()
waitGroup.Wait()
谁能确认发生了以下情况:
- execItem 函数在频道的每个成员上 运行 每秒 4 次
我不明白“err := itemThrottler.Wait(context.Background())”在代码中的作用,它是如何被调用的?
... i'm unsure if i'm even using the rate limiter
是的,您正在使用限速器。您正在对代码的 case "newItem":
分支进行速率限制。
I don't understand what "err := itemThrottler.Wait(context.Background())" is doing in the code
itemThrottler.Wait(..)
只会错开请求(4/秒,即每 0.25 秒)- 如果超过速率,它不会 拒绝 请求。那么这是什么意思?如果您在 1 秒内收到过多的 1000 个请求:
- 4个请求将立即处理;但是
- 996 个请求将产生 996 个将阻塞的 go-routines 的积压
996 将以 4/s 的速率解除阻塞,因此待处理 go-routines 的积压将不会再清除 4 分钟(如果有更多请求进入,可能会更长)。积压的 go-routines 可能是你想要的,也可能不是你想要的。如果不是,您可能需要使用 Limiter.Allow - and if false
is returned, then refuse the request (i.e. don't create a go-routine) and return a 429 错误(如果这是一个 HTTP 请求)。
最后,如果这是一个 HTTP 请求,您应该在调用 Wait
时使用它的嵌入式上下文,例如
func (a *app) myHandler(w http.ResponseWriter, r *http.Request) {
// ...
err := a.ratelimiter(r.Context())
if err != nil {
// client http request most likely canceled (i.e. caller disconnected)
}
}