threading.Timer 的替代方案?
Alternative of threading.Timer?
我有一个生产者-消费者模式队列,它消耗传入事件并安排符合条件的事件在 5 秒内发送出去。我正在使用 threading.Timer()
python document 进行操作,一切正常。
最近,我被要求将预定时间从 5 秒更改为 30 分钟,threading.Timer()
使我的脚本崩溃,因为之前线程对象的创建和释放很快(仅最后 5 秒)但是现在它必须存活 30 分钟。
代码如下:
if scheduled_time and out_event:
threading.Timer(scheduled_time, self.send_out_event, (socket_connection, received_event, out_event,)).start() # schedule event send out
有人可以解释一下吗?我该如何解决这个问题,或者 threading.Timer()
有什么替代方案吗?
感谢@dano 对第 3 方模块的评论!基于我的工作需要,我没有在服务器上安装它们。
我没有使用 threading.Timer()
,而是选择使用基于 Redis 的延迟队列,我在网上找到了一些有用的资源:A unique Python redis-based queue with delay。它解决了我的问题。
简而言之,作者在redis中创建了一个排序集并为其命名,add()
会将新数据追加到排序集中。每次根据epoc-time score从sorted set中弹出最多一个个元素时,pop out符合条件的最小score的元素(不从中移除redis)
def add(self, received_event, delay_queue_name="delay_queue", delay=config.SECOND_RETRY_DELAY):
try:
score = int(time.time()) + delay
self.__client.zadd(delay_queue_name, score, received_event)
self.__logger.debug("added {0} to delay queue, delay time:{1}".format(received_event, delay))
except Exception as e:
self.__logger.error("error: {0}".format(e))
def pop(self, delay_queue_name="delay_queue"):
min_score, max_score, element = 0, int(time.time()), None
try:
result = self.__client.zrangebyscore(delay_queue_name, min_score, max_score, start=0, num=1, withscores=False)
except Exception as e:
self.__logger.error("failed query from redis:{0}".format(e))
return None
if result and len(result) == 1:
element = result[0]
self.__logger.debug("poped {0} from delay queue".format(element))
else:
self.__logger.debug("no qualified element")
return element
def remove(self, element, delay_queue_name="delay_queue"):
self.__client.zrem(delay_queue_name, element)
self.__client
是 Redis 客户端实例,redis.StrictRedis(host=rhost,port=rport, db=rindex)
。
网上源码和我的区别是我切换了zadd()
参数。 score
和 data
的顺序调换了。下面是zadd()
的文档
这是 python redis 文档:
# SORTED SET COMMANDS
def zadd(self, name, *args, **kwargs):
"""
Set any number of score, element-name pairs to the key ``name``. Pairs
can be specified in two ways:
As *args, in the form of: score1, name1, score2, name2, ...
or as **kwargs, in the form of: name1=score1, name2=score2, ...
The following example would add four values to the 'my-key' key:
redis.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4)
"""
pieces = []
if args:
if len(args) % 2 != 0:
raise RedisError("ZADD requires an equal number of "
"values and scores")
pieces.extend(args)
for pair in iteritems(kwargs):
pieces.append(pair[1])
pieces.append(pair[0])
return self.execute_command('ZADD', name, *pieces)
我有一个生产者-消费者模式队列,它消耗传入事件并安排符合条件的事件在 5 秒内发送出去。我正在使用 threading.Timer()
python document 进行操作,一切正常。
最近,我被要求将预定时间从 5 秒更改为 30 分钟,threading.Timer()
使我的脚本崩溃,因为之前线程对象的创建和释放很快(仅最后 5 秒)但是现在它必须存活 30 分钟。
代码如下:
if scheduled_time and out_event:
threading.Timer(scheduled_time, self.send_out_event, (socket_connection, received_event, out_event,)).start() # schedule event send out
有人可以解释一下吗?我该如何解决这个问题,或者 threading.Timer()
有什么替代方案吗?
感谢@dano 对第 3 方模块的评论!基于我的工作需要,我没有在服务器上安装它们。
我没有使用 threading.Timer()
,而是选择使用基于 Redis 的延迟队列,我在网上找到了一些有用的资源:A unique Python redis-based queue with delay。它解决了我的问题。
简而言之,作者在redis中创建了一个排序集并为其命名,add()
会将新数据追加到排序集中。每次根据epoc-time score从sorted set中弹出最多一个个元素时,pop out符合条件的最小score的元素(不从中移除redis)
def add(self, received_event, delay_queue_name="delay_queue", delay=config.SECOND_RETRY_DELAY):
try:
score = int(time.time()) + delay
self.__client.zadd(delay_queue_name, score, received_event)
self.__logger.debug("added {0} to delay queue, delay time:{1}".format(received_event, delay))
except Exception as e:
self.__logger.error("error: {0}".format(e))
def pop(self, delay_queue_name="delay_queue"):
min_score, max_score, element = 0, int(time.time()), None
try:
result = self.__client.zrangebyscore(delay_queue_name, min_score, max_score, start=0, num=1, withscores=False)
except Exception as e:
self.__logger.error("failed query from redis:{0}".format(e))
return None
if result and len(result) == 1:
element = result[0]
self.__logger.debug("poped {0} from delay queue".format(element))
else:
self.__logger.debug("no qualified element")
return element
def remove(self, element, delay_queue_name="delay_queue"):
self.__client.zrem(delay_queue_name, element)
self.__client
是 Redis 客户端实例,redis.StrictRedis(host=rhost,port=rport, db=rindex)
。
网上源码和我的区别是我切换了zadd()
参数。 score
和 data
的顺序调换了。下面是zadd()
这是 python redis 文档:
# SORTED SET COMMANDS
def zadd(self, name, *args, **kwargs):
"""
Set any number of score, element-name pairs to the key ``name``. Pairs
can be specified in two ways:
As *args, in the form of: score1, name1, score2, name2, ...
or as **kwargs, in the form of: name1=score1, name2=score2, ...
The following example would add four values to the 'my-key' key:
redis.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4)
"""
pieces = []
if args:
if len(args) % 2 != 0:
raise RedisError("ZADD requires an equal number of "
"values and scores")
pieces.extend(args)
for pair in iteritems(kwargs):
pieces.append(pair[1])
pieces.append(pair[0])
return self.execute_command('ZADD', name, *pieces)