Python多线程:在主线程中获取工作线程结果
Python multithreading: Obtain worker thread results in main thread
在 Python 3.8 上，我为网络 I/O 任务实现了多线程：一组工作线程从网络下载数据、进行处理，并各自生成结果列表。现在，我希望在所有线程完成后，由主线程获取所有工作线程的结果列表并做进一步处理。
为便于讨论，我去掉了网络 I/O 调用，换成了一些占位代码。代码如下：
from queue import Queue
from threading import Thread
from random import randint as ri
class DownloadWorker(Thread):
    """Worker thread that turns start values from a task queue into result lists.

    Each item taken from ``queue`` is processed (dummy random data here; the
    real code performs network calls at that point) and the resulting list is
    put on ``result_q``. ``queue.task_done()`` is called for every item taken,
    so the main thread can wait for all work with ``queue.join()``.

    Putting ``None`` on ``queue`` tells one worker to leave its loop, which
    allows a clean shutdown instead of relying only on daemon threads being
    killed at interpreter exit.
    """

    def __init__(self, queue, result_q):
        """Store the task queue and the result queue shared with the main thread."""
        super().__init__()
        self.queue = queue  # incoming start values, or None as a stop sentinel
        self.result_q = result_q  # outgoing result lists, one per processed item

    def run(self):
        """Process items from ``self.queue`` until a ``None`` sentinel arrives."""
        while True:
            start_val = self.queue.get()
            try:
                if start_val is None:
                    # Stop sentinel: it is still acknowledged by the finally
                    # clause below, then the loop exits and the thread ends.
                    break
                # dummy code. Real code has network calls here
                thread_output = [ri(0, 10) + start_val for _ in range(3)]
                self.result_q.put(thread_output)
            finally:
                # Acknowledge every item taken, even on error, so that
                # queue.join() in the main thread cannot hang on it.
                self.queue.task_done()
def main():
    """Fan the start values out to two daemon workers, then print every result list."""
    task_queue = Queue()  # main thread -> workers: start values to process
    results = Queue()  # workers -> main thread: one result list per start value

    # Two daemon workers; being daemons lets the main thread exit even though
    # the workers stay blocked on get() forever.
    for _ in range(2):
        w = DownloadWorker(task_queue, results)
        w.daemon = True
        w.start()

    # Each start value tags its worker's output so the lists can be told apart.
    for value in [10, 100]:
        task_queue.put(value)

    # Returns once every queued value has been acknowledged with task_done().
    task_queue.join()

    # Workers put their output before calling task_done(), so at this point
    # every result is already in the queue; drain and print them (simplified
    # stand-in for real aggregation).
    while not results.empty():
        print(results.get())


if __name__ == '__main__':
    main()
这段代码目前运行良好，但我想知道：在 Python 3.8 中，有没有更好的方法让主线程汇总各个工作线程的结果？我参考过这个旧帖子（this old thread），但按我的需求修改后会报错（坦白说，我不太理解那里的解决方案）。
如能提供一些思路，不胜感激！
您其实是自己实现了一个线程池，而 concurrent.futures 模块中的 ThreadPoolExecutor 类已经提供了现成的线程池：
import concurrent.futures
from random import randint as ri
def worker(start_val):
    """Return a list of three random ints, each between start_val and start_val + 10.

    Placeholder for the real task, which makes network calls at this point.
    """
    # dummy code. Real code has network calls here
    return [start_val + ri(0, 10) for _ in range(3)]
def main():
    """Run ``worker`` for each start value on a two-thread pool and print each result."""
    pool_size = 2
    with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as pool:
        # Start values differentiate the outputs of the individual calls.
        inputs = [10, 100]
        pending = []
        for value in inputs:
            # submit() returns a Future wrapping the asynchronous call.
            pending.append(pool.submit(worker, value))
        # Wait on the futures in submission order, so output order matches inputs.
        for fut in pending:
            print(fut.result())  # blocks until done; re-raises the call's exception
        # Shorter alternative: results = pool.map(worker, inputs)


if __name__ == '__main__':
    main()
输出示例（数值为随机生成，每次运行结果不同）：
[20, 14, 11]
[104, 104, 108]
在 Python 3.8 上，我为网络 I/O 任务实现了多线程：一组工作线程从网络下载数据、进行处理，并各自生成结果列表。现在，我希望在所有线程完成后，由主线程获取所有工作线程的结果列表并做进一步处理。
为便于讨论，我去掉了网络 I/O 调用，换成了一些占位代码。代码如下：
from queue import Queue
from threading import Thread
from random import randint as ri
class DownloadWorker(Thread):
    """Worker thread that turns start values from a task queue into result lists.

    Each item taken from ``queue`` is processed (dummy random data here; the
    real code performs network calls at that point) and the resulting list is
    put on ``result_q``. ``queue.task_done()`` is called for every item taken,
    so the main thread can wait for all work with ``queue.join()``.

    Putting ``None`` on ``queue`` tells one worker to leave its loop, which
    allows a clean shutdown instead of relying only on daemon threads being
    killed at interpreter exit.
    """

    def __init__(self, queue, result_q):
        """Store the task queue and the result queue shared with the main thread."""
        super().__init__()
        self.queue = queue  # incoming start values, or None as a stop sentinel
        self.result_q = result_q  # outgoing result lists, one per processed item

    def run(self):
        """Process items from ``self.queue`` until a ``None`` sentinel arrives."""
        while True:
            start_val = self.queue.get()
            try:
                if start_val is None:
                    # Stop sentinel: it is still acknowledged by the finally
                    # clause below, then the loop exits and the thread ends.
                    break
                # dummy code. Real code has network calls here
                thread_output = [ri(0, 10) + start_val for _ in range(3)]
                self.result_q.put(thread_output)
            finally:
                # Acknowledge every item taken, even on error, so that
                # queue.join() in the main thread cannot hang on it.
                self.queue.task_done()
def main():
    """Fan the start values out to two daemon workers, then print every result list."""
    task_queue = Queue()  # main thread -> workers: start values to process
    results = Queue()  # workers -> main thread: one result list per start value

    # Two daemon workers; being daemons lets the main thread exit even though
    # the workers stay blocked on get() forever.
    for _ in range(2):
        w = DownloadWorker(task_queue, results)
        w.daemon = True
        w.start()

    # Each start value tags its worker's output so the lists can be told apart.
    for value in [10, 100]:
        task_queue.put(value)

    # Returns once every queued value has been acknowledged with task_done().
    task_queue.join()

    # Workers put their output before calling task_done(), so at this point
    # every result is already in the queue; drain and print them (simplified
    # stand-in for real aggregation).
    while not results.empty():
        print(results.get())


if __name__ == '__main__':
    main()
这段代码目前运行良好，但我想知道：在 Python 3.8 中，有没有更好的方法让主线程汇总各个工作线程的结果？我参考过这个旧帖子（this old thread），但按我的需求修改后会报错（坦白说，我不太理解那里的解决方案）。
如能提供一些思路，不胜感激！
您其实是自己实现了一个线程池，而 concurrent.futures 模块中的 ThreadPoolExecutor 类已经提供了现成的线程池：
import concurrent.futures
from random import randint as ri
def worker(start_val):
    """Return a list of three random ints, each between start_val and start_val + 10.

    Placeholder for the real task, which makes network calls at this point.
    """
    # dummy code. Real code has network calls here
    return [start_val + ri(0, 10) for _ in range(3)]
def main():
    """Run ``worker`` for each start value on a two-thread pool and print each result."""
    pool_size = 2
    with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as pool:
        # Start values differentiate the outputs of the individual calls.
        inputs = [10, 100]
        pending = []
        for value in inputs:
            # submit() returns a Future wrapping the asynchronous call.
            pending.append(pool.submit(worker, value))
        # Wait on the futures in submission order, so output order matches inputs.
        for fut in pending:
            print(fut.result())  # blocks until done; re-raises the call's exception
        # Shorter alternative: results = pool.map(worker, inputs)


if __name__ == '__main__':
    main()
输出示例（数值为随机生成，每次运行结果不同）：
[20, 14, 11]
[104, 104, 108]