Exception handling in concurrent.futures.Executor.map

From https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map:

If a func call raises an exception, then that exception will be raised when its value is retrieved from the iterator.

The following snippet only outputs the first exception (Exception: 1) and then stops. Doesn't this contradict the statement above? I expected the following to print all the exceptions from the loop.

import concurrent.futures

def test_func(val):
    raise Exception(val)

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for r in executor.map(test_func, [1, 2, 3, 4, 5]):
        try:
            print(r)
        except Exception as exc:
            print('generated an exception: %s' % exc)

The map method returns a generator, which allows iterating over the results as they become ready.

Unfortunately, a generator cannot be resumed after an exception occurs. From PEP 255:

If an unhandled exception-- including, but not limited to, StopIteration --is raised by, or passes through, a generator function, then the exception is passed on to the caller in the usual way, and subsequent attempts to resume the generator function raise StopIteration. In other words, an unhandled exception terminates a generator's useful life.

There are other libraries, such as pebble, that allow continuing the iteration after an error occurs. Check the examples in its documentation.

As mentioned above, unfortunately executor.map's API is limited and only lets you get the first exception. Also, when iterating over the results, you will only get the values up to the first exception.

To answer your question, if you don't want to use a different library, you can unroll the map and apply each function manually:

import concurrent.futures

future_list = []
with concurrent.futures.ThreadPoolExecutor() as executor:
    for arg in range(10):
        # test_func as defined in the question
        future = executor.submit(test_func, arg)
        future_list.append(future)

for future in future_list:
    try:
        print(future.result())
    except Exception as e:
        print(e)

This lets you handle each future individually.
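A variation on the same pattern (half is a hypothetical function that rejects odd inputs): keeping the submitted argument next to its future makes it easy to separate successes from failures after the fact.

```python
import concurrent.futures

def half(val):
    if val % 2:
        raise ValueError('odd input: %d' % val)
    return val // 2

results, errors = [], []
with concurrent.futures.ThreadPoolExecutor() as executor:
    # map each future back to the argument it was submitted with
    future_to_arg = {executor.submit(half, arg): arg for arg in range(5)}

for future, arg in future_to_arg.items():
    try:
        results.append((arg, future.result()))
    except ValueError as exc:
        errors.append((arg, str(exc)))

print(results)
print(errors)
```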

Ehsan's solution is good, but it may be slightly more efficient to take the results as they are completed, instead of waiting for sequential items in the list to finish. Here is an example from the library docs:
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

While others have given good answers on the correct way to catch multiple exceptions, I want to answer why the way exceptions are caught in the question is wrong. The following snippet:

import concurrent.futures

class ExceptionA(Exception):
    pass


def test_func(val):
    raise ExceptionA(val)


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    try:
        for r in executor.map(test_func, [1, 2, 3, 4, 5]):
            try:
                print(r)
            except ExceptionA as exc:
                print(f'Catch inside: {exc}')

    except ExceptionA as exc:
        print(f'Catch outside: {exc}')

gives the output Catch outside: 1.

The Python docs read:

If a func call raises an exception, then that exception will be raised when its value is retrieved from the iterator.

This means that if you want to catch the exception, you need to catch it outside the loop, because the value is retrieved in the for statement itself, not in the print statement.