异步 FIFO 调节器

Async FIFO throttler

我在使用节流器发送电报时遇到了一些问题 api。

问题基本上是,如果请求数超过我的节流器限制,当最小值过去时,消息会随机发送。

这是我正在使用的节流阀的代码(在某些 github 上找到)

class Throttler:
    def __init__(self, rate_limit, period=1.0, retry_interval=0.01):
        self.rate_limit = rate_limit
        self.period = period
        self.retry_interval = retry_interval

        self._task_logs = deque()

    def flush(self):
        now = time.time()
        while self._task_logs:
            if now - self._task_logs[0] > self.period:
                self._task_logs.popleft()
            else:
                break


    async def acquire(self):
        while True:
            self.flush()
            if len(self._task_logs) < self.rate_limit:
                break
            await asyncio.sleep(self.retry_interval)

        self._task_logs.append(time.time())

    async def __aenter__(self):
        await self.acquire()


    async def __aexit__(self, exc_type, exc, tb):
        pass
I can use this as following
throttler = Throttler(rate_limit=30, period=10)

async with throttler:
    await sendmessage(message)

发现解决这个问题的最佳方法是对节流阀使用不同的算法。

我在上面使用的节流器总是随机传递消息,因为在初始突发之后,消息会卡在队列中,当时间过去时,asyncio 会立即释放所有消息。

我发现最好的解决方法是使用所谓的 LeakyBucket 算法。我使用以下答案自己实现了 LeakyBucket