.join 用于 python 中可能的空队列

.join for possible empty Queue in python

我正在尝试在 python 中实现一个 Queue 处理线程,如下所示:

from queue import Queue
from threading import Thread
import sys    

class Worker(Thread):
    def __init__(self, queue):
        # Call thread constructor
        self.queue = queue

    def run(self):
        while True:
            task = self.queue.get()
            # doTask()
            self.queue.task_done()

queue = Queue()
thread = Worker(thread)
thread.start()

while True:
    inp = user_input()

    if condition(inp):
        queue.put(sometask())
    else:
        queue.join()
        thread.join()
        sys.exit(0)

在此示例中,假设用户决定 exit 而不向队列添加任何项目。然后我的线程将阻塞在 self.queue.get 并且我 queue.join() 将无法工作。因此,我无法执行正确的 exit.

我该如何处理这个问题?

您可以给 Queue.get 一个超时并使用停止事件:

from Queue import Queue, Empty
from threading import Thread, Event
import sys

class Worker(Thread):
    def __init__(self, queue, stop):
        # Call thread constructor
        self.queue = queue
        self.stop = stop
        super(Worker, self).__init__()

    def run(self):
        while not self.stop.is_set():
            try:
                task = self.queue.get(timeout=1)
            except Empty:
                continue
            # doTask()
            self.queue.task_done()

queue = Queue()
stop = Event()
thread = Worker(queue, stop)
thread.start()

while True:
    inp = raw_input()

    if inp:
        queue.put(inp)
    else:
        stop.set()
        queue.join()
        thread.join()
        sys.exit(0)

这为 Thread worker 的 while 循环添加了一个条件,以便您可以随时停止它。您必须给 Queue.get 一个超时,以便它可以定期检查停止事件。

更新

您可以使用哨兵而不是超时:

from Queue import Queue
from threading import Thread
import sys

_sentinel = Object()

class Worker(Thread):
    def __init__(self, queue, sentinel=None):
        # Call thread constructor
        self.queue = queue
        self.sentinel = sentinel
        super(Worker, self).__init__()

    def run(self):
        while True:
            task = self.queue.get()
            if task is self.sentinel:
                self.queue.task_done()
                return
            # doTask()
            self.queue.task_done()


queue = Queue()
thread = Worker(queue, sentinel=_sentinel)
thread.start()

while True:
    inp = raw_input()

    if inp:
        queue.put(inp)
    else:
        queue.put(_sentinel)
        queue.join()
        thread.join()
        sys.exit(0)

感谢 Bakuriu 的 sentinel = Object() 建议。