限制来自通道的已处理消息数
Limit the number of processed messages from channel
我通过渠道收到大约 200 000 message/seconds 给我的工作人员,我需要将发送给客户端的消息数量限制为每秒 20 条。
这使得它每 50 毫秒 1 条消息
并且在 LOOP 的帮助下,worker 在整个程序生命周期内仍然存活(并且不会为每条消息打开一个通道)。
我的目标:
- 由于消息的顺序很重要,我想跳过被阻塞的 50 毫秒内出现的所有消息,只保存最新的一条
- 如果最新消息在阻塞的 50 毫秒内出现,我希望在循环内的阻塞时间结束并且没有新消息到来时处理保存的消息! <-- 这是我的问题
我的攻略
- 继续发送最新的尚未处理的消息到同一个频道
但它的问题是,如果该消息是在(来自应用程序的)新消息之后发送的怎么办?
下面的代码更像是一个算法作为工作代码,只需要 tip/way 如何实现。
func example (new_message_from_channel <-chan *message) {
default = message
time = now_milliseconds
diff_accepted = 50milli
for this_message := range new_message_from_channel {
if now_millisecond - time >= diff_accepted {
send_it_to_the_client
time = now_milliseconds
} else {
//save the latest message
default = this_message
//My problem is how to process this latest message when the blocked 50ms is over and no new message coming ?!
//My strategy - keep sending it to the same channel
theChannel <- default
}
}
}
如果你有好的方法,欢迎分享给我:)
使用速率限制器,您可以创建一个 throttle
函数,它将:一个速率和一个通道作为输入;和 return 两个频道 - 一个包含所有原始频道项目,另一个仅以固定速率中继项目:
func throttle(r time.Duration, in <-chan event) (C, tC <-chan event) {
// "writeable" channels
var (
wC = make(chan event)
wtC = make(chan event)
)
// read-only channels - returned to caller
C = wC
tC = wtC
go func() {
defer close(wC)
defer close(wtC)
rl := rate.NewLimiter(
rate.Every(r),
1,
)
// relays input channel's items to two channels:
// (1) gets all writes from original channel
// (2) only writes at a fixed frequency
for ev := range in {
wC <- ev
if rl.Allow() {
wtC <- ev
}
}
}()
return
}
工作示例:https://play.golang.org/p/upei0TiyzNr
编辑:
避免使用速率限制器,而是使用简单的 time.Ticker
:
tick := time.NewTicker(r)
for ev := range in {
select {
case wC <- ev: // write to main
case <-tick.C:
wC <- ev // write to main ...
wtC <- ev // ... plus throttle channel
}
}
我通过渠道收到大约 200 000 message/seconds 给我的工作人员,我需要将发送给客户端的消息数量限制为每秒 20 条。 这使得它每 50 毫秒 1 条消息
并且在 LOOP 的帮助下,worker 在整个程序生命周期内仍然存活(并且不会为每条消息打开一个通道)。
我的目标: - 由于消息的顺序很重要,我想跳过被阻塞的 50 毫秒内出现的所有消息,只保存最新的一条 - 如果最新消息在阻塞的 50 毫秒内出现,我希望在循环内的阻塞时间结束并且没有新消息到来时处理保存的消息! <-- 这是我的问题
我的攻略 - 继续发送最新的尚未处理的消息到同一个频道
但它的问题是,如果该消息是在(来自应用程序的)新消息之后发送的怎么办?
下面的代码更像是一个算法作为工作代码,只需要 tip/way 如何实现。
func example (new_message_from_channel <-chan *message) {
default = message
time = now_milliseconds
diff_accepted = 50milli
for this_message := range new_message_from_channel {
if now_millisecond - time >= diff_accepted {
send_it_to_the_client
time = now_milliseconds
} else {
//save the latest message
default = this_message
//My problem is how to process this latest message when the blocked 50ms is over and no new message coming ?!
//My strategy - keep sending it to the same channel
theChannel <- default
}
}
}
如果你有好的方法,欢迎分享给我:)
使用速率限制器,您可以创建一个 throttle
函数,它将:一个速率和一个通道作为输入;和 return 两个频道 - 一个包含所有原始频道项目,另一个仅以固定速率中继项目:
func throttle(r time.Duration, in <-chan event) (C, tC <-chan event) {
// "writeable" channels
var (
wC = make(chan event)
wtC = make(chan event)
)
// read-only channels - returned to caller
C = wC
tC = wtC
go func() {
defer close(wC)
defer close(wtC)
rl := rate.NewLimiter(
rate.Every(r),
1,
)
// relays input channel's items to two channels:
// (1) gets all writes from original channel
// (2) only writes at a fixed frequency
for ev := range in {
wC <- ev
if rl.Allow() {
wtC <- ev
}
}
}()
return
}
工作示例:https://play.golang.org/p/upei0TiyzNr
编辑:
避免使用速率限制器,而是使用简单的 time.Ticker
:
tick := time.NewTicker(r)
for ev := range in {
select {
case wC <- ev: // write to main
case <-tick.C:
wC <- ev // write to main ...
wtC <- ev // ... plus throttle channel
}
}