对源自循环的多个线程进行排队的最安全方法是什么?

What is the safest way to queue multiple threads originating in a loop?

我的脚本遍历输入文件的每一行,并使用每一行中的字符串执行一些操作。由于每行执行的任务彼此独立,我决定将任务分成 threads 这样脚本就不必等待任务完成才能继续循环。代码如下。


def myFunction(line, param):
    # Doing something with line and param
    # Sends multiple HTTP requests and parse the response and produce outputs
    # Returns nothing

param = arg[1]   
with open(targets, "r") as listfile:
    for line in listfile:
        print("Starting a thread for: ",line)
        t=threading.Thread(target=myFunction, args=(line, param,)) 
        threads.append(t)
        t.start()

我意识到这是个坏主意,因为输入文件中的行数越来越大。使用此代码,线程数将与行数一样多。研究了一下,认为 queues 会是这样。

我想了解在这种情况下使用队列的最佳方式,以及是否有任何我可以使用的替代方法。

要解决这个问题,您可以使用线程池的概念,您可以在其中定义要使用的固定数量的 Threads/workers,例如 5 个工作线程,并且每当一个线程完成执行时,另一个Future(ly) submmited 线程将自动取代它。

示例:

import concurrent.futures

def myFunction(line, param):
    print("Done with :", line, param)

param = "param_example"

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = []
    with open("targets", "r") as listfile:
        for line in listfile:
            print("Starting a thread for: ", line)
            futures.append(executor.submit(myFunction, line=line, param=param))

    # waiting for the threads to finish and maybe print a result :
    for future in concurrent.futures.as_completed(futures):
        print(future.result()) # an Exceptino should be handled here!!!

队列是一种方法。使用方法是将函数参数放在一个队列中,使用线程获取并进行处理。

在这种情况下,队列大小无关紧要,因为读取下一行很快。在另一种情况下,更优化的解决方案是将队列大小设置为至少两倍于线程数。这样,如果所有线程同时完成处理队列中的一个项目,它们都将准备好处理队列中的下一个项目。

为避免使代码线程复杂化,可以将线程设置为守护进程,这样它们就不会在处理完成后阻止程序完成。它们将在主进程完成时终止。

另一种方法是为每个线程在队列中放置一个特殊项目(如None),并在从队列中获取它后让线程退出,然后加入线程。

对于下面的示例,工作线程数是使用 workers 变量设置的。

这是一个使用队列的解决方案示例。

from queue import Queue
from threading import Thread

queue = Queue(workers * 2)
def work():
    while True:
        myFunction(*queue.get())
        queue.task_done()

for _ in range(workers):
    Thread(target=work, daemon=True).start()

with open(targets, 'r') as listfile:
    for line in listfile:
        queue.put((line, param))
queue.join()

一个更简单的解决方案可能是使用 ThreadPoolExecutor。在这种情况下特别简单,因为被调用的函数没有return任何需要在主线程中使用的东西。

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=workers) as executor:
    with open(targets, 'r') as listfile:
        for line in listfile:
            executor.submit(myFunction, line, param)

此外,如果将所有行都存储在内存中不是问题,则有一种解决方案不使用线程以外的任何东西。工作以线程从列表中读取某些行并忽略其他行的方式进行拆分。有两个线程的一个简单示例是一个线程读取奇数行,另一个线程读取偶数行。

from threading import Thread

with open(targets, 'r') as listfile:
    lines = listfile.readlines()

def work_split(n):
    for line in lines[n::workers]:
        myFunction(line, param)

threads = []
for n in range(workers):
    t = Thread(target=work_split, args=(n,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

我做了一个快速基准测试,QueueThreadPoolExecutor 稍快,但拆分工作的解决方案比两者都快。

从您报告的代码来看,线程的使用毫无意义。 这是因为没有任何 I/O 操作,因此线程在没有多线程的情况下以线性方式执行。 在这种情况下,GIL(全局解释器锁)永远不会被线程释放,所以应用程序只是表面上使用多线程,实际上解释器只使用一个 CPU一次程序和一个线程。 通过这种方式,您在使用线程方面没有任何优势,相反,由于切换上下文以及线程启动时的线程初始化开销,您可能会在这种情况下降低性能。

如果在这种情况下适用,在这种情况下获得更好性能的唯一方法是多进程程序。但是要注意启动的进程数,记住每个进程都有自己的解释器。

GitFront 的回答很好。这个答案只是使用 multiprocessing 包增加了一个选项。 使用 concurrent.futures 或多处理取决于特定要求。 Multiprocessing 相对来说有更多的选项,但对于给定的问题,在最简单的情况下,结果应该几乎相同。

from multiprocessing import cpu_count, Pool
PROCESSES = cpu_count() # Warning: uses all cores

def pool_method(listfile, param):
    p = Pool(processes=PROCESSES)
    checker = [p.apply_async(myFunction, (line, param)) for line in listfile]

...

除了“apply_async”之外,还有其他各种方法,但这应该能很好地满足您的需要。