Early exit from multiprocessing.Pool.map (raise in child process doesn't work)

My repro is wrong, as noted in the answer below. I'm leaving the code mostly as-is, as I'm not sure where this falls between clarifying and changing the meaning.

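I have a few thousand jobs to run and would like any error to halt execution immediately. I wrap each task in a try / except / raise so that I can log the error (without all the multiprocessing/threading noise) and then re-raise it. This does not kill the main process.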

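What's going on, and how can I get the early exit I'm looking for? Calling sys.exit(1) in the child deadlocks, and wrapping the try / except / raise function in yet another function doesn't work either.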

$ python3 mp_reraise.py
(0,)
(1,)
(2,)
(3,)
(4,)
(5,)
(6,)
(7,)
(8,)
(9,)
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "mp_reraise.py", line 5, in f_reraise
    raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mp_reraise.py", line 14, in <module>
    test_reraise()
  File "mp_reraise.py", line 12, in test_reraise
    p.map(f_reraise, range(10))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
Exception: (0,)

mp_reraise.py

import multiprocessing

def f_reraise(*args):
    try:
        raise Exception(args)
    except Exception as e:
        print(e)
        raise

def test_reraise():
    with multiprocessing.Pool() as p:
        p.map(f_reraise, range(10))

test_reraise()

If I don't catch and re-raise, execution stops early as expected: [per Rugnar's answer, this does not actually stop early]

$ python3 mp_raise.py 
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "mp_raise.py", line 4, in f_raise
    raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mp_raise.py", line 10, in <module>
    test_raise()
  File "mp_raise.py", line 8, in test_raise
    p.map(f_raise, range(10))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
Exception: (0,)  

mp_raise.py

import multiprocessing

def f_raise(*args):
    # missing print, which would demonstrate that
    # this actually does not stop early
    raise Exception(args)

def test_raise():
    with multiprocessing.Pool() as p:
        p.map(f_raise, range(10))

test_raise()

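In your mp_raise.py you don't print anything, so you can't see how many jobs were done. I added a print and found that the pool only sees the child's exception once the job iterator is exhausted. So it never stops early.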
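The observation above can be checked with a small experiment. This is a minimal sketch, not the original poster's code: `fail_on_zero` and `count_items_pulled` are hypothetical names introduced only for illustration. It relies on the fact that `Pool.map` turns an iterable without a length into a list before dispatching any work, so a generator is always drained completely, even when the very first job fails.

```python
import multiprocessing as mp


def fail_on_zero(i):
    """Hypothetical job that fails only for the very first item."""
    if i == 0:
        raise ValueError(i)
    return i


def count_items_pulled(n, processes=2):
    """Run Pool.map over a generator and report how many items it consumed."""
    pulled = []

    def items():
        for i in range(n):
            pulled.append(i)  # executed in the parent process
            yield i

    with mp.Pool(processes) as pool:
        try:
            pool.map(fail_on_zero, items())
        except ValueError as exc:
            # The exception from the child is re-raised here in the parent.
            return len(pulled), exc
    return len(pulled), None


if __name__ == "__main__":
    pulled, exc = count_items_pulled(1000)
    print("items pulled:", pulled, "error:", repr(exc))
```

The count of pulled items equals `n` even though the first job raises, which suggests that with `map` the error cannot stop jobs from being queued.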

If you need to stop early after an exception, try this:

import time
import multiprocessing as mp


def f_reraise(i):
    if abort.is_set():  # skip the job if an abort has already been requested
        return
    time.sleep(i / 1000)  # simulate real work so jobs are not instantaneous
    if abort.is_set():  # re-check mid-job so in-flight work can stop too
        return
    print(i)
    try:
        raise Exception(i)
    except Exception as e:
        abort.set()
        print('error:', e)
        raise


def init(a):
    global abort
    abort = a


def test_reraise():
    _abort = mp.Event()

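    # The intent is to stop feeding jobs to the pool once abort is set, so the
    # job iterator is wrapped in a generator. Note that Pool.map() converts its
    # iterable to a list before dispatching, so this check only takes effect
    # with a lazy variant such as imap(); with map() the abort check at the
    # top of f_reraise does the cancelling.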
    def pool_args():
        for i in range(100):
            if not _abort.is_set():
                yield i

    # initializer and initargs are a way to share the event between processes
    # thanks to 
    with mp.Pool(8, initializer=init, initargs=(_abort,)) as p:
        p.map(f_reraise, pool_args())


if __name__ == '__main__':
    test_reraise()
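An alternative that avoids the shared event is to consume results lazily in the parent and let the pool be torn down at the first failure. The following is a sketch under assumptions, not part of the answer above: `job` and `run_until_first_error` are hypothetical names, and the job body is a stand-in for real work. It uses `Pool.imap_unordered`, which yields each result as it completes and re-raises a child's exception in the parent when that result is reached; leaving the `with` block then calls `Pool.terminate()`.

```python
import multiprocessing as mp
import time


def job(i):
    """Hypothetical job: sleeps briefly and fails for i == 3."""
    time.sleep(0.01)
    if i == 3:
        raise ValueError(i)
    return i


def run_until_first_error(func, items, processes=4):
    """Return (results completed so far, first exception or None)."""
    done = []
    with mp.Pool(processes) as pool:
        try:
            # Results arrive in completion order; a failed task re-raises
            # its exception here, in the parent process.
            for result in pool.imap_unordered(func, items):
                done.append(result)
        except Exception as exc:
            # Leaving the with-block calls pool.terminate(), which stops
            # the workers without finishing outstanding work.
            return done, exc
    return done, None


if __name__ == "__main__":
    done, error = run_until_first_error(job, range(1000))
    print("completed:", len(done), "error:", repr(error))
```

How many jobs complete before the failure is noticed depends on scheduling, so the count varies between runs. Jobs that are already running when the pool is terminated are killed rather than finished, so this approach fits best when jobs are safe to interrupt.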