Limit total CPU usage in python multiprocessing
I am using multiprocessing.Pool.imap to run many independent jobs in parallel, with Python 2.7 on Windows 7. With the default settings, my total CPU usage is pegged at 100%, as measured by the Windows Task Manager. That makes it impossible to do any other work while my code runs in the background.
I have tried limiting the number of processes to the number of CPUs minus 1, as described in How to limit the number of processors that Python uses:
pool = Pool(processes=max(multiprocessing.cpu_count()-1, 1))
for p in pool.imap(func, iterable):
    ...
This does reduce the total number of running processes. However, each process just takes more cycles to make up for it, so my total CPU usage is still pegged at 100%.
Is there a way to directly limit the total CPU usage (not just the number of processes), or failing that, is there any workaround?
The solution depends on what you want to do. Here are a few options:
Lower process priorities
You can nice the subprocesses. This way, although they will still eat 100% of the CPU, the OS will give preference to the other applications when you start them. If you want to leave a work-intensive computation running in the background on your laptop and don't care about the CPU fan running all the time, then setting nice values with psutil is your solution. The following test script runs on all cores for long enough that you can see how it behaves.
from multiprocessing import Pool, cpu_count
import math
import psutil
import os

def f(i):
    return math.sqrt(i)

def limit_cpu():
    "is called at every process start"
    p = psutil.Process(os.getpid())
    # set to lowest priority; this is Windows only, on Unix use p.nice(19)
    p.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)

if __name__ == '__main__':
    # start "number of cores" processes
    pool = Pool(None, limit_cpu)
    for p in pool.imap(f, range(10**8)):
        pass
The trick is that limit_cpu is run at the start of every worker process (see the initializer argument in the docs). Whereas Unix has niceness levels from -20 (highest priority) to 19 (lowest priority), Windows has a few distinct priority classes. BELOW_NORMAL_PRIORITY_CLASS probably fits your requirements best; there is also IDLE_PRIORITY_CLASS, which tells Windows to run your process only when the system is idle.
You can view the priority if you switch to the details view in Task Manager and right-click on the process:
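If you want the same initializer to work on both Windows and Unix, here is a minimal sketch (assuming psutil is installed; it simply combines the IDLE_PRIORITY_CLASS and nice(19) options mentioned above, which are the most aggressive choices):

import os
import psutil

def limit_cpu():
    "initializer: drop every worker to the lowest scheduling priority"
    p = psutil.Process(os.getpid())
    if os.name == 'nt':
        # Windows: run the workers only when the system is otherwise idle
        p.nice(psutil.IDLE_PRIORITY_CLASS)
    else:
        # Unix: niceness 19 is the least favorable scheduling
        p.nice(19)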
Reduce the number of processes
Although you rejected this option, it may still be a good one: say you limit the number of subprocesses to half the CPU cores with pool = Pool(max(cpu_count()//2, 1)). Then the OS initially runs those processes on half of the CPU cores, while the others stay idle or just run the other applications that are currently running. After a short while, the OS reschedules the processes and may move them to other CPU cores, and so on. Both Windows and Unix-based systems behave this way.
Windows: running 2 processes on 4 cores:
OSX: running 4 processes on 8 cores:
You can see that both operating systems balance the processes between the cores, although not evenly, so you still see a few cores with higher percentages than others.
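For reference, a minimal self-contained version of that half-the-cores pool (reusing the same toy square-root task as in the test script above) could look like this:

from multiprocessing import Pool, cpu_count
import math

def f(i):
    return math.sqrt(i)

if __name__ == '__main__':
    # use half of the CPU cores, but at least one
    pool = Pool(max(cpu_count() // 2, 1))
    for p in pool.imap(f, range(10**7)):
        pass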
Sleep
If you absolutely want to make sure your processes never eat 100% of any core (for example, if you want to keep the CPU fan from spinning up), then you can sleep in your processing function:
from time import sleep

def f(i):
    sleep(0.01)
    return math.sqrt(i)
This makes the OS "schedule out" your process for 0.01 seconds after each computation and makes room for the other applications. If there are no other applications, the CPU core idles, so it never reaches 100%. You will need to experiment with different sleep durations, and it will also vary between the computers you run it on. If you want to make it really sophisticated, you could adapt the sleep depending on what cpu_times() reports.
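That adaptive sleep is not spelled out in the answer; one possible sketch (the 50% duty-cycle target, the init_worker helper, and the per-call check are assumptions of this example, not part of the original) is to compare the CPU seconds reported by cpu_times() with wall-clock time and sleep just long enough to fall back below a target share of one core:

import math
import os
import time
import psutil

TARGET_SHARE = 0.5  # assumed target: each worker may use at most ~50% of one core

def init_worker():
    # per-worker state, set up once in every worker process
    global _proc, _last_cpu, _last_wall
    _proc = psutil.Process(os.getpid())
    _last_cpu = sum(_proc.cpu_times()[:2])   # user + system CPU seconds so far
    _last_wall = time.time()

def f(i):
    global _last_cpu, _last_wall
    result = math.sqrt(i)
    used = sum(_proc.cpu_times()[:2]) - _last_cpu   # CPU seconds since last reset
    elapsed = time.time() - _last_wall              # wall-clock seconds since last reset
    if elapsed > 0 and used / elapsed > TARGET_SHARE:
        # sleep just long enough to bring the duty cycle back down to the target
        time.sleep(used / TARGET_SHARE - elapsed)
        _last_cpu = sum(_proc.cpu_times()[:2])
        _last_wall = time.time()
    return result

if __name__ == '__main__':
    from multiprocessing import Pool
    pool = Pool(None, init_worker)
    for p in pool.imap(f, range(10**7)):
        pass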
On the OS level
You can use nice to set a priority for a single command. You can also start the Python script using nice. (The following is from: http://blog.scoutapp.com/articles/2014/11/04/restricting-process-cpu-usage-using-nice-cpulimit-and-cgroups)
nice
The nice command tweaks the priority level of a process so that it runs less frequently. This is useful when you need to run a CPU intensive task as a background or batch job. The niceness level ranges from -20 (most favorable scheduling) to 19 (least favorable). Processes on Linux are started with a niceness of 0 by default. The nice command (without any additional parameters) will start a process with a niceness of 10. At that level the scheduler will see it as a lower priority task and give it less CPU resources. Start two matho-primes tasks, one with nice and one without:
nice matho-primes 0 9999999999 > /dev/null &
matho-primes 0 9999999999 > /dev/null &
Now run top.
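For completeness, the same idea can be driven from Python itself; a minimal sketch (script.py is a placeholder for your own worker script, and both options are Unix-only):

import os
import subprocess
import sys

# Option 1: renice the current interpreter by 10 steps; the pool workers it
# later forks inherit this niceness.
os.nice(10)

# Option 2: launch a separate worker script through the nice command.
subprocess.call(["nice", "-n", "10", sys.executable, "script.py"])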
As a function in Python
Another approach is to use psutil to check your CPU load average for the past minute, and then have your threads check that load average: spool up another thread if you are below the specified CPU load target, and sleep or kill the thread if you are above the CPU load target. This will stay out of your way while you are using your computer, but will maintain a constant CPU load.
# Import Python modules
import time
import os
import multiprocessing
import psutil
import math
from random import randint


# Main task function
def main_process(item_queue, args_array):
    # Go through each link in the array passed in.
    while not item_queue.empty():
        # Get the next item in the queue
        item = item_queue.get()
        # Create a random number to simulate threads that
        # are not all going to be the same
        randomizer = randint(100, 100000)
        for i in range(randomizer):
            algo_seed = math.sqrt(math.sqrt(i * randomizer) % randomizer)
        # Check if the thread should continue based on current load balance
        if spool_down_load_balance(args_array):
            print "Process " + str(os.getpid()) + " saying goodnight..."
            break


# This function will build a queue and start the load balancer process
def start_thread_process(queue_pile, args_array):
    # Create a Queue to hold link pile and share between threads
    item_queue = multiprocessing.Queue()
    # Put all the initial items into the queue
    for item in queue_pile:
        item_queue.put(item)
    # Append the load balancer thread to the loop
    load_balance_process = multiprocessing.Process(target=spool_up_load_balance, args=(item_queue, args_array))
    # Loop through and start all processes
    load_balance_process.start()
    # This .join() function prevents the script from progressing further.
    load_balance_process.join()


# Spool down the thread balance when load is too high
def spool_down_load_balance(args_array):
    # Get the count of CPU cores
    core_count = psutil.cpu_count()
    # Calculate the short term load average of the past minute
    # (os.getloadavg() is Unix-only)
    one_minute_load_average = os.getloadavg()[0] / core_count
    # If load balance above the max return True to kill the process
    if one_minute_load_average > args_array['cpu_target']:
        print "-Unacceptable load balance detected. Killing process " + str(os.getpid()) + "..."
        return True
    return False


# Load balancer thread function
def spool_up_load_balance(item_queue, args_array):
    print "[Starting load balancer...]"
    # Get the count of CPU cores
    core_count = psutil.cpu_count()
    # While there are still items in the queue
    while not item_queue.empty():
        print "[Calculating load balance...]"
        # Check the 1 minute average CPU load balance
        # (os.getloadavg() returns 1, 5, 15 minute load averages)
        one_minute_load_average = os.getloadavg()[0] / core_count
        # If the load average is much less than the target, start a group of new threads
        if one_minute_load_average < args_array['cpu_target'] / 2:
            # Print message and log that load balancer is starting another thread group
            print "Starting another thread group due to low CPU load balance of: " + str(one_minute_load_average * 100) + "%"
            time.sleep(5)
            # Start another group of threads
            for i in range(args_array['thread_group_size']):
                start_new_thread = multiprocessing.Process(target=main_process, args=(item_queue, args_array))
                start_new_thread.start()
            # Allow the added threads to have an impact on the CPU balance
            # before checking the one minute average again
            time.sleep(20)
        # If the load average is less than the target, start a single thread
        elif one_minute_load_average < args_array['cpu_target']:
            # Print message and log that load balancer is starting another thread
            print "Starting another single thread due to low CPU load balance of: " + str(one_minute_load_average * 100) + "%"
            # Start another thread
            start_new_thread = multiprocessing.Process(target=main_process, args=(item_queue, args_array))
            start_new_thread.start()
            # Allow the added threads to have an impact on the CPU balance
            # before checking the one minute average again
            time.sleep(20)
        else:
            # Print CPU load balance
            print "Reporting stable CPU load balance: " + str(one_minute_load_average * 100) + "%"
            # Sleep before checking the load average again
            time.sleep(20)


if __name__ == "__main__":
    # Set the queue size
    queue_size = 10000
    # Define an arguments array to pass around all the values
    args_array = {
        # Set some initial CPU load values as a CPU usage goal
        "cpu_target": 0.60,
        # When CPU load is significantly low, start this number
        # of threads
        "thread_group_size": 3
    }
    # Create an array of fixed length to act as queue
    queue_pile = list(range(queue_size))
    # Set main process start time
    start_time = time.time()
    # Start the main process
    start_thread_process(queue_pile, args_array)
    print '[Finished processing the entire queue! Time consuming:{0} Time Finished: {1}]'.format(time.time() - start_time, time.strftime("%c"))
On Linux:
Use nice() with a numerical value:
# on Unix use p.nice(10) for a low priority (p is the psutil.Process from above)
p.nice(10)