对源自循环的多个线程进行排队的最安全方法是什么?
What is the safest way to queue multiple threads originating in a loop?
我的脚本遍历输入文件的每一行,并使用每一行中的字符串执行一些操作。由于每行执行的任务彼此独立,我决定将任务分成 threads
这样脚本就不必等待任务完成才能继续循环。代码如下。
def myFunction(line, param):
# Doing something with line and param
# Sends multiple HTTP requests and parse the response and produce outputs
# Returns nothing
param = arg[1]
with open(targets, "r") as listfile:
for line in listfile:
print("Starting a thread for: ",line)
t=threading.Thread(target=myFunction, args=(line, param,))
threads.append(t)
t.start()
我意识到这是个坏主意,因为输入文件中的行数越来越大。使用此代码,线程数将与行数一样多。研究了一下,认为 queues
会是这样。
我想了解在这种情况下使用队列的最佳方式,以及是否有任何我可以使用的替代方法。
要解决这个问题,您可以使用线程池的概念,您可以在其中定义要使用的固定数量的 Threads/workers,例如 5 个工作线程,并且每当一个线程完成执行时,另一个Future(ly) submmited 线程将自动取代它。
示例:
import concurrent.futures
def myFunction(line, param):
print("Done with :", line, param)
param = "param_example"
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
with open("targets", "r") as listfile:
for line in listfile:
print("Starting a thread for: ", line)
futures.append(executor.submit(myFunction, line=line, param=param))
# waiting for the threads to finish and maybe print a result :
for future in concurrent.futures.as_completed(futures):
print(future.result()) # an Exceptino should be handled here!!!
队列是一种方法。使用方法是将函数参数放在一个队列中,使用线程获取并进行处理。
在这种情况下,队列大小无关紧要,因为读取下一行很快。在另一种情况下,更优化的解决方案是将队列大小设置为至少两倍于线程数。这样,如果所有线程同时完成处理队列中的一个项目,它们都将准备好处理队列中的下一个项目。
为避免使代码线程复杂化,可以将线程设置为守护进程,这样它们就不会在处理完成后阻止程序完成。它们将在主进程完成时终止。
另一种方法是为每个线程在队列中放置一个特殊项目(如None
),并在从队列中获取它后让线程退出,然后加入线程。
对于下面的示例,工作线程数是使用 workers
变量设置的。
这是一个使用队列的解决方案示例。
from queue import Queue
from threading import Thread
queue = Queue(workers * 2)
def work():
while True:
myFunction(*queue.get())
queue.task_done()
for _ in range(workers):
Thread(target=work, daemon=True).start()
with open(targets, 'r') as listfile:
for line in listfile:
queue.put((line, param))
queue.join()
一个更简单的解决方案可能是使用 ThreadPoolExecutor。在这种情况下特别简单,因为被调用的函数没有return任何需要在主线程中使用的东西。
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=workers) as executor:
with open(targets, 'r') as listfile:
for line in listfile:
executor.submit(myFunction, line, param)
此外,如果将所有行都存储在内存中不是问题,则有一种解决方案不使用线程以外的任何东西。工作以线程从列表中读取某些行并忽略其他行的方式进行拆分。有两个线程的一个简单示例是一个线程读取奇数行,另一个线程读取偶数行。
from threading import Thread
with open(targets, 'r') as listfile:
lines = listfile.readlines()
def work_split(n):
for line in lines[n::workers]:
myFunction(line, param)
threads = []
for n in range(workers):
t = Thread(target=work_split, args=(n,))
t.start()
threads.append(t)
for t in threads:
t.join()
我做了一个快速基准测试,Queue
比 ThreadPoolExecutor
稍快,但拆分工作的解决方案比两者都快。
从您报告的代码来看,线程的使用毫无意义。
这是因为没有任何 I/O 操作,因此线程在没有多线程的情况下以线性方式执行。 在这种情况下,GIL(全局解释器锁)永远不会被线程释放,所以应用程序只是表面上使用多线程,实际上解释器只使用一个 CPU一次程序和一个线程。
通过这种方式,您在使用线程方面没有任何优势,相反,由于切换上下文以及线程启动时的线程初始化开销,您可能会在这种情况下降低性能。
如果在这种情况下适用,在这种情况下获得更好性能的唯一方法是多进程程序。但是要注意启动的进程数,记住每个进程都有自己的解释器。
GitFront 的回答很好。这个答案只是使用 multiprocessing 包增加了一个选项。
使用 concurrent.futures 或多处理取决于特定要求。 Multiprocessing 相对来说有更多的选项,但对于给定的问题,在最简单的情况下,结果应该几乎相同。
from multiprocessing import cpu_count, Pool
PROCESSES = cpu_count() # Warning: uses all cores
def pool_method(listfile, param):
p = Pool(processes=PROCESSES)
checker = [p.apply_async(myFunction, (line, param)) for line in listfile]
...
除了“apply_async”之外,还有其他各种方法,但这应该能很好地满足您的需要。
我的脚本遍历输入文件的每一行,并使用每一行中的字符串执行一些操作。由于每行执行的任务彼此独立,我决定将任务分成 threads
这样脚本就不必等待任务完成才能继续循环。代码如下。
def myFunction(line, param):
# Doing something with line and param
# Sends multiple HTTP requests and parse the response and produce outputs
# Returns nothing
param = arg[1]
with open(targets, "r") as listfile:
for line in listfile:
print("Starting a thread for: ",line)
t=threading.Thread(target=myFunction, args=(line, param,))
threads.append(t)
t.start()
我意识到这是个坏主意,因为输入文件中的行数越来越大。使用此代码,线程数将与行数一样多。研究了一下,认为 queues
会是这样。
我想了解在这种情况下使用队列的最佳方式,以及是否有任何我可以使用的替代方法。
要解决这个问题,您可以使用线程池的概念,您可以在其中定义要使用的固定数量的 Threads/workers,例如 5 个工作线程,并且每当一个线程完成执行时,另一个Future(ly) submmited 线程将自动取代它。
示例:
import concurrent.futures
def myFunction(line, param):
print("Done with :", line, param)
param = "param_example"
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = []
with open("targets", "r") as listfile:
for line in listfile:
print("Starting a thread for: ", line)
futures.append(executor.submit(myFunction, line=line, param=param))
# waiting for the threads to finish and maybe print a result :
for future in concurrent.futures.as_completed(futures):
print(future.result()) # an Exceptino should be handled here!!!
队列是一种方法。使用方法是将函数参数放在一个队列中,使用线程获取并进行处理。
在这种情况下,队列大小无关紧要,因为读取下一行很快。在另一种情况下,更优化的解决方案是将队列大小设置为至少两倍于线程数。这样,如果所有线程同时完成处理队列中的一个项目,它们都将准备好处理队列中的下一个项目。
为避免使代码线程复杂化,可以将线程设置为守护进程,这样它们就不会在处理完成后阻止程序完成。它们将在主进程完成时终止。
另一种方法是为每个线程在队列中放置一个特殊项目(如None
),并在从队列中获取它后让线程退出,然后加入线程。
对于下面的示例,工作线程数是使用 workers
变量设置的。
这是一个使用队列的解决方案示例。
from queue import Queue
from threading import Thread
queue = Queue(workers * 2)
def work():
while True:
myFunction(*queue.get())
queue.task_done()
for _ in range(workers):
Thread(target=work, daemon=True).start()
with open(targets, 'r') as listfile:
for line in listfile:
queue.put((line, param))
queue.join()
一个更简单的解决方案可能是使用 ThreadPoolExecutor。在这种情况下特别简单,因为被调用的函数没有return任何需要在主线程中使用的东西。
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=workers) as executor:
with open(targets, 'r') as listfile:
for line in listfile:
executor.submit(myFunction, line, param)
此外,如果将所有行都存储在内存中不是问题,则有一种解决方案不使用线程以外的任何东西。工作以线程从列表中读取某些行并忽略其他行的方式进行拆分。有两个线程的一个简单示例是一个线程读取奇数行,另一个线程读取偶数行。
from threading import Thread
with open(targets, 'r') as listfile:
lines = listfile.readlines()
def work_split(n):
for line in lines[n::workers]:
myFunction(line, param)
threads = []
for n in range(workers):
t = Thread(target=work_split, args=(n,))
t.start()
threads.append(t)
for t in threads:
t.join()
我做了一个快速基准测试,Queue
比 ThreadPoolExecutor
稍快,但拆分工作的解决方案比两者都快。
从您报告的代码来看,线程的使用毫无意义。 这是因为没有任何 I/O 操作,因此线程在没有多线程的情况下以线性方式执行。 在这种情况下,GIL(全局解释器锁)永远不会被线程释放,所以应用程序只是表面上使用多线程,实际上解释器只使用一个 CPU一次程序和一个线程。 通过这种方式,您在使用线程方面没有任何优势,相反,由于切换上下文以及线程启动时的线程初始化开销,您可能会在这种情况下降低性能。
如果在这种情况下适用,在这种情况下获得更好性能的唯一方法是多进程程序。但是要注意启动的进程数,记住每个进程都有自己的解释器。
GitFront 的回答很好。这个答案只是使用 multiprocessing 包增加了一个选项。 使用 concurrent.futures 或多处理取决于特定要求。 Multiprocessing 相对来说有更多的选项,但对于给定的问题,在最简单的情况下,结果应该几乎相同。
from multiprocessing import cpu_count, Pool
PROCESSES = cpu_count() # Warning: uses all cores
def pool_method(listfile, param):
p = Pool(processes=PROCESSES)
checker = [p.apply_async(myFunction, (line, param)) for line in listfile]
...
除了“apply_async”之外,还有其他各种方法,但这应该能很好地满足您的需要。