python 使用共享队列和线程的线程 类
python threading with a shared queue and thread classes
我一直在努力寻找一个看起来像我的实现,但我似乎找不到。
细节:我检索了一些数据库记录并希望在最多 5 个线程中处理所有这些记录。但我希望这些线程报告任何潜在的错误,然后关闭各个线程(或记录它们)。所以我想将所有记录推送到队列中并让线程从队列中获取。
到目前为止我有这个。
class DatabaseRecordImporterThread(threading.Thread):
def __init__(self, record_queue):
super(DatabaseRecordImporterThread, self).__init__()
self.record_queue = record_queue
def run(self):
try:
record = self.record_queue.get()
force_key_error(record)
except Exception as e:
print("Thread failed: ", e) # I want this to print to the main thread stdout
logger.log(e) # I want this to log to a shared log file (with appending)
MAX_THREAD_COUNT = 5
jobs = queue.Queue()
workers = []
database_records_retrieved = database.get_records(query) # unimportant
# this is where i put all records on a queue
for record in database_records_retrieved:
jobs.put(record)
for _ in range(MAX_THREAD_COUNT):
worker = DatabaseRecordImporterThread(jobs)
worker.start()
workers.append(worker)
print('*** Main thread waiting')
jobs.join()
print('*** Done')
所以想法是每个线程都获取 jobs
队列,并且它们正在从中检索记录并打印。由于要处理的数量不是预先指定的(定义为一次处理 k 条记录或其他),每个线程将尝试只处理队列中的任何内容。但是,当我强制出错时,输出看起来像这样。
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
*** Main thread waiting
当没有错误报告时,线程每个只读取一条记录:
(record)
(record)
(record)
(record)
(record)
*** Main thread waiting
在正常的线程设置中,我知道您可以通过执行类似这样的操作来设置队列
Thread(target=function, args=(parameters, queue)
但是当您使用继承 Thread 对象的 class 时,如何正确设置它?我似乎无法弄清楚。我的假设之一是队列对象并不浅,因此创建的每个新对象实际上都引用内存中的同一个队列 - 这是真的吗?
线程挂起,显然,因为它们不是(?)守护线程。不仅如此,似乎线程每个只读取一条记录,然后做同样的事情。有些事情我想做却不知道怎么做。
- 如果所有线程都失败,主线程应该继续并说“*** 完成”。
- 线程应继续处理队列,直到它为空
为了执行 (2),我可能需要在主线程中使用 while !queue.empty
之类的东西,但是我如何确保将线程限制为最多只有 5 个线程?
我想出了问题的答案。经过大量研究和一些代码阅读后,需要发生的事情如下
不应检查队列是否为空,因为它存在竞争条件。相反,工作人员应该在无限循环下继续并尝试继续从 Queue
中检索
每当队列任务完成时,需要调用queue.task_done()
方法来提醒MainThreadjoin()
方法。发生的情况是 task_done
调用的数量将与入队调用的数量同步,一旦队列为空,线程将正式加入。
对固定数据大小的任务使用队列在某种程度上不是最佳选择。与其创建一个每个线程都从中读取的队列,不如简单地将数据分成大小相等的块,让线程只 运行 处理列表子集会更好。这样我们就不会在等待添加新元素时被 queue.get()
阻塞。比如,while True: if not queue.empty(): do_something()
如果我们想继续过去,异常处理仍然应该调用 task_done()
。根据是否捕获到异常来决定整个线程是否应该失败是一种设计选择,但如果是这种情况,那么该元素仍应标记为已处理。
我一直在努力寻找一个看起来像我的实现,但我似乎找不到。
细节:我检索了一些数据库记录并希望在最多 5 个线程中处理所有这些记录。但我希望这些线程报告任何潜在的错误,然后关闭各个线程(或记录它们)。所以我想将所有记录推送到队列中并让线程从队列中获取。
到目前为止我有这个。
class DatabaseRecordImporterThread(threading.Thread):
def __init__(self, record_queue):
super(DatabaseRecordImporterThread, self).__init__()
self.record_queue = record_queue
def run(self):
try:
record = self.record_queue.get()
force_key_error(record)
except Exception as e:
print("Thread failed: ", e) # I want this to print to the main thread stdout
logger.log(e) # I want this to log to a shared log file (with appending)
MAX_THREAD_COUNT = 5
jobs = queue.Queue()
workers = []
database_records_retrieved = database.get_records(query) # unimportant
# this is where i put all records on a queue
for record in database_records_retrieved:
jobs.put(record)
for _ in range(MAX_THREAD_COUNT):
worker = DatabaseRecordImporterThread(jobs)
worker.start()
workers.append(worker)
print('*** Main thread waiting')
jobs.join()
print('*** Done')
所以想法是每个线程都获取 jobs
队列,并且它们正在从中检索记录并打印。由于要处理的数量不是预先指定的(定义为一次处理 k 条记录或其他),每个线程将尝试只处理队列中的任何内容。但是,当我强制出错时,输出看起来像这样。
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
*** Main thread waiting
当没有错误报告时,线程每个只读取一条记录:
(record)
(record)
(record)
(record)
(record)
*** Main thread waiting
在正常的线程设置中,我知道您可以通过执行类似这样的操作来设置队列
Thread(target=function, args=(parameters, queue)
但是当您使用继承 Thread 对象的 class 时,如何正确设置它?我似乎无法弄清楚。我的假设之一是队列对象并不浅,因此创建的每个新对象实际上都引用内存中的同一个队列 - 这是真的吗?
线程挂起,显然,因为它们不是(?)守护线程。不仅如此,似乎线程每个只读取一条记录,然后做同样的事情。有些事情我想做却不知道怎么做。
- 如果所有线程都失败,主线程应该继续并说“*** 完成”。
- 线程应继续处理队列,直到它为空
为了执行 (2),我可能需要在主线程中使用 while !queue.empty
之类的东西,但是我如何确保将线程限制为最多只有 5 个线程?
我想出了问题的答案。经过大量研究和一些代码阅读后,需要发生的事情如下
不应检查队列是否为空,因为它存在竞争条件。相反,工作人员应该在无限循环下继续并尝试继续从 Queue
中检索
每当队列任务完成时,需要调用
queue.task_done()
方法来提醒MainThreadjoin()
方法。发生的情况是task_done
调用的数量将与入队调用的数量同步,一旦队列为空,线程将正式加入。对固定数据大小的任务使用队列在某种程度上不是最佳选择。与其创建一个每个线程都从中读取的队列,不如简单地将数据分成大小相等的块,让线程只 运行 处理列表子集会更好。这样我们就不会在等待添加新元素时被
queue.get()
阻塞。比如,while True: if not queue.empty(): do_something()
如果我们想继续过去,异常处理仍然应该调用
task_done()
。根据是否捕获到异常来决定整个线程是否应该失败是一种设计选择,但如果是这种情况,那么该元素仍应标记为已处理。