在 concurrent.futures 中检测失败的任务

Detect failed tasks in concurrent.futures

我一直在使用 concurrent.futures,因为它有一个简单的界面,让用户可以轻松控制 threads/processes 的最大数量。但是,似乎 concurrent.futures 隐藏了失败的任务并在所有任务 finished/failed.

之后继续主线程
import concurrent.futures

def f(i):
    return (i + 's')

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    fs = [executor.submit(f, i ) for i in range(10)]
    concurrent.futures.wait(fs)

对任何整数调用 f 都会导致 TypeError。但是,整个脚本运行得很好并以代码 0 退出。有什么方法可以让它在任何线程失败时抛出 exception/error?

或者,有没有更好的方法来限制 threads/processes 的数量而不使用 concurrent.futures?

还有另一种方法可以对 multiprocessing.Pool(对于进程)或 multiprocessing.pool.ThreadPool(对于线程)执行相同的操作。据我所知,它会重新抛出任何捕获的异常。

concurrent.futures.wait 将确保完成所有任务,但它不会检查成功(某些 return-ed)与失败(引发的异常并且未在 worker 函数中捕获)。为此,您需要在每个 Future 上调用 .result()(这将导致它重新 raise 任务中的异常,或者生成 return-ed价值)。还有其他方法可以在不实际在主线程中引发的情况下进行检查(例如 .exception()),但 .result() 是最直接的方法。

如果你想重新raise,最简单的方法就是将wait()调用替换为:

for fut in concurrent.futures.as_completed(fs):
    fut.result()

将在 Future 完成后处理结果,如果发生,会立即 raise 一个 Exception。或者,您继续使用 wait,以便在检查任何异常之前完成所有任务,然后直接迭代 fs 并在每个任务上调用 .result()