Python multiprocessing - does the number of processes in a pool decrease on error?
Code:
import multiprocessing
print(f'num cpus {multiprocessing.cpu_count():d}')
import sys; print(f'Python {sys.version} on {sys.platform}')

def _process(m):
    print(m)  # ; return m
    raise ValueError(m)

args_list = [[i] for i in range(1, 20)]

if __name__ == '__main__':
    with multiprocessing.Pool(2) as p:
        print([r for r in p.starmap(_process, args_list)])
Prints:
num cpus 8
Python 3.7.1 (v3.7.1:260ec2c36a, Oct 20 2018, 03:13:28)
[Clang 6.0 (clang-600.0.57)] on darwin
1
7
4
10
13
16
19
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 121, in worker
result = (True, func(*args, **kwds))
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 47, in starmapstar
return list(itertools.starmap(args[0], args[1]))
File "/Users/ubik-mac13/Library/Preferences/PyCharm2018.3/scratches/multiprocess_error.py", line 8, in _process
raise ValueError(m)
ValueError: 1
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/ubik-mac13/Library/Preferences/PyCharm2018.3/scratches/multiprocess_error.py", line 18, in <module>
print([r for r in p.starmap(_process, args_list)])
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 298, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 683, in get
raise self._value
ValueError: 1
Process finished with exit code 1
Increasing the number of processes in the pool to 3 or 4 prints all the odd numbers (possibly out of order):
1
3
5
9
11
7
13
15
17
19
while from 5 and up it prints the whole 1-19 range. So what is happening here? Do the processes crash after a number of failures?
This is of course a toy example, but it comes from a real code issue I had - after leaving a multiprocessing pool running for a few days, steady CPU usage dropped as if some of the processes had been killed (note the CPU utilization going downhill on 03/04 and 03/06, while there were still plenty of tasks left to run):
When the code terminated, it presented me with one (and one only, as here, while the processes were many more) multiprocessing.pool.RemoteTraceback
- bonus question: is this traceback random? In this toy example it is usually ValueError: 1
but sometimes other numbers appear too. Does multiprocessing keep the first traceback from the first process that crashes?
A quick experiment with watch ps aux
in one window and the code in another seems to show that no, exceptions do not crash the child processes.
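The ps aux observation can also be reproduced programmatically. This is a small sanity check (not part of the original experiment, and it peeks at the private p._pool attribute, much like ._value further down) comparing the pool's worker pids before and after a failing map:

```python
import multiprocessing

def _fail(m):
    raise ValueError(m)

def workers_survive(n_procs=2):
    """True if the pool still has the same worker pids after a failing map."""
    with multiprocessing.Pool(n_procs) as p:
        before = {w.pid for w in p._pool}  # private attribute, like ._value below
        try:
            p.map(_fail, range(10))
        except ValueError:
            pass  # the job failed, but the workers should still be alive
        after = {w.pid for w in p._pool}
    return before == after

if __name__ == '__main__':
    print(workers_survive())
```

If the exceptions killed the children, the Pool would replace them and the pid sets would differ.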
The MapResult
object that underpins map/starmap operations only collects the first exception, and considers the whole map job failed if any job fails with an exception.
(The amount of work sent to each worker is determined by the chunksize
parameter to .map()
and friends.)
If you want something more resilient to exceptions, you can just use .apply_async()
:
import multiprocessing

def _process(m):
    if m % 2 == 0:
        raise ValueError('I only work on odd numbers')
    return m * 8

if __name__ == '__main__':
    args_list = list(range(1, 20))
    with multiprocessing.Pool(2) as p:
        params_and_jobs = [((arg,), p.apply_async(_process, (arg,))) for arg in args_list]
        for params, job in params_and_jobs:
            job.wait()
            # regularly you'd use `job.get()`, but it would `raise` the exception,
            # which is not suitable for this example, so we dig in deeper and just use
            # the `._value` it'd return or raise:
            print(params, type(job._value), job._value)
outputs
(1,) <class 'int'> 8
(2,) <class 'ValueError'> I only work on odd numbers
(3,) <class 'int'> 24
(4,) <class 'ValueError'> I only work on odd numbers
(5,) <class 'int'> 40
(6,) <class 'ValueError'> I only work on odd numbers
(7,) <class 'int'> 56
(8,) <class 'ValueError'> I only work on odd numbers
(9,) <class 'int'> 72
(10,) <class 'ValueError'> I only work on odd numbers
(11,) <class 'int'> 88
(12,) <class 'ValueError'> I only work on odd numbers
(13,) <class 'int'> 104
(14,) <class 'ValueError'> I only work on odd numbers
(15,) <class 'int'> 120
(16,) <class 'ValueError'> I only work on odd numbers
(17,) <class 'int'> 136
(18,) <class 'ValueError'> I only work on odd numbers
(19,) <class 'int'> 152
No, just the whole task blows up, not the process itself. The behavior you observe in your toy example is explained by the resulting chunksizes for the combination of the number of workers and the length of the iterable. When you grab the function calc_chunksize_info
, you can see the difference in the resulting chunksizes:
calc_chunksize_info(n_workers=2, len_iterable=20)
# Chunkinfo(n_workers=2, len_iterable=20, n_chunks=7, chunksize=3, last_chunk=2)
calc_chunksize_info(n_workers=5, len_iterable=20)
# Chunkinfo(n_workers=5, len_iterable=20, n_chunks=20, chunksize=1, last_chunk=1)
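Since the source of calc_chunksize_info is not reproduced here, the values above can be checked against a sketch of CPython's default chunksize heuristic (Pool._map_async divides the iterable length by n_workers * 4 and rounds up); the Chunkinfo namedtuple is recreated to match the output shown:

```python
from collections import namedtuple

Chunkinfo = namedtuple(
    'Chunkinfo',
    ['n_workers', 'len_iterable', 'n_chunks', 'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    # Mirrors Pool._map_async's default heuristic:
    # divide by n_workers * 4 and round up on a remainder.
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # Derive how many chunks that yields and the size of the last one.
    n_chunks, last_chunk = divmod(len_iterable, chunksize)
    if last_chunk == 0:
        last_chunk = chunksize
    else:
        n_chunks += 1
    return Chunkinfo(n_workers, len_iterable, n_chunks, chunksize, last_chunk)

print(calc_chunksize_info(2, 20))  # matches the Chunkinfo values above
print(calc_chunksize_info(5, 20))
```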
With chunksize > 1, all untouched taskels within a task are also lost as soon as the first taskel raises an exception. Handle expectable exceptions directly within your target function, or write an additional error-handling wrapper to prevent that.
When the code terminated it presented me with one (and one only as here, while the processes were many more) multiprocessing.pool.RemoteTraceback - bonus question is this traceback random? In this toy example, it is usually ValueError: 1 but sometimes also other numbers appear. Does multiprocessing keep the first traceback from the first process that crashes?
Worker processes pull tasks from a shared queue. Reading from the queue is sequential, so task 1 will always be read before task 2, but there is no way to predict the order in which results become ready within the workers. There are many hardware- and OS-related factors at play, so yes, the traceback is random in the sense that the order of the results is random, since the (stringified) traceback is part of the result being sent back to the parent. The results are also sent back over a shared queue, and the Pool internally handles returning tasks just in time. If a task returns unsuccessfully, the whole job is marked as not successful and further arriving tasks are discarded. Only the first retrieved exception gets re-raised in the parent once all tasks within a job have returned.
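That randomness can be probed with a small helper (first_exception is a hypothetical name for this sketch): every taskel fails, and we record which taskel's ValueError was the one re-raised in the parent. Repeated runs often, but not always, surface different numbers:

```python
import multiprocessing

def _process(m):
    raise ValueError(m)

def first_exception(n_procs=2):
    # Every taskel fails; return which one's ValueError got re-raised.
    with multiprocessing.Pool(n_procs) as p:
        try:
            p.map(_process, range(1, 20), chunksize=1)
        except ValueError as e:
            return e.args[0]

if __name__ == '__main__':
    # Often an early number, but the exact value depends on scheduling:
    print([first_exception() for _ in range(3)])
```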