以 concurrent.futures.ThreadPoolExecutor() 作为执行者:...不等待

with concurrent.futures.ThreadPoolExecutor() as executor: ... does not wait

我正在尝试在 class 的方法中使用 ThreadPoolExecutor() 创建一个线程池,该线程池将在同一个 class 中执行另一个方法。我有 with concurrent.futures.ThreadPoolExecutor()... 但它不等待,并抛出一个错误,说我在 "with..." 语句后查询的字典中没有键。我明白了为什么会抛出错误,因为字典还没有更新,因为池中的线程没有完成执行。我知道线程没有完成执行,因为我在 ThreadPoolExecutor 中调用的方法中有一个 print("done"),并且 "done" 没有打印到控制台。

我是线程的新手,所以如果有任何关于如何更好地做到这一点的建议,我们将不胜感激!

    def tokenizer(self):
        all_tokens = []
        self.token_q = Queue()
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            for num in range(5):
                executor.submit(self.get_tokens, num)
            executor.shutdown(wait=True)

        print("Hi")
        results = {}
        while not self.token_q.empty():
            temp_result = self.token_q.get()
            results[temp_result[1]] = temp_result[0]
            print(temp_result[1])
        for index in range(len(self.zettels)):
            for zettel in results[index]:
                all_tokens.append(zettel)
        return all_tokens

    def get_tokens(self, thread_index):
        print("!!!!!!!")
        switch = {
            0: self.zettels[:(len(self.zettels)/5)],
            1: self.zettels[(len(self.zettels)/5): (len(self.zettels)/5)*2],
            2: self.zettels[(len(self.zettels)/5)*2: (len(self.zettels)/5)*3],
            3: self.zettels[(len(self.zettels)/5)*3: (len(self.zettels)/5)*4],
            4: self.zettels[(len(self.zettels)/5)*4: (len(self.zettels)/5)*5],
        }
        new_tokens = []
        for zettel in switch.get(thread_index):
            tokens = re.split('\W+', str(zettel))
            tokens = list(filter(None, tokens))
            new_tokens.append(tokens)
        print("done")
        self.token_q.put([new_tokens, thread_index])

'''

预计会在 print ("Hi") 语句之前看到所有 print("!!!!!!")print("done") 语句。 实际上显示 !!!!!!!,然后是 Hi,然后是结果字典的 KeyError

您需要循环 concurrent.futures.as_completed(),如图 here 所示。它会在每个线程完成时产生值。

如您所知,池正在等待; print('done') 永远不会执行,因为可能 TypeError 提早了。
池不直接等待任务完成,它等待其工作线程加入,这隐含地要求任务的执行完成,一种方式(成功)或另一种方式(异常)。

您看不到引发异常的原因是任务被包装在 Future 中。一个Future

[...] encapsulates the asynchronous execution of a callable.

Future 实例由执行程序的 submit 方法返回,它们允许查询执行状态并访问其结果。

这让我想到了一些我想说的话。

self.token_q中的Queue好像没必要
从您共享的代码来看,您仅使用此队列将任务结果传递回 tokenizer 函数。这不是必需的,您可以从调用 submit returns:

Future 访问它
def tokenizer(self):
    all_tokens = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(get_tokens, num) for num in range(5)]
        # executor.shutdown(wait=True) here is redundant, it is called when exiting the context:
        # https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/_base.py#L623

    print("Hi")
    results = {}
    for fut in futures:
        try:
            res = fut.result()
            results[res[1]] = res[0]
        except Exception:
            continue
    [...] 

def get_tokens(self, thread_index):
    [...]
    # instead of self.token_q.put([new_tokens, thread_index])
    return new_tokens, thread_index

您的程序可能无法从使用线程中获益
从您共享的代码来看,get_tokens 中的操作似乎是 CPU 绑定的,而不是 I/O 绑定的。如果你的程序是 运行 CPython(或任何其他使用 Global Interpreter Lock 的解释器),在这种情况下使用线程将没有任何好处。

In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once.

这意味着对于任何 Python 进程,在任何给定时间只能执行一个线程。如果您手头的任务是 I/O 绑定,即经常暂停以等待 I/O(例如,对于套接字上的数据),这不是什么大问题。如果您的任务需要在处理器中不断执行字节码,那么暂停一个线程让另一个线程执行一些指令没有任何好处。事实上,由此产生的上下文切换甚至可能是有害的。
为此,您可能想要 parallelism instead of concurrency. Take a look at ProcessPoolExecutor
但是,我建议按顺序、并发和并行地 运行 对您的代码进行基准测试。创建进程或线程是有代价的,并且根据要完成的任务,这样做可能比按顺序执行一个任务一个接一个地花费更长的时间。


顺便说一句,这看起来有点可疑:

for index in range(len(self.zettels)):
    for zettel in results[index]:
        all_tokens.append(zettel)

results 似乎总是有五个项目,因为 for num in range(5)。如果 self.zettels 的长度大于五,我希望这里会出现一个 KeyError
如果保证 self.zettels 的长度为五,那么我会在这里看到一些代码优化的潜力。