Python队列:如何延迟/延长/修改阻塞get的超时时间?

Python Queue: how to delay / prolong / modify timeout for blocking get?

我有一个由线程填充的 queue.Queue。一个方法尝试从这个队列接收超时。现在让我们说,另一个线程可以重置我们队列等待的超时,如果队列没有及时送达,我们的接收函数应该继续,并更新超时。 我可以如下实现,但是我必须修改内置 queue.Queue class,以便 get() 方法中的 endtime 参数可以在等待期间修改... 有没有更好的解决方案? (我不想使用 asyncio...)

from threading import Thread
from queue import Queue, Empty
import time

q = Queue()
TIMEOUT = 1
RESET_TIME = 0.5
PUT_TIME = 1.2
t0 = time.time()

def receive():
    try:
        _res = q.get(block=True, timeout=TIMEOUT)
        print(f'get @ {time.time()-t0}')
        return _res
    except Empty:
        print(f'to @ {time.time()-t0}')
        return None

def feed_queue():
    time.sleep(PUT_TIME)
    print(f'put @ {time.time()-t0}')
    q.put_nowait(42)

def reset_timeout():
    time.sleep(RESET_TIME)
    with q.mutex:
        q.endtime += TIMEOUT
    print(f'reset @ {time.time()-t0}')

if __name__ == '__main__':
    Thread(target=feed_queue).start()
    Thread(target=reset_timeout).start()
    res = receive()
    print('res:', res)

这产生:

reset @ 0.5013222694396973
put @ 1.201164722442627
get @ 1.201164722442627
res: 42

已在 queue.py 中进行以下修改以使其生效:

Index: queue.py
===================================================================
--- queue.py    (revision 28725)
+++ queue.py    (working copy)
@@ -52,6 +52,7 @@
         # drops to zero; thread waiting to join() is notified to resume
         self.all_tasks_done = threading.Condition(self.mutex)
         self.unfinished_tasks = 0
+        self.endtime = 0

     def task_done(self):
         '''Indicate that a formerly enqueued task is complete.
@@ -171,9 +172,9 @@
             elif timeout < 0:
                 raise ValueError("'timeout' must be a non-negative number")
             else:
-                endtime = time() + timeout
+                self.endtime = time() + timeout
                 while not self._qsize():
-                    remaining = endtime - time()
+                    remaining = self.endtime - time()
                     if remaining <= 0.0:
                         raise Empty
                     self.not_empty.wait(remaining)

您可以创建自己的 class,继承 Queue 并添加全局变量 endtime,如下所示:

class Waszil(Queue):
    def __init__(self, maxsize=0):
        super().__init__(self)
        self.maxsize = maxsize
        self._init(maxsize)
        self.endtime = 0

然后只需将 q = Queue() 更改为 q = Waszil() 就可以了。

编辑: 如果您更喜欢在 Waszil class 中实现固有的线程安全,您可以像这样使用 threading.Lock

from threading import Lock

class Waszil(Queue):
    def __init__(self, maxsize=0):
        super().__init__(self)
        self.threadLock = Lock()
        self.maxsize = maxsize
        self._init(maxsize)
        self.endtime = 0

    def increment_endtime(self):
        with self.threadLock:
            self.endtime += 1

在这种情况下,而不是你的

with q.mutex:
    q.endtime += TIMEOUT    

你只需调用 q.increment_endtime()