异步 FIFO 调节器
Async FIFO throttler
我在使用节流器发送电报时遇到了一些问题 api。
问题基本上是,如果请求数超过我的节流器限制,当最小值过去时,消息会随机发送。
这是我正在使用的节流阀的代码(在某些 github 上找到)
class Throttler:
def __init__(self, rate_limit, period=1.0, retry_interval=0.01):
self.rate_limit = rate_limit
self.period = period
self.retry_interval = retry_interval
self._task_logs = deque()
def flush(self):
now = time.time()
while self._task_logs:
if now - self._task_logs[0] > self.period:
self._task_logs.popleft()
else:
break
async def acquire(self):
while True:
self.flush()
if len(self._task_logs) < self.rate_limit:
break
await asyncio.sleep(self.retry_interval)
self._task_logs.append(time.time())
async def __aenter__(self):
await self.acquire()
async def __aexit__(self, exc_type, exc, tb):
pass
I can use this as following
throttler = Throttler(rate_limit=30, period=10)
async with throttler:
await sendmessage(message)
发现解决这个问题的最佳方法是对节流阀使用不同的算法。
我在上面使用的节流器总是随机传递消息,因为在初始突发之后,消息会卡在队列中,当时间过去时,asyncio 会立即释放所有消息。
我发现最好的解决方法是使用所谓的 LeakyBucket 算法。我使用以下答案自己实现了 LeakyBucket
我在使用节流器发送电报时遇到了一些问题 api。
问题基本上是,如果请求数超过我的节流器限制,当最小值过去时,消息会随机发送。
这是我正在使用的节流阀的代码(在某些 github 上找到)
class Throttler:
def __init__(self, rate_limit, period=1.0, retry_interval=0.01):
self.rate_limit = rate_limit
self.period = period
self.retry_interval = retry_interval
self._task_logs = deque()
def flush(self):
now = time.time()
while self._task_logs:
if now - self._task_logs[0] > self.period:
self._task_logs.popleft()
else:
break
async def acquire(self):
while True:
self.flush()
if len(self._task_logs) < self.rate_limit:
break
await asyncio.sleep(self.retry_interval)
self._task_logs.append(time.time())
async def __aenter__(self):
await self.acquire()
async def __aexit__(self, exc_type, exc, tb):
pass
I can use this as following
throttler = Throttler(rate_limit=30, period=10)
async with throttler:
await sendmessage(message)
发现解决这个问题的最佳方法是对节流阀使用不同的算法。
我在上面使用的节流器总是随机传递消息,因为在初始突发之后,消息会卡在队列中,当时间过去时,asyncio 会立即释放所有消息。
我发现最好的解决方法是使用所谓的 LeakyBucket 算法。我使用以下答案自己实现了 LeakyBucket