How to pause processes in case they are consuming too much memory?
Background: I process planetary imagery with a set of command-line utilities provided by the US Geological Survey (USGS). Some of them are RAM hogs, to the extreme (10 GB). USGS says that is just how they run and has no plans to try to manage the RAM better. I built a Python wrapper that manipulates file lists to call the different steps to process the data in parts (e.g. all images taken in one color filter, all images taken in another filter, all taken in yet another, etc.). Because this is done for multiple lists and multiple images, I threaded it, using every CPU I can, turning what would otherwise take two months into a one-week run. At the moment, I do not use native Python methods for the threading; instead I use GNU Parallel (calling it with os.system("") and then invoking the function) or Pysis, the Python way of calling and multi-threading the USGS software.
Problem: As noted, some steps, for some files, take a huge amount of RAM, and there is no way to know ahead of time which ones those might be. So I can get into a situation where, for some files, each process uses 200 MB and runs fine on an 8-core machine with 16 GB of RAM, but then it starts to process other files and I get RAM creep, with each process using a few GB; on a 16 GB machine with 8 processes that means RAM gets squeezed and swap space gets used ... if I am lucky and the machine does not simply lock up.
Solution? What I am looking for is a way to monitor RAM usage by process name, say once a minute, and if I start to see RAM creep (e.g. 8 instances of a process, each using more than 2 GB of RAM), I can pause all but one of them, let that one finish, un-pause another, let that one finish, and so on until those 8 are done, then continue with the rest of whatever that step might need to run. Hopefully obviously, all of this would be done in Python, not manually.
Can this be done? If so, how?
You can use psutil.Process.suspend() to suspend execution of running processes that exceed a given memory threshold. The monitoring part is just repeatedly comparing psutil.Process().memory_info().rss ("Resident Set Size") of the running processes with your given threshold. How you then schedule further processing is up to you.
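The bare monitoring check, keyed by process name as the question asks, can be sketched like this (a hedged illustration; the helper name `processes_over_threshold` is made up for the example):

```python
import psutil


def processes_over_threshold(name, max_mib):
    """Return psutil.Process handles for all processes whose name matches
    `name` and whose Resident Set Size exceeds `max_mib` mebibytes."""
    offenders = []
    for proc in psutil.process_iter(['name', 'memory_info']):
        mem = proc.info['memory_info']  # None if the info was inaccessible
        if (proc.info['name'] == name and mem is not None
                and mem.rss / 2 ** 20 > max_mib):
            offenders.append(proc)
    return offenders

# each offender could then be paused with .suspend() and later .resume()
```

Run once a minute, this gives the list of candidates to suspend.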
In the example below, I suspend the culprit processes until the rest have finished, then resume the once-suspended processes one by one. This is meant to be a simple approach for showing the general mechanism.
import time
import random
from threading import Thread
from multiprocessing import Process, active_children

import psutil


def format_mib(mem_bytes):
    """Format bytes into mebibyte-string."""
    return f'{mem_bytes / 2 ** 20:.2f} MiB'


def f(append_length):
    """Main function in child-process. Appends random floats to list."""
    p = psutil.Process()
    li = []
    for i in range(10):
        li.extend([random.random() for _ in range(append_length)])
        print(f'i: {i} | pid: {p.pid} | '
              f'{format_mib(p.memory_full_info().rss)}')
        time.sleep(2)


def monitored(running_processes, max_mib):
    """Monitor memory usage for running processes.

    Suspend execution for processes surpassing `max_mib` and complete
    one by one after behaving processes have finished.
    """
    running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
    suspended_processes = []

    while running_processes:
        active_children()  # Joins all finished processes.
        # Without it, p.is_running() below on Unix would not return `False`
        # for finished processes.
        actual_processes = running_processes.copy()
        for p in actual_processes:
            if not p.is_running():
                running_processes.remove(p)
                print(f'removed finished process: {p}')
            else:
                if p.memory_info().rss / 2 ** 20 > max_mib:
                    print(f'suspending process: {p}')
                    p.suspend()
                    running_processes.remove(p)
                    suspended_processes.append(p)
        time.sleep(1)

    for p in suspended_processes:
        print(f'\nresuming process: {p}')
        p.resume()
        p.wait()


if __name__ == '__main__':

    MAX_MiB = 200

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    processes = [Process(target=f, args=(append_length,))
                 for append_length in append_lengths]

    for p in processes:
        p.start()

    m = Thread(target=monitored, args=(processes, MAX_MiB))
    m.start()
    m.join()
Example output (shortened) for a run where two processes were suspended for exceeding the 200 MiB threshold and were resumed after the behaving processes had finished:
i: 0 | pid: 17997 | 13.53 MiB
i: 0 | pid: 18001 | 19.70 MiB
i: 0 | pid: 17998 | 25.88 MiB
i: 0 | pid: 17999 | 41.32 MiB
i: 0 | pid: 18000 | 72.21 MiB
...
i: 2 | pid: 17997 | 20.84 MiB
i: 2 | pid: 18001 | 42.02 MiB
i: 2 | pid: 17998 | 60.56 MiB
i: 2 | pid: 17999 | 103.36 MiB
i: 2 | pid: 18000 | 215.70 MiB
suspending process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 17997 | 23.93 MiB
i: 3 | pid: 18001 | 47.75 MiB
i: 3 | pid: 17998 | 76.00 MiB
i: 3 | pid: 17999 | 141.59 MiB
...
i: 5 | pid: 17997 | 30.11 MiB
i: 5 | pid: 18001 | 68.24 MiB
i: 5 | pid: 17998 | 107.23 MiB
i: 5 | pid: 17999 | 203.52 MiB
suspending process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17997 | 33.19 MiB
i: 6 | pid: 18001 | 77.49 MiB
i: 6 | pid: 17998 | 122.59 MiB
...
i: 9 | pid: 17997 | 42.47 MiB
i: 9 | pid: 18001 | 105.68 MiB
i: 9 | pid: 17998 | 168.96 MiB
removed finished process: psutil.Process(pid=17997, status='terminated')
removed finished process: psutil.Process(pid=17998, status='terminated')
removed finished process: psutil.Process(pid=18001, status='terminated')
resuming process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 18000 | 277.46 MiB
i: 4 | pid: 18000 | 339.22 MiB
i: 5 | pid: 18000 | 400.84 MiB
...
i: 9 | pid: 18000 | 648.00 MiB
resuming process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17999 | 234.55 MiB
...
i: 9 | pid: 17999 | 327.31 MiB
Process finished with exit code 0
Edit:
I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads [sic!] at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end?
I extended the code above to allow new processes to be started as old ones finish, with the maximum number of running processes set to the number of cores. I also refactored it into a class, since otherwise it would start to get messy with all the necessary state to manage. In the code below I use the names "tasks" and "processes" interchangeably for brevity. Note the changed start method for processes and the accompanying comments in the code.
import time
import random
from threading import Thread
from collections import deque
from multiprocessing import Process, active_children, set_start_method

import psutil


# `def format_mib` and `def f` from above unchanged...


class TaskProcessor(Thread):
    """Processor class which monitors memory usage for running
    tasks (processes). Suspends execution for tasks surpassing
    `max_mib` and completes them one by one, after behaving
    tasks have finished.
    """
    def __init__(self, n_cores, max_mib, tasks):
        super().__init__()
        self.n_cores = n_cores
        self.max_mib = max_mib  # memory threshold
        self.tasks = deque(tasks)
        self._running_tasks = []
        self._suspended_tasks = []

    def run(self):
        """Main-function in new thread."""
        self._update_running_tasks()
        self._monitor_running_tasks()
        self._process_suspended_tasks()

    def _update_running_tasks(self):
        """Start new tasks if we have less running tasks than cores."""
        while len(self._running_tasks) < self.n_cores and len(self.tasks) > 0:
            p = self.tasks.popleft()
            p.start()
            # for further process-management we here just need the
            # psutil.Process wrapper
            self._running_tasks.append(psutil.Process(pid=p.pid))
            print(f'Started process: {self._running_tasks[-1]}')

    def _monitor_running_tasks(self):
        """Monitor running tasks. Replace completed tasks and suspend tasks
        which exceed the memory threshold `self.max_mib`.
        """
        # loop while we have running or non-started tasks
        while self._running_tasks or self.tasks:
            active_children()  # Joins all finished processes.
            # Without it, p.is_running() below on Unix would not return
            # `False` for finished processes.
            self._update_running_tasks()
            actual_tasks = self._running_tasks.copy()

            for p in actual_tasks:
                if not p.is_running():  # process has finished
                    self._running_tasks.remove(p)
                    print(f'Removed finished process: {p}')
                else:
                    if p.memory_info().rss / 2 ** 20 > self.max_mib:
                        p.suspend()
                        self._running_tasks.remove(p)
                        self._suspended_tasks.append(p)
                        print(f'Suspended process: {p}')

            time.sleep(1)

    def _process_suspended_tasks(self):
        """Resume processing of suspended tasks."""
        for p in self._suspended_tasks:
            print(f'\nResuming process: {p}')
            p.resume()
            p.wait()


if __name__ == '__main__':

    # Forking (default on Unix-y systems) an already multithreaded process is
    # error-prone. Since we intend to start processes after we are already
    # multithreaded, we switch to another start-method.
    set_start_method('spawn')  # or 'forkserver' (a bit faster start up) if available

    MAX_MiB = 200
    N_CORES = 2

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    tasks = [Process(target=f, args=(append_length,))
             for append_length in append_lengths]

    tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
    tp.start()
    tp.join()
Example output (shortened):
Started process: psutil.Process(pid=9422, name='python', started='13:45:53')
Started process: psutil.Process(pid=9423, name='python', started='13:45:53')
i: 0 | pid: 9422 | 18.95 MiB
i: 0 | pid: 9423 | 31.45 MiB
...
i: 9 | pid: 9422 | 47.36 MiB
i: 9 | pid: 9423 | 175.41 MiB
Removed finished process: psutil.Process(pid=9422, status='terminated')
Removed finished process: psutil.Process(pid=9423, status='terminated')
Started process: psutil.Process(pid=9445, name='python', started='13:46:15')
Started process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 0 | pid: 9445 | 46.86 MiB
i: 0 | pid: 9446 | 77.74 MiB
...
i: 2 | pid: 9445 | 117.41 MiB
i: 2 | pid: 9446 | 220.99 MiB
Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15')
Started process: psutil.Process(pid=9450, name='python', started='13:46:21')
i: 0 | pid: 9450 | 25.16 MiB
i: 3 | pid: 9445 | 148.29 MiB
i: 1 | pid: 9450 | 36.47 MiB
i: 4 | pid: 9445 | 179.17 MiB
i: 2 | pid: 9450 | 45.74 MiB
i: 5 | pid: 9445 | 211.14 MiB
Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 3 | pid: 9450 | 55.00 MiB
...
i: 9 | pid: 9450 | 110.62 MiB
Removed finished process: psutil.Process(pid=9450, status='terminated')
Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 3 | pid: 9446 | 282.75 MiB
...
i: 9 | pid: 9446 | 655.35 MiB
Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 6 | pid: 9445 | 242.12 MiB
...
i: 9 | pid: 9445 | 334.88 MiB
Process finished with exit code 0
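The toy children above are Python functions, but the same mechanism works for the asker's external USGS programs: start them with subprocess, wrap the PID in a psutil.Process, and suspend/resume that handle. A minimal sketch (using `sleep` as a stand-in Unix command and a hypothetical 200 MiB threshold):

```python
import subprocess

import psutil

# Stand-in for launching one USGS command-line utility.
child = subprocess.Popen(['sleep', '2'])
p = psutil.Process(child.pid)

if p.memory_info().rss / 2 ** 20 > 200:  # hypothetical threshold
    p.suspend()   # SIGSTOP on Unix: stops running, but keeps its RAM
    # ... once memory pressure is gone:
    p.resume()    # SIGCONT

child.terminate()
child.wait()
```

Note that a suspended process keeps its memory resident (or swapped), so suspending only helps if the remaining processes can finish within the freed headroom.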
parallel --memfree
is built for this situation:
parallel --memfree 1G doit ::: {1..100}
This will only spawn a new process if there is more than 1 GB of free RAM. If less than 0.5 * 1 GB is free, it will kill the youngest job and put that job back on the queue.
It was considered to only pause/suspend the youngest job, but experience showed that swapping a process out and back in would often be much slower than simply restarting the job.