Python队列:如何延迟/延长/修改阻塞get的超时时间?
Python Queue: how to delay / prolong / modify timeout for blocking get?
我有一个由线程填充的 queue.Queue
。一个方法尝试从这个队列接收超时。现在让我们说,另一个线程可以重置我们队列等待的超时,如果队列没有及时送达,我们的接收函数应该继续,并更新超时。
我可以如下实现,但是我必须修改内置 queue.Queue
class,以便 get()
方法中的 endtime
参数可以在等待期间修改...
有没有更好的解决方案? (我不想使用 asyncio...)
from threading import Thread
from queue import Queue, Empty
import time
q = Queue()
TIMEOUT = 1
RESET_TIME = 0.5
PUT_TIME = 1.2
t0 = time.time()
def receive():
try:
_res = q.get(block=True, timeout=TIMEOUT)
print(f'get @ {time.time()-t0}')
return _res
except Empty:
print(f'to @ {time.time()-t0}')
return None
def feed_queue():
time.sleep(PUT_TIME)
print(f'put @ {time.time()-t0}')
q.put_nowait(42)
def reset_timeout():
time.sleep(RESET_TIME)
with q.mutex:
q.endtime += TIMEOUT
print(f'reset @ {time.time()-t0}')
if __name__ == '__main__':
Thread(target=feed_queue).start()
Thread(target=reset_timeout).start()
res = receive()
print('res:', res)
这产生:
reset @ 0.5013222694396973
put @ 1.201164722442627
get @ 1.201164722442627
res: 42
已在 queue.py 中进行以下修改以使其生效:
Index: queue.py
===================================================================
--- queue.py (revision 28725)
+++ queue.py (working copy)
@@ -52,6 +52,7 @@
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
+ self.endtime = 0
def task_done(self):
'''Indicate that a formerly enqueued task is complete.
@@ -171,9 +172,9 @@
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
- endtime = time() + timeout
+ self.endtime = time() + timeout
while not self._qsize():
- remaining = endtime - time()
+ remaining = self.endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
您可以创建自己的 class,继承 Queue
并添加全局变量 endtime
,如下所示:
class Waszil(Queue):
def __init__(self, maxsize=0):
super().__init__(self)
self.maxsize = maxsize
self._init(maxsize)
self.endtime = 0
然后只需将 q = Queue()
更改为 q = Waszil()
就可以了。
编辑:
如果您更喜欢在 Waszil class 中实现固有的线程安全,您可以像这样使用 threading.Lock
:
from threading import Lock
class Waszil(Queue):
def __init__(self, maxsize=0):
super().__init__(self)
self.threadLock = Lock()
self.maxsize = maxsize
self._init(maxsize)
self.endtime = 0
def increment_endtime(self):
with self.threadLock:
self.endtime += 1
在这种情况下,而不是你的
with q.mutex:
q.endtime += TIMEOUT
你只需调用 q.increment_endtime()
我有一个由线程填充的 queue.Queue
。一个方法尝试从这个队列接收超时。现在让我们说,另一个线程可以重置我们队列等待的超时,如果队列没有及时送达,我们的接收函数应该继续,并更新超时。
我可以如下实现,但是我必须修改内置 queue.Queue
class,以便 get()
方法中的 endtime
参数可以在等待期间修改...
有没有更好的解决方案? (我不想使用 asyncio...)
from threading import Thread
from queue import Queue, Empty
import time
q = Queue()
TIMEOUT = 1
RESET_TIME = 0.5
PUT_TIME = 1.2
t0 = time.time()
def receive():
try:
_res = q.get(block=True, timeout=TIMEOUT)
print(f'get @ {time.time()-t0}')
return _res
except Empty:
print(f'to @ {time.time()-t0}')
return None
def feed_queue():
time.sleep(PUT_TIME)
print(f'put @ {time.time()-t0}')
q.put_nowait(42)
def reset_timeout():
time.sleep(RESET_TIME)
with q.mutex:
q.endtime += TIMEOUT
print(f'reset @ {time.time()-t0}')
if __name__ == '__main__':
Thread(target=feed_queue).start()
Thread(target=reset_timeout).start()
res = receive()
print('res:', res)
这产生:
reset @ 0.5013222694396973
put @ 1.201164722442627
get @ 1.201164722442627
res: 42
已在 queue.py 中进行以下修改以使其生效:
Index: queue.py
===================================================================
--- queue.py (revision 28725)
+++ queue.py (working copy)
@@ -52,6 +52,7 @@
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
+ self.endtime = 0
def task_done(self):
'''Indicate that a formerly enqueued task is complete.
@@ -171,9 +172,9 @@
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
- endtime = time() + timeout
+ self.endtime = time() + timeout
while not self._qsize():
- remaining = endtime - time()
+ remaining = self.endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
您可以创建自己的 class,继承 Queue
并添加全局变量 endtime
,如下所示:
class Waszil(Queue):
def __init__(self, maxsize=0):
super().__init__(self)
self.maxsize = maxsize
self._init(maxsize)
self.endtime = 0
然后只需将 q = Queue()
更改为 q = Waszil()
就可以了。
编辑:
如果您更喜欢在 Waszil class 中实现固有的线程安全,您可以像这样使用 threading.Lock
:
from threading import Lock
class Waszil(Queue):
def __init__(self, maxsize=0):
super().__init__(self)
self.threadLock = Lock()
self.maxsize = maxsize
self._init(maxsize)
self.endtime = 0
def increment_endtime(self):
with self.threadLock:
self.endtime += 1
在这种情况下,而不是你的
with q.mutex:
q.endtime += TIMEOUT
你只需调用 q.increment_endtime()