限制来自通道的已处理消息数

Limit the number of processed messages from channel

我通过渠道收到大约 200 000 message/seconds 给我的工作人员,我需要将发送给客户端的消息数量限制为每秒 20 条。 这使得它每 50 毫秒 1 条消息

并且在 LOOP 的帮助下,worker 在整个程序生命周期内仍然存活(并且不会为每条消息打开一个通道)。

我的目标: - 由于消息的顺序很重要,我想跳过被阻塞的 50 毫秒内出现的所有消息,只保存最新的一条 - 如果最新消息在阻塞的 50 毫秒内出现,我希望在循环内的阻塞时间结束并且没有新消息到来时处理保存的消息! <-- 这是我的问题

我的攻略 - 继续发送最新的尚未处理的消息到同一个频道

但它的问题是,如果该消息是在(来自应用程序的)新消息之后发送的怎么办?

下面的代码更像是一个算法作为工作代码,只需要 tip/way 如何实现。

func example (new_message_from_channel <-chan *message) {
    default = message
    time = now_milliseconds
    diff_accepted = 50milli
    for this_message := range new_message_from_channel {
        if now_millisecond -  time >= diff_accepted {
            send_it_to_the_client
            time = now_milliseconds
        } else {
            //save the latest message
            default = this_message

            //My problem is how to process this latest message when the blocked 50ms is over and no new message coming ?!

            //My strategy - keep sending it to the same channel
            theChannel <- default
        }

    }
}

如果你有好的方法,欢迎分享给我:)

使用速率限制器,您可以创建一个 throttle 函数,它将:一个速率和一个通道作为输入;和 return 两个频道 - 一个包含所有原始频道项目,另一个仅以固定速率中继项目:

func throttle(r time.Duration, in <-chan event) (C, tC <-chan event) {

    // "writeable" channels
    var (
        wC  = make(chan event)
        wtC = make(chan event)
    )

    // read-only channels - returned to caller
    C = wC
    tC = wtC

    go func() {
        defer close(wC)
        defer close(wtC)

        rl := rate.NewLimiter(
            rate.Every(r),
            1,
        )

        // relays input channel's items to two channels:
        // (1) gets all writes from original channel
        // (2) only writes at a fixed frequency
        for ev := range in {
            wC <- ev
            if rl.Allow() {
                wtC <- ev
            }
        }
    }()
    return
}

工作示例:https://play.golang.org/p/upei0TiyzNr


编辑:

避免使用速率限制器,而是使用简单的 time.Ticker:

tick := time.NewTicker(r)

for ev := range in {
    select {
    case wC <- ev: // write to main
    case <-tick.C:
        wC <- ev  // write to main ...
        wtC <- ev // ... plus throttle channel
    }
}

工作示例:https://play.golang.org/p/UTRXh72BvRl