python 使用共享队列和线程的线程 类

python threading with a shared queue and thread classes

我一直在努力寻找一个看起来像我的实现,但我似乎找不到。

细节:我检索了一些数据库记录并希望在最多 5 个线程中处理所有这些记录。但我希望这些线程报告任何潜在的错误,然后关闭各个线程(或记录它们)。所以我想将所有记录推送到队列中并让线程从队列中获取。

到目前为止我有这个。

class DatabaseRecordImporterThread(threading.Thread):
    def __init__(self, record_queue):
        super(DatabaseRecordImporterThread, self).__init__()
        self.record_queue = record_queue

    def run(self):
        try:
            record = self.record_queue.get()
            force_key_error(record)
        except Exception as e:
            print("Thread failed: ", e)  # I want this to print to the main thread stdout
            logger.log(e)  # I want this to log to a shared log file (with appending)



MAX_THREAD_COUNT = 5
jobs = queue.Queue()
workers = []
database_records_retrieved = database.get_records(query)  # unimportant

# this is where i put all records on a queue
for record in database_records_retrieved:
    jobs.put(record)

for _ in range(MAX_THREAD_COUNT):
    worker = DatabaseRecordImporterThread(jobs)
    worker.start()
    workers.append(worker)

print('*** Main thread waiting')
jobs.join()
print('*** Done')

所以想法是每个线程都获取 jobs 队列,并且它们正在从中检索记录并打印。由于要处理的数量不是预先指定的(定义为一次处理 k 条记录或其他),每个线程将尝试只处理队列中的任何内容。但是,当我强制出错时,输出看起来像这样。

Thread failed:  'KeyError'
Thread failed:  'KeyError'
Thread failed:  'KeyError'
Thread failed:  'KeyError'
Thread failed:  'KeyError'
*** Main thread waiting

当没有错误报告时,线程每个只读取一条记录:

(record)
(record)
(record)
(record)
(record)
*** Main thread waiting

在正常的线程设置中,我知道您可以通过执行类似这样的操作来设置队列

Thread(target=function, args=(parameters, queue)

但是当您使用继承 Thread 对象的 class 时,如何正确设置它?我似乎无法弄清楚。我的假设之一是队列对象并不浅,因此创建的每个新对象实际上都引用内存中的同一个队列 - 这是真的吗?

线程挂起,显然,因为它们不是(?)守护线程。不仅如此,似乎线程每个只读取一条记录,然后做同样的事情。有些事情我想做却不知道怎么做。

  1. 如果所有线程都失败,主线程应该继续并说“*** 完成”。
  2. 线程应继续处理队列,直到它为空

为了执行 (2),我可能需要在主线程中使用 while !queue.empty 之类的东西,但是我如何确保将线程限制为最多只有 5 个线程?

我想出了问题的答案。经过大量研究和一些代码阅读后,需要发生的事情如下

  1. 不应检查队列是否为空,因为它存在竞争条件。相反,工作人员应该在无限循环下继续并尝试继续从 Queue

  2. 中检索
  3. 每当队列任务完成时,需要调用queue.task_done()方法来提醒MainThreadjoin()方法。发生的情况是 task_done 调用的数量将与入队调用的数量同步,一旦队列为空,线程将正式加入。

  4. 对固定数据大小的任务使用队列在某种程度上不是最佳选择。与其创建一个每个线程都从中读取的队列,不如简单地将数据分成大小相等的块,让线程只 运行 处理列表子集会更好。这样我们就不会在等待添加新元素时被 queue.get() 阻塞。比如,while True: if not queue.empty(): do_something()

  5. 如果我们想继续过去,异常处理仍然应该调用 task_done()。根据是否捕获到异常来决定整个线程是否应该失败是一种设计选择,但如果是这种情况,那么该元素仍应标记为已处理。