多线程 python 脚本未安全退出
Multithreading python script does not exit safely
我有一个脚本可以启动两个 classes(控制 LED 灯带和 temp/hum 传感器)。
每个 class 运行 都是一个可以用 signal_handler()
终止的 while 循环,它基本上调用 sys.exit(0)
。我正在考虑用 signal_handler()
处理主程序的退出,就像我对 classes 本身所做的那样。
但是,当我尝试 CTRL + C
退出脚本时,程序会错误退出(请参阅下面的代码)并且灯光程序无法正确退出(即,如果退出,灯应该关闭时灯仍然亮着优雅地)。
import threading
from light_controller import LightController
from thermometer import Thermometer
import signal
def signal_handler():
print("\nhouse.py terminated with Ctrl+C.")
if l_thread.is_alive():
l_thread.join()
if t_thread.is_alive():
t_thread.join()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
lights = LightController()
temp = Thermometer()
t_thread = threading.Thread(target = temp.run)
t_thread.daemon = True
t_thread.start()
l_thread = threading.Thread(target = lights.run)
l_thread.daemon = True
l_thread.start()
Thermometer() terminated with Ctrl+C.
Exception ignored in: <module 'threading' from '/usr/lib/python3.7/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.7/threading.py", line 1281, in _shutdown
t.join()
File "/usr/lib/python3.7/threading.py", line 1032, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
File "/home/pi/Desktop/house/thermometer.py", line 51, in signal_handler
sys.exit(0)
我认为这是因为我在两个 classes 和主程序中复制了 signal_handler()
。两个 classes 都将 运行 无限循环并且可能被它们自己使用,所以我宁愿将 signal_handler()
保留在两个 classes 中。
我不确定是否有可能真正保持这样。我也不知道 sys.exit()
是否真的是在不导致错误的情况下退出的方式。
我可以对主程序使用不同的退出方法 house.py
而不是 CTRL+C.
更新
感谢您的拼写检查!
这是 classes 的代码。
thermometer.py
from luma.core.interface.serial import i2c
from luma.core.render import canvas
from luma.oled.device import ssd1306, ssd1325, ssd1331, sh1106
from luma.core.error import DeviceNotFoundError
import os
import time
import signal
import sys
import socket
from PIL import ImageFont, ImageDraw
# adafruit
import board
import busio
from adafruit_htu21d import HTU21D
class Thermometer(object):
"""docstring for Thermometer"""
def __init__(self):
super(Thermometer, self).__init__()
# TODO: Check for pixelmix.ttf in folder
self.drawfont = "pixelmix.ttf"
self.sleep_secs = 30
try:
signal.signal(signal.SIGINT, self.signal_handler)
self.serial = i2c(port=1, address=0x3C)
self.oled_device = ssd1306(self.serial, rotate=0)
except DeviceNotFoundError:
print("I2C mini OLED display not found.")
sys.exit(1)
try:
# Create library object using our Bus I2C port
#self.i2c_port = busio.I2C(board.SCL, board.SDA)
#self.temp_sensor = HTU21D(self.i2c_port)
print("Running temp in debug mode")
except ValueError:
print("Temperature sensor not found")
sys.exit(1)
def getIP(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
def signal_handler(self, sig, frame):
print("\nThermometer() terminated with Ctrl+C.")
sys.exit(0)
def run(self):
try:
while True:
# Measure things
temp_value = 25
hum_value = 50
#temp_value = round(self.temp_sensor.temperature, 1)
#hum_value = round(self.temp_sensor.relative_humidity, 1)
# Display results
with canvas(self.oled_device) as draw:
draw.rectangle(self.oled_device.bounding_box, outline="white", fill="black")
font = ImageFont.truetype(self.drawfont, 10)
ip = self.getIP()
draw.text((5, 5), "IP: " + ip, fill="white", font=font)
font = ImageFont.truetype(self.drawfont, 12)
draw.text((5, 20), f"T: {temp_value} C", fill="white", font=font)
draw.text((5, 40), f"H: {hum_value}%", fill="white", font=font)
# TODO ADD SAVING Here
time.sleep(self.sleep_secs)
except SystemExit:
print("Exiting...")
sys.exit(0)
except:
print("Unexpected error:", sys.exc_info()[0])
sys.exit(2)
if __name__ == '__main__':
thermo = Thermometer()
thermo.run()
light_controller.py
import RPi.GPIO as GPIO
import time
import signal
import datetime
import sys
class LightController(object):
"""docstring for LightController"""
def __init__(self):
super(LightController, self).__init__()
signal.signal(signal.SIGTERM, self.safe_exit)
signal.signal(signal.SIGHUP, self.safe_exit)
signal.signal(signal.SIGINT, self.safe_exit)
self.red_pin = 9
self.green_pin = 11
# might be white pin if hooking up a white LED here
self.blue_pin = 10
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(self.red_pin, GPIO.OUT)
GPIO.setup(self.green_pin, GPIO.OUT)
GPIO.setup(self.blue_pin, GPIO.OUT)
self.pwm_red = GPIO.PWM(self.red_pin, 500) # We need to activate PWM on LED so we can dim, use 1000 Hz
self.pwm_green = GPIO.PWM(self.green_pin, 500)
self.pwm_blue = GPIO.PWM(self.blue_pin, 500)
# Start PWM at 0% duty cycle (off)
self.pwm_red.start(0)
self.pwm_green.start(0)
self.pwm_blue.start(0)
self.pin_zip = zip([self.red_pin, self.green_pin, self.blue_pin],
[self.pwm_red, self.pwm_green, self.pwm_blue])
# Config lights on-off cycle here
self.lights_on = 7
self.lights_off = 19
print(f"Initalizing LightController with lights_on: {self.lights_on}h & lights_off: {self.lights_off}h")
print("------------------------------")
def change_intensity(self, pwm_object, intensity):
pwm_object.ChangeDutyCycle(intensity)
def run(self):
while True:
#for pin, pwm_object in self.pin_zip:
# pwm_object.ChangeDutyCycle(100)
# time.sleep(10)
# pwm_object.ChangeDutyCycle(20)
# time.sleep(10)
# pwm_object.ChangeDutyCycle(0)
current_hour = datetime.datetime.now().hour
# evaluate between
if self.lights_on <= current_hour <= self.lights_off:
self.pwm_blue.ChangeDutyCycle(100)
else:
self.pwm_blue.ChangeDutyCycle(0)
# run this once a second
time.sleep(1)
# ------- Safe Exit ---------- #
def safe_exit(self, signum, frame):
print("\nLightController() terminated with Ctrl+C.")
sys.exit(0)
if __name__ == '__main__':
controller = LightController()
controller.run()
选项 1:穿线很难
为了进一步说明我所说的“无内部循环”——线程很难,所以让我们做点别的。
- 我在此处添加了
__enter__
和 __exit__
到温度计和 LightController class;这使它们可用 as context managers (i.e. with the with
block)。当您拥有“拥有”其他资源的对象时,这很有用;在这种情况下,温度计拥有串行设备,光控制器接触 GPIO。
- 然后,不是每个 class 都有
.run()
,它们将永远停留在那里,让我们让“外部”程序控制:它 运行 永远存在循环,并要求每个“设备”在再次等待之前做它的事情。 (您也可以使用 stdlib sched
module 让 classes 以不同的时间间隔将函数注册到 运行,或者如果不同的 classes 恰好需要不同的检查间隔。)
- 因为没有线程,所以也不需要设置信号处理程序;程序中的 ctrl+c 像常规一样弹出
KeyboardInterrupt
异常,并且 with
块的 __exit__
处理程序有机会清理。
class Thermometer:
def __enter__(self):
self.serial = ...
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# TODO: Cleanup the i2c/ssd devices
pass
def step(self):
""" Measure and draw things """
# Measure things...
# Draw things...
class LightController:
def __enter__(self):
GPIO.setmode(...)
def __exit__(self, exc_type, exc_val, exc_tb):
# TODO: cleanup GPIO
pass
def step(self):
current_hour = datetime.datetime.now().hour
# etc...
def main():
with LightController() as lights, Thermometer() as temp:
while True:
lights.step()
temp.step()
time.sleep(1)
if __name__ == '__main__':
main()
选项 2:穿线很难,但我们还是做吧
另一种方法是使用 Event
来控制线程的内部循环。
这里的想法是,在循环中代替 time.sleep()
,您让 Event.wait()
进行等待,因为它接受一个可选的超时以等待事件被设置(或不)。事实上,在某些操作系统上,time.sleep()
被实现为让线程等待匿名事件。
当您希望线程退出时,您可以设置停止事件,它们将完成它们正在做的事情。
为了方便起见,我在这里也将这个概念打包成一个“DeviceThread”。
import threading
import time
class DeviceThread(threading.Thread):
interval = 1
def __init__(self, stop_event):
super().__init__(name=self.__class__.__name__)
self.stop_event = stop_event
def step(self):
pass
def initialize(self):
pass
def cleanup(self):
pass
def run(self):
try:
self.initialize()
while not self.stop_event.wait(self.interval):
self.step()
finally:
self.cleanup()
class ThermometerThread(DeviceThread):
def initialize(self):
self.serial = ...
def cleanup(self):
... # close serial port
def step(self):
... # measure and draw
def main():
stop_event = threading.Event()
threads = [ThermometerThread(stop_event)]
for thread in threads:
thread.start()
try:
while True:
# Nothing to do in the main thread...
time.sleep(1)
except KeyboardInterrupt:
print("Caught keyboard interrupt, stopping threads")
stop_event.set()
for thread in threads:
print(f"Waiting for {thread.name} to stop")
thread.join()
我有一个脚本可以启动两个 classes(控制 LED 灯带和 temp/hum 传感器)。
每个 class 运行 都是一个可以用 signal_handler()
终止的 while 循环,它基本上调用 sys.exit(0)
。我正在考虑用 signal_handler()
处理主程序的退出,就像我对 classes 本身所做的那样。
但是,当我尝试 CTRL + C
退出脚本时,程序会错误退出(请参阅下面的代码)并且灯光程序无法正确退出(即,如果退出,灯应该关闭时灯仍然亮着优雅地)。
import threading
from light_controller import LightController
from thermometer import Thermometer
import signal
def signal_handler():
print("\nhouse.py terminated with Ctrl+C.")
if l_thread.is_alive():
l_thread.join()
if t_thread.is_alive():
t_thread.join()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
lights = LightController()
temp = Thermometer()
t_thread = threading.Thread(target = temp.run)
t_thread.daemon = True
t_thread.start()
l_thread = threading.Thread(target = lights.run)
l_thread.daemon = True
l_thread.start()
Thermometer() terminated with Ctrl+C.
Exception ignored in: <module 'threading' from '/usr/lib/python3.7/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.7/threading.py", line 1281, in _shutdown
t.join()
File "/usr/lib/python3.7/threading.py", line 1032, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
File "/home/pi/Desktop/house/thermometer.py", line 51, in signal_handler
sys.exit(0)
我认为这是因为我在两个 classes 和主程序中复制了 signal_handler()
。两个 classes 都将 运行 无限循环并且可能被它们自己使用,所以我宁愿将 signal_handler()
保留在两个 classes 中。
我不确定是否有可能真正保持这样。我也不知道 sys.exit()
是否真的是在不导致错误的情况下退出的方式。
我可以对主程序使用不同的退出方法 house.py
而不是 CTRL+C.
更新
感谢您的拼写检查!
这是 classes 的代码。
thermometer.py
from luma.core.interface.serial import i2c
from luma.core.render import canvas
from luma.oled.device import ssd1306, ssd1325, ssd1331, sh1106
from luma.core.error import DeviceNotFoundError
import os
import time
import signal
import sys
import socket
from PIL import ImageFont, ImageDraw
# adafruit
import board
import busio
from adafruit_htu21d import HTU21D
class Thermometer(object):
"""docstring for Thermometer"""
def __init__(self):
super(Thermometer, self).__init__()
# TODO: Check for pixelmix.ttf in folder
self.drawfont = "pixelmix.ttf"
self.sleep_secs = 30
try:
signal.signal(signal.SIGINT, self.signal_handler)
self.serial = i2c(port=1, address=0x3C)
self.oled_device = ssd1306(self.serial, rotate=0)
except DeviceNotFoundError:
print("I2C mini OLED display not found.")
sys.exit(1)
try:
# Create library object using our Bus I2C port
#self.i2c_port = busio.I2C(board.SCL, board.SDA)
#self.temp_sensor = HTU21D(self.i2c_port)
print("Running temp in debug mode")
except ValueError:
print("Temperature sensor not found")
sys.exit(1)
def getIP(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
def signal_handler(self, sig, frame):
print("\nThermometer() terminated with Ctrl+C.")
sys.exit(0)
def run(self):
try:
while True:
# Measure things
temp_value = 25
hum_value = 50
#temp_value = round(self.temp_sensor.temperature, 1)
#hum_value = round(self.temp_sensor.relative_humidity, 1)
# Display results
with canvas(self.oled_device) as draw:
draw.rectangle(self.oled_device.bounding_box, outline="white", fill="black")
font = ImageFont.truetype(self.drawfont, 10)
ip = self.getIP()
draw.text((5, 5), "IP: " + ip, fill="white", font=font)
font = ImageFont.truetype(self.drawfont, 12)
draw.text((5, 20), f"T: {temp_value} C", fill="white", font=font)
draw.text((5, 40), f"H: {hum_value}%", fill="white", font=font)
# TODO ADD SAVING Here
time.sleep(self.sleep_secs)
except SystemExit:
print("Exiting...")
sys.exit(0)
except:
print("Unexpected error:", sys.exc_info()[0])
sys.exit(2)
if __name__ == '__main__':
thermo = Thermometer()
thermo.run()
light_controller.py
import RPi.GPIO as GPIO
import time
import signal
import datetime
import sys
class LightController(object):
"""docstring for LightController"""
def __init__(self):
super(LightController, self).__init__()
signal.signal(signal.SIGTERM, self.safe_exit)
signal.signal(signal.SIGHUP, self.safe_exit)
signal.signal(signal.SIGINT, self.safe_exit)
self.red_pin = 9
self.green_pin = 11
# might be white pin if hooking up a white LED here
self.blue_pin = 10
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(self.red_pin, GPIO.OUT)
GPIO.setup(self.green_pin, GPIO.OUT)
GPIO.setup(self.blue_pin, GPIO.OUT)
self.pwm_red = GPIO.PWM(self.red_pin, 500) # We need to activate PWM on LED so we can dim, use 1000 Hz
self.pwm_green = GPIO.PWM(self.green_pin, 500)
self.pwm_blue = GPIO.PWM(self.blue_pin, 500)
# Start PWM at 0% duty cycle (off)
self.pwm_red.start(0)
self.pwm_green.start(0)
self.pwm_blue.start(0)
self.pin_zip = zip([self.red_pin, self.green_pin, self.blue_pin],
[self.pwm_red, self.pwm_green, self.pwm_blue])
# Config lights on-off cycle here
self.lights_on = 7
self.lights_off = 19
print(f"Initalizing LightController with lights_on: {self.lights_on}h & lights_off: {self.lights_off}h")
print("------------------------------")
def change_intensity(self, pwm_object, intensity):
pwm_object.ChangeDutyCycle(intensity)
def run(self):
while True:
#for pin, pwm_object in self.pin_zip:
# pwm_object.ChangeDutyCycle(100)
# time.sleep(10)
# pwm_object.ChangeDutyCycle(20)
# time.sleep(10)
# pwm_object.ChangeDutyCycle(0)
current_hour = datetime.datetime.now().hour
# evaluate between
if self.lights_on <= current_hour <= self.lights_off:
self.pwm_blue.ChangeDutyCycle(100)
else:
self.pwm_blue.ChangeDutyCycle(0)
# run this once a second
time.sleep(1)
# ------- Safe Exit ---------- #
def safe_exit(self, signum, frame):
print("\nLightController() terminated with Ctrl+C.")
sys.exit(0)
if __name__ == '__main__':
controller = LightController()
controller.run()
选项 1:穿线很难
为了进一步说明我所说的“无内部循环”——线程很难,所以让我们做点别的。
- 我在此处添加了
__enter__
和__exit__
到温度计和 LightController class;这使它们可用 as context managers (i.e. with thewith
block)。当您拥有“拥有”其他资源的对象时,这很有用;在这种情况下,温度计拥有串行设备,光控制器接触 GPIO。 - 然后,不是每个 class 都有
.run()
,它们将永远停留在那里,让我们让“外部”程序控制:它 运行 永远存在循环,并要求每个“设备”在再次等待之前做它的事情。 (您也可以使用 stdlibsched
module 让 classes 以不同的时间间隔将函数注册到 运行,或者如果不同的 classes 恰好需要不同的检查间隔。) - 因为没有线程,所以也不需要设置信号处理程序;程序中的 ctrl+c 像常规一样弹出
KeyboardInterrupt
异常,并且with
块的__exit__
处理程序有机会清理。
class Thermometer:
def __enter__(self):
self.serial = ...
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# TODO: Cleanup the i2c/ssd devices
pass
def step(self):
""" Measure and draw things """
# Measure things...
# Draw things...
class LightController:
def __enter__(self):
GPIO.setmode(...)
def __exit__(self, exc_type, exc_val, exc_tb):
# TODO: cleanup GPIO
pass
def step(self):
current_hour = datetime.datetime.now().hour
# etc...
def main():
with LightController() as lights, Thermometer() as temp:
while True:
lights.step()
temp.step()
time.sleep(1)
if __name__ == '__main__':
main()
选项 2:穿线很难,但我们还是做吧
另一种方法是使用 Event
来控制线程的内部循环。
这里的想法是,在循环中代替 time.sleep()
,您让 Event.wait()
进行等待,因为它接受一个可选的超时以等待事件被设置(或不)。事实上,在某些操作系统上,time.sleep()
被实现为让线程等待匿名事件。
当您希望线程退出时,您可以设置停止事件,它们将完成它们正在做的事情。
为了方便起见,我在这里也将这个概念打包成一个“DeviceThread”。
import threading
import time
class DeviceThread(threading.Thread):
interval = 1
def __init__(self, stop_event):
super().__init__(name=self.__class__.__name__)
self.stop_event = stop_event
def step(self):
pass
def initialize(self):
pass
def cleanup(self):
pass
def run(self):
try:
self.initialize()
while not self.stop_event.wait(self.interval):
self.step()
finally:
self.cleanup()
class ThermometerThread(DeviceThread):
def initialize(self):
self.serial = ...
def cleanup(self):
... # close serial port
def step(self):
... # measure and draw
def main():
stop_event = threading.Event()
threads = [ThermometerThread(stop_event)]
for thread in threads:
thread.start()
try:
while True:
# Nothing to do in the main thread...
time.sleep(1)
except KeyboardInterrupt:
print("Caught keyboard interrupt, stopping threads")
stop_event.set()
for thread in threads:
print(f"Waiting for {thread.name} to stop")
thread.join()