在多处理或多线程应用程序中保留 cpu-时间
Reserve cpu-time in multiprocessing or multithreading application
我正在开发一个 Raspberry Pi 3 的项目,用于一些环境控制,其中包含连续循环中的许多简单重复事件。 RP3 胜任这项工作,但它让我可以专注于其他事情。
应用特点:
- 应用程序应从十几个传感器(温度、湿度、pH、ORP 等)收集传感器数据(可变间隔 n 秒)。
- 基于时间和这些传感器数据,控制器计算输出(开关、阀门和 PWM 驱动器)。
- 几乎 none 个事件需要 运行 顺序。
- 有些事件属于 "safety" 类别,应该 运行 立即触发(故障安全传感器、紧急按钮)。
- 大多数事件 运行 以秒为间隔重复(每秒,直到每 30 秒)。
- 一些事件会触发一个动作,在 1 到 120 秒内激活继电器。
- 一些事件使用基于时间的值。这个值需要每天计算几次并且相当 CPU 密集(使用一些迭代插值公式,因此有一个变量 运行time)。
- 显示环境状态(连续循环)
我熟悉(不是专业)VB.NET,但决定在 Python 3.6 中做这个项目。
在过去的几个月里,我阅读了很多关于设计模式、线程、进程、事件、并行处理等主题的内容。
根据我的阅读,我认为 Asyncio 与 Executor 中的一些任务相结合可以完成这项工作。
大多数 tasks/events 都不是时间紧迫的。控制器输出可以使用 'most recent' 传感器数据。
另一方面,某些任务会在特定时间段内激活继电器。我想知道如何对这些任务进行编程,而不会有另一个 'time consuming' 任务在一段时间内(例如)CO2 阀打开时阻塞处理器。这对我的环境来说可能是灾难性的。
因此我需要一些建议。
到目前为止,请参阅下面的代码。我不确定我是否正确使用了 Python 中的 Asyncio 函数。
为了可读性,我会把各个任务的内容放在单独的模块中。
import asyncio
import concurrent.futures
import datetime
import time
import random
import math
# define a task...
async def firstTask():
while True:
await asyncio.sleep(1)
print("First task executed")
# define another task...
async def secondTask():
while True:
await asyncio.sleep(5)
print("Second Worker Executed")
# define/simulate heavy CPU-bound task
def heavy_load():
while True:
print('Heavy_load started')
i = 0
for i in range(50000000):
f = math.sqrt(i)*math.sqrt(i)
print('Heavy_load finished')
time.sleep(4)
def main():
# Create a process pool (for CPU bound tasks).
processpool = concurrent.futures.ProcessPoolExecutor()
# Create a thread pool (for I/O bound tasks).
threadpool = concurrent.futures.ThreadPoolExecutor()
loop = asyncio.get_event_loop()
try:
# Add all tasks. (Correct use?)
asyncio.ensure_future(firstTask())
asyncio.ensure_future(secondTask())
loop.run_in_executor(processpool, heavy_load)
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("Loop will be ended")
loop.close()
if __name__ == '__main__':
main()
Most tasks/events are not time-critical. Controller output can use 'most recent' sensordata. Some tasks, on the other hand, activating a relay for a certain period of time. I would like to know how to programm these tasks without the chance another 'time consuming' task is blocking the processor during the period of time (for example) a CO2 valve is open. This could be disastrous for my environment.
请允许我强调 Python 不是实时语言,asyncio 也不是实时组件。它们既没有用于实时执行的基础设施(Python 是垃圾收集的,通常是 运行s 在分时系统上),也没有在这种环境中进行实践测试。因此,我强烈建议不要在任何失误可能对您的环境造成灾难性后果的情况下使用它们。
除此之外,您的代码有问题:虽然 heavy_load
计算不会阻止事件循环,但它永远不会完成,也不会提供有关其进度的信息。 run_in_executor
背后的想法是,您正在 运行ning 的计算最终会停止,并且事件循环会希望收到有关它的通知。 run_in_executor
的惯用用法可能如下所示:
def do_heavy_calc(param):
print('Heavy_load started')
f = 0
for i in range(50000000):
f += math.sqrt(i)*math.sqrt(i)
return f
def heavy_calc(param):
loop = asyncio.get_event_loop()
return loop.run_in_executor(processpool, do_heavy_calc)
表达式heavy_calc(...)
不仅运行不阻塞事件循环,而且可等待。这意味着异步代码可以等待其结果,也不会阻塞其他协程:
async def sum_params(p1, p2):
s1 = await heavy_calc(p1)
s2 = await heavy_calc(p2)
return s1 + s2
以上运行两次计算相继进行。也可以并行完成:
async def sum_params_parallel(p1, p2):
s1, s2 = await asyncio.gather(heavy_calc(p1), heavy_calc(p2))
return s1 + s2
另一件可以改进的事情是设置代码:
asyncio.ensure_future(firstTask())
asyncio.ensure_future(secondTask())
loop.run_in_executor(processpool, heavy_load)
loop.run_forever()
调用 asyncio.ensure_future
然后从不等待结果有点像 asyncio 反模式。未等待的任务引发的异常会被默默地吞掉,这几乎肯定不是您想要的。有时人们只是忘记写 await
,这就是为什么 asyncio 在循环被破坏时抱怨未等待的挂起任务。
安排每个任务由某人等待是一种很好的编码习惯,可以立即与await
或gather
结合使用任务,或稍后。例如,如果任务需要在后台 运行,您可以将其存储在某处并 await
或在应用程序生命周期结束时将其取消。在你的情况下,我会结合 gather
与 loop.run_until_complete
:
everything = asyncio.gather(firstTask(), secondTask(),
loop.run_in_executor(processpool, heavy_load))
loop.run_until_complete(everything)
Some events are in the category "safety" and should run instantly when triggered (fail-safe sensors, emergency button).
那我强烈建议你不要依赖软件来完成这个功能。切断电源的紧急停止按钮是通常执行此类操作的方式。如果您有软件可以做到这一点,并且您确实有生命威胁的情况需要处理,那么您将陷入一大堆灾难 - 几乎可以肯定,您必须遵守大量法规。
我正在开发一个 Raspberry Pi 3 的项目,用于一些环境控制,其中包含连续循环中的许多简单重复事件。 RP3 胜任这项工作,但它让我可以专注于其他事情。
应用特点:
- 应用程序应从十几个传感器(温度、湿度、pH、ORP 等)收集传感器数据(可变间隔 n 秒)。
- 基于时间和这些传感器数据,控制器计算输出(开关、阀门和 PWM 驱动器)。
- 几乎 none 个事件需要 运行 顺序。
- 有些事件属于 "safety" 类别,应该 运行 立即触发(故障安全传感器、紧急按钮)。
- 大多数事件 运行 以秒为间隔重复(每秒,直到每 30 秒)。
- 一些事件会触发一个动作,在 1 到 120 秒内激活继电器。
- 一些事件使用基于时间的值。这个值需要每天计算几次并且相当 CPU 密集(使用一些迭代插值公式,因此有一个变量 运行time)。
- 显示环境状态(连续循环)
我熟悉(不是专业)VB.NET,但决定在 Python 3.6 中做这个项目。 在过去的几个月里,我阅读了很多关于设计模式、线程、进程、事件、并行处理等主题的内容。
根据我的阅读,我认为 Asyncio 与 Executor 中的一些任务相结合可以完成这项工作。
大多数 tasks/events 都不是时间紧迫的。控制器输出可以使用 'most recent' 传感器数据。 另一方面,某些任务会在特定时间段内激活继电器。我想知道如何对这些任务进行编程,而不会有另一个 'time consuming' 任务在一段时间内(例如)CO2 阀打开时阻塞处理器。这对我的环境来说可能是灾难性的。
因此我需要一些建议。
到目前为止,请参阅下面的代码。我不确定我是否正确使用了 Python 中的 Asyncio 函数。 为了可读性,我会把各个任务的内容放在单独的模块中。
import asyncio
import concurrent.futures
import datetime
import time
import random
import math
# define a task...
async def firstTask():
while True:
await asyncio.sleep(1)
print("First task executed")
# define another task...
async def secondTask():
while True:
await asyncio.sleep(5)
print("Second Worker Executed")
# define/simulate heavy CPU-bound task
def heavy_load():
while True:
print('Heavy_load started')
i = 0
for i in range(50000000):
f = math.sqrt(i)*math.sqrt(i)
print('Heavy_load finished')
time.sleep(4)
def main():
# Create a process pool (for CPU bound tasks).
processpool = concurrent.futures.ProcessPoolExecutor()
# Create a thread pool (for I/O bound tasks).
threadpool = concurrent.futures.ThreadPoolExecutor()
loop = asyncio.get_event_loop()
try:
# Add all tasks. (Correct use?)
asyncio.ensure_future(firstTask())
asyncio.ensure_future(secondTask())
loop.run_in_executor(processpool, heavy_load)
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("Loop will be ended")
loop.close()
if __name__ == '__main__':
main()
Most tasks/events are not time-critical. Controller output can use 'most recent' sensordata. Some tasks, on the other hand, activating a relay for a certain period of time. I would like to know how to programm these tasks without the chance another 'time consuming' task is blocking the processor during the period of time (for example) a CO2 valve is open. This could be disastrous for my environment.
请允许我强调 Python 不是实时语言,asyncio 也不是实时组件。它们既没有用于实时执行的基础设施(Python 是垃圾收集的,通常是 运行s 在分时系统上),也没有在这种环境中进行实践测试。因此,我强烈建议不要在任何失误可能对您的环境造成灾难性后果的情况下使用它们。
除此之外,您的代码有问题:虽然 heavy_load
计算不会阻止事件循环,但它永远不会完成,也不会提供有关其进度的信息。 run_in_executor
背后的想法是,您正在 运行ning 的计算最终会停止,并且事件循环会希望收到有关它的通知。 run_in_executor
的惯用用法可能如下所示:
def do_heavy_calc(param):
print('Heavy_load started')
f = 0
for i in range(50000000):
f += math.sqrt(i)*math.sqrt(i)
return f
def heavy_calc(param):
loop = asyncio.get_event_loop()
return loop.run_in_executor(processpool, do_heavy_calc)
表达式heavy_calc(...)
不仅运行不阻塞事件循环,而且可等待。这意味着异步代码可以等待其结果,也不会阻塞其他协程:
async def sum_params(p1, p2):
s1 = await heavy_calc(p1)
s2 = await heavy_calc(p2)
return s1 + s2
以上运行两次计算相继进行。也可以并行完成:
async def sum_params_parallel(p1, p2):
s1, s2 = await asyncio.gather(heavy_calc(p1), heavy_calc(p2))
return s1 + s2
另一件可以改进的事情是设置代码:
asyncio.ensure_future(firstTask())
asyncio.ensure_future(secondTask())
loop.run_in_executor(processpool, heavy_load)
loop.run_forever()
调用 asyncio.ensure_future
然后从不等待结果有点像 asyncio 反模式。未等待的任务引发的异常会被默默地吞掉,这几乎肯定不是您想要的。有时人们只是忘记写 await
,这就是为什么 asyncio 在循环被破坏时抱怨未等待的挂起任务。
安排每个任务由某人等待是一种很好的编码习惯,可以立即与await
或gather
结合使用任务,或稍后。例如,如果任务需要在后台 运行,您可以将其存储在某处并 await
或在应用程序生命周期结束时将其取消。在你的情况下,我会结合 gather
与 loop.run_until_complete
:
everything = asyncio.gather(firstTask(), secondTask(),
loop.run_in_executor(processpool, heavy_load))
loop.run_until_complete(everything)
Some events are in the category "safety" and should run instantly when triggered (fail-safe sensors, emergency button).
那我强烈建议你不要依赖软件来完成这个功能。切断电源的紧急停止按钮是通常执行此类操作的方式。如果您有软件可以做到这一点,并且您确实有生命威胁的情况需要处理,那么您将陷入一大堆灾难 - 几乎可以肯定,您必须遵守大量法规。