Python error inside worker for map_async affects execution plan

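Guys! I ran into this problem and I'm curious about what is going on. The Pool forks 3 processes. My further assumption: each process pulls tasks from the parent's task queue. As we can see, the processes do not die, but some tasks are skipped. Maybe someone has an idea?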

from multiprocessing import Pool
import os


def wrk(a):
    if a % 3 == 0:
        print(a, os.getpid(), 'GONNA DIE')
        raise ValueError('ERROR')
    else:
        print(a, os.getpid())


if __name__ == '__main__':
    with Pool(processes=3) as pool:
        p = pool.map_async(wrk, (i for i in range(50)))
        p.wait()

Result:

0 29836 GONNA DIE
5 29835
6 29835 GONNA DIE
10 29836
11 29836
12 29836 GONNA DIE
15 29836 GONNA DIE
20 29836
21 29836 GONNA DIE
25 29835
26 29835
27 29835 GONNA DIE
30 29836 GONNA DIE
35 29835
36 29835 GONNA DIE
40 29836
41 29836
42 29836 GONNA DIE
45 29835 GONNA DIE

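Very interesting. When you do not specify the chunksize argument to the map methods, a value is computed from the size of the iterable and the size of the pool. In your case, I believe it will use a chunksize value of 5.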
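As a rough check of that number, here is a minimal sketch of the heuristic. It assumes the rule CPython's Pool applies when chunksize is left unset: divide the number of items by four times the number of workers and round up. The helper name default_chunksize is my own and is not part of multiprocessing.

```python
def default_chunksize(n_items, n_workers):
    """Approximate the chunk size Pool.map/map_async picks when none is given.

    Assumption: this mirrors the divmod-based heuristic in CPython's pool.py.
    The exact rule is an implementation detail and may differ between versions.
    """
    if n_items == 0:
        return 0
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1  # round up so no item is left over
    return chunksize


print(default_chunksize(50, 3))  # 50 items, 3 workers: prints 5
```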

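This means that each idle process in the pool pulls up to 5 tasks from the input queue at a time and processes them as one batch. It appears (and this surprises me) that if one of those tasks raises an exception, the process does not go on to process any of the remaining tasks in the batch. If we force a chunksize of 1, we get: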

from multiprocessing import Pool
import os


def wrk(a):
    if a % 3 == 0:
        print(a, os.getpid(), 'GONNA DIE')
        raise ValueError('ERROR')
    else:
        print(a, os.getpid())


if __name__ == '__main__':
    with Pool(processes=3) as pool:
        p = pool.map_async(wrk, (i for i in range(50)), chunksize=1)
        p.wait()

Prints:

0 186952 GONNA DIE
1 249800
2 249800
3 249800 GONNA DIE
4 186952
5 186952
6 244428 GONNA DIE
7 186952
8 186952
9 186952 GONNA DIE
10 249800
11 186952
12 249800 GONNA DIE
13 186952
14 186952
15 186952 GONNA DIE
16 249800
17 249800
18 249800 GONNA DIE
20 186952
19 244428
21 186952 GONNA DIE
22 244428
23 244428
24 249800 GONNA DIE
25 244428
26 186952
27 244428 GONNA DIE
28 186952
29 186952
30 249800 GONNA DIE
32 244428
31 186952
33 244428 GONNA DIE
34 186952
35 249800
36 186952 GONNA DIE
37 249800
38 244428
39 249800 GONNA DIE
40 244428
41 186952
42 244428 GONNA DIE
43 249800
44 186952
45 249800 GONNA DIE
46 186952
47 244428
48 249800 GONNA DIE
49 186952

Likewise, if we set chunksize=50 so that the first idle process grabs all of the submitted tasks, the only output is:

0 115408 GONNA DIE

The same thing happens with concurrent.futures.ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor
import os


def wrk(a):
    if a % 3 == 0:
        print(a, os.getpid(), 'GONNA DIE')
        raise ValueError('ERROR')
    else:
        print(a, os.getpid())


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        p = executor.map(wrk, (i for i in range(50)), chunksize=5)

Prints:

0 39280 GONNA DIE
5 60616
10 39280
6 60616 GONNA DIE
11 39280
12 39280 GONNA DIE
15 60616 GONNA DIE
20 39280
25 60616
21 39280 GONNA DIE
26 60616
30 39280 GONNA DIE
27 60616 GONNA DIE
35 104328
40 39280
45 60616 GONNA DIE
36 104328 GONNA DIE
41 39280
42 39280 GONNA DIE

But if you use the default chunksize=1 value, you will see all 50 lines of output. So this "feature" is not specific to the Pool class.

Again, very surprising. I will take a look at the multiprocessing code and try to get back to you.

Update

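I looked at the code for concurrent.futures.ProcessPoolExecutor.map, which is easier to follow than the code for multiprocessing.pool.Pool.map, and sure enough, when one of the tasks in a chunksize batch gets an exception, the remaining tasks in that batch are abandoned.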
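The effect can be reproduced without any processes at all. The sketch below assumes that a worker evaluates one chunk in a single list(map(func, chunk))-style call, which is roughly what both libraries do. The names run_chunk and seen are illustrative only and are not library APIs.

```python
seen = []  # records which inputs the function was actually called with


def wrk(a):
    seen.append(a)
    if a % 3 == 0:
        raise ValueError('ERROR')
    return a


def run_chunk(func, chunk):
    """Evaluate a whole chunk in one go, as a pool worker would (assumed)."""
    try:
        # The first exception aborts map(), so later items are never called.
        return True, list(map(func, chunk))
    except Exception as e:
        return False, e


ok, value = run_chunk(wrk, [5, 6, 7, 8, 9])
print(ok, repr(value))  # the chunk as a whole is reported as failed
print(seen)             # only 5 and 6 were ever passed to wrk
```

This matches the original output, where 5 and 6 appear but 7, 8 and 9 never do.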

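If you use multiprocessing.pool.Pool.map with a chunksize value > 1, then if an exception is raised in your worker function, not only is the rest of the batch abandoned, but so are any batches still queued up waiting to run. The map function itself raises the exception. To get whatever results are available from worker function calls that completed successfully, use imap (or imap_unordered):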

from multiprocessing import Pool

def worker(x):
    if x % 8 == 0:
        raise ValueError(str(x))
    return x

if __name__ == '__main__':
    pool = Pool(8)
    results = pool.imap(worker, range(1, 30), chunksize=4)
    # Step the iterator by hand so an exception on one item does not end the loop.
    it = iter(results)
    while True:
        try:
            x = next(it)
        except StopIteration:
            break
        except Exception as e:
            print('Exception:', e)
        else:
            print('x =', x)

Prints:

x = 1
x = 2
x = 3
x = 4
Exception: 8

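If the chunksize=1 argument is specified, then when the worker function raises an exception, tasks that are queued up waiting to run, or that are already running but not yet complete, are not abandoned: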

from multiprocessing import Pool

def worker(x):
    if x % 8 == 0:
        raise ValueError(str(x))
    return x

if __name__ == '__main__':
    pool = Pool(8)
    results = pool.imap(worker, range(1, 30), chunksize=1)
    # Step the iterator by hand so an exception on one item does not end the loop.
    it = iter(results)
    while True:
        try:
            x = next(it)
        except StopIteration:
            break
        except Exception as e:
            print('Exception:', e)
        else:
            print('x =', x)

Prints:

x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
Exception: 8
x = 9
x = 10
x = 11
x = 12
x = 13
x = 14
x = 15
Exception: 16
x = 17
x = 18
x = 19
x = 20
x = 21
x = 22
x = 23
Exception: 24
x = 25
x = 26
x = 27
x = 28
x = 29

The closest equivalent program using concurrent.futures is:

from concurrent.futures import ProcessPoolExecutor

def worker(x):
    if x % 8 == 0:
        raise ValueError(str(x))
    return x

if __name__ == '__main__':
    executor = ProcessPoolExecutor()
    results = executor.map(worker, range(1, 30))
    # Step the iterator by hand, as in the imap version.
    it = iter(results)
    while True:
        try:
            x = next(it)
        except StopIteration:
            break
        except Exception as e:
            print('Exception:', e)
        else:
            print('x =', x)

(The default chunksize value of 1 is used.)

Prints:

x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
Exception: 8

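However, as you can see, once an exception is raised, all map processing stops as far as returning values is concerned. You can, and should, use the submit method to get around this: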

from concurrent.futures import ProcessPoolExecutor

def worker(x):
    if x % 8 == 0:
        raise ValueError(str(x))
    return x

if __name__ == '__main__':
    executor = ProcessPoolExecutor()
    futures = [executor.submit(worker, idx) for idx in range(1, 30)]
    for future in futures:
        try:
            x = future.result()
        except Exception as e:
            print('Exception:', e)
        else:
            print('x =', x)

Prints:

x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
Exception: 8
x = 9
x = 10
x = 11
x = 12
x = 13
x = 14
x = 15
Exception: 16
x = 17
x = 18
x = 19
x = 20
x = 21
x = 22
x = 23
Exception: 24
x = 25
x = 26
x = 27
x = 28
x = 29

Conclusion

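All of these problems can be avoided if your worker function does not raise exceptions. If it needs to return a value and an exceptional condition occurs, it can even return an Exception instance that the main process can test for: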

from multiprocessing import Pool

def worker(x):
    if x % 8 == 0:
        return ValueError(str(x))
    return x

if __name__ == '__main__':
    pool = Pool(8)
    results = pool.map(worker, range(1, 30), chunksize=4)
    for result in results:
        if isinstance(result, Exception):
            print('Exception:', result)
        else:
            print('x =', result)

Prints:

x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
Exception: 8
x = 9
x = 10
x = 11
x = 12
x = 13
x = 14
x = 15
Exception: 16
x = 17
x = 18
x = 19
x = 20
x = 21
x = 22
x = 23
Exception: 24
x = 25
x = 26
x = 27
x = 28
x = 29
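
If you cannot change the worker function itself, the same idea can be applied from the outside. This is only a sketch: safe_call is a hypothetical helper of my own, defined at module level so that it can be pickled, and it is bound to the worker with functools.partial.

```python
from functools import partial
from multiprocessing import Pool


def worker(x):
    if x % 8 == 0:
        raise ValueError(str(x))
    return x


def safe_call(func, arg):
    """Call func(arg) and return the exception instead of raising it."""
    try:
        return func(arg)
    except Exception as e:
        return e


if __name__ == '__main__':
    with Pool(3) as pool:
        # A failure is now an ordinary return value, so it no longer
        # discards the rest of its chunk.
        results = pool.map(partial(safe_call, worker), range(1, 30), chunksize=4)
    for result in results:
        if isinstance(result, Exception):
            print('Exception:', result)
        else:
            print('x =', result)
```

The trade-off is that the main process has to check every result with isinstance, since a failure no longer surfaces as a raised exception.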