Python 线程 class 用于 GPIO Led 闪烁
Python threading class for GPIO Led blink
不幸的是,我在全方位尝试解决这个问题后没有任何结果后来到这里。
我的想法:
我需要一个名为 Led 的 class,它在构造函数中只需接受一个 GPIO 引脚并提供以下方法:
- 亮起
- 关灯
- 闪烁
我是做什么的:
我是这样构建的class:
import RPi.GPIO as GPIO
import time
import threading
from threading import Thread
class Led(Thread):
def __init__(self, led_pin):
Thread.__init__(self)
self.pin_stop = threading.Event()
self.__led_pin = led_pin
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.__led_pin, GPIO.OUT)
def low(self, pin):
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, GPIO.LOW)
def blink(self, time_on=0.050, time_off=1):
pin = threading.Thread(name='ledblink',target=self.__blink_pin, args=(time_on, time_off, self.pin_stop))
pin.start()
def __blink_pin(self, time_on, time_off, pin_stop):
while not pin_stop.is_set():
GPIO.output(self.__led_pin, GPIO.HIGH)
time.sleep(time_on)
GPIO.output(self.__led_pin, GPIO.LOW)
time.sleep(time_off)
def __stop(self):
self.pin_stop.set()
def reset(self):
GPIO.cleanup()
def off(self):
self.__stop()
def on(self):
self.__stop()
GPIO.output(self.__led_pin, GPIO.LOW)
GPIO.output(self.__led_pin, GPIO.HIGH)
闪烁方法负责无限期闪烁 LED,直到调用 Off 或 On 方法。
和运行这个简单的代码:
from classes.leds import Led
import time
from random import randint
Led16 = Led(16)
def main():
while True:
if (randint(0, 1) == 1):
Led16.blink()
else:
Led16.off()
time.sleep(2)
if __name__ == "__main__":
main()
发生了什么:
每次调用一个方法时,Led 对象似乎都会产生一个新线程,从而使 GPIO 线在多个线程之间共享。
我的愿望是什么:
我想保持异步闪烁 LED(显然)并控制 Led16() 对象状态,也许每次我调用它的方法时都不需要创建新线程,但在达到这一点时我有点困惑。
感谢帮助我了解如何实现这个目标。
您正在创建大量线程,因为您的 blink()
每次调用都会创建一个新线程,而旧线程不会停止。
我想线程有几个选项:
只创建一次线程 - 例如在 __init__()
- 它会在闪烁的时间间隔内连续运行(即大部分时间都在睡觉)读取实例变量并设置 LED相应地。要更改 LED 状态,blink()
、on()
和 off()
通过将此实例变量设置为 on/off/blinking.
来控制 LED
在闪烁例程中,如果线程已经运行要么不创建新线程,要么停止旧线程(并等待它完成)然后开始一个新的。
你必须处理的事情是你希望行为是这样的:
- 如果 LED 灯熄灭,那么一旦
on()
或 blink()
被调用 LED 灯就会亮起
- 如果 LED 正在闪烁并且
blink()
再次被调用,则闪烁 on/off 序列不会受到干扰
- 如果闪烁并调用
off()
如果它已经开始 运行 完成,我希望开启循环,即 LED 不应立即关闭,因为这可能会很短看起来很奇怪的闪光灯。
创建新线程的问题是等待旧线程完成,在 __init__()
中仅创建一次线程并持续 运行 感觉最简单。当 LED 打开或关闭时,时间段会缩短(到值 FAST_CYCLE
),以便当 LED 关闭或打开时它会快速反应,因为 sleep()
时间很短。
关于您的代码的其他几点:
- 我认为您不需要让 class 继承自
Thread
- 您正在 pin=...
行中创建一个新线程。
- 如果您在编写代码时添加注释,通常会使阅读代码时更容易理解发生了什么。
- 如果您保留对线程的引用(即
self.pin = threading.Thread
而不是 pin = threading.Thread
),那么在 reset()
中您可以使用 join()
来确保它之前已经退出你继续
- 由于闪烁时间段可能会改变并且线程必须使用最新的值,所以每次都使用 self 来读取它们而不是将它们作为参数传递给
__blink_pin()
例程,如果这样做你可能也可以使用 self 来获取 pin_stop 信号量。
像这样(未经测试):
import RPi.GPIO as GPIO
import time
import threading
from threading import Thread
class Led(object):
LED_OFF = 0
LED_ON = 1
LED_FLASHING = 2
# the short time sleep to use when the led is on or off to ensure the led responds quickly to changes to blinking
FAST_CYCLE = 0.05
def __init__(self, led_pin):
# create the semaphore used to make thread exit
self.pin_stop = threading.Event()
# the pin for the LED
self.__led_pin = led_pin
# initialise the pin and turn the led off
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.__led_pin, GPIO.OUT)
# the mode for the led - off/on/flashing
self.__ledmode = Led.LED_OFF
# make sure the LED is off (this also initialises the times for the thread)
self.off()
# create the thread, keep a reference to it for when we need to exit
self.__thread = threading.Thread(name='ledblink',target=self.__blink_pin)
# start the thread
self.__thread.start()
def blink(self, time_on=0.050, time_off=1):
# blinking will start at the next first period
# because turning the led on now might look funny because we don't know
# when the next first period will start - the blink routine does all the
# timing so that will 'just work'
self.__ledmode = Led.LED_FLASHING
self.__time_on = time_on
self.__time_off = time_off
def off(self):
self.__ledmode = LED_OFF
# set the cycle times short so changes to ledmode are picked up quickly
self.__time_on = Led.FAST_CYCLE
self.__time_off = Led.FAST_CYCLE
# could turn the LED off immediately, might make for a short flicker on if was blinking
def on(self):
self.__ledmode = LED_ON
# set the cycle times short so changes to ledmode are picked up quickly
self.__time_on = Led.FAST_CYCLE
self.__time_off = Led.FAST_CYCLE
# could turn the LED on immediately, might make for a short flicker off if was blinking
def reset(self):
# set the semaphore so the thread will exit after sleep has completed
self.pin_stop.set()
# wait for the thread to exit
self.__thread.join()
# now clean up the GPIO
GPIO.cleanup()
############################################################################
# below here are private methods
def __turnledon(self, pin):
GPIO.output(pin, GPIO.LOW)
def __turnledoff(self, pin):
GPIO.output(pin, GPIO.HIGH)
# this does all the work
# If blinking, there are two sleeps in each loop
# if on or off, there is only one sleep to ensure quick response to blink()
def __blink_pin(self):
while not self.pin_stop.is_set():
# the first period is when the LED will be on if blinking
if self.__ledmode == Led.LED_ON or self.__ledmode == Led.LED_FLASHING:
self.__turnledon()
else:
self.__turnledoff()
# this is the first sleep - the 'on' time when blinking
time.sleep(self.__time_on)
# only if blinking, turn led off and do a second sleep for the off time
if self.__ledmode == Led.LED_FLASHING:
self.__turnledoff()
# do an extra check that the stop semaphore hasn't been set before the off-time sleep
if not self.pin_stop.is_set():
# this is the second sleep - off time when blinking
time.sleep(self.__time_off)
对于将来需要它的人,我对建议的代码做了一些小的调整,并且似乎按预期工作得很好。
再次感谢@barny
import RPi.GPIO as GPIO
import time
import threading
class Led(object):
LED_OFF = 0
LED_ON = 1
LED_FLASHING = 2
# the short time sleep to use when the led is on or off to ensure the led responds quickly to changes to blinking
FAST_CYCLE = 0.05
def __init__(self, led_pin):
# create the semaphore used to make thread exit
self.pin_stop = threading.Event()
# the pin for the LED
self.__led_pin = led_pin
# initialise the pin and turn the led off
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.__led_pin, GPIO.OUT)
# the mode for the led - off/on/flashing
self.__ledmode = Led.LED_OFF
# make sure the LED is off (this also initialises the times for the thread)
self.off()
# create the thread, keep a reference to it for when we need to exit
self.__thread = threading.Thread(name='ledblink',target=self.__blink_pin)
# start the thread
self.__thread.start()
def blink(self, time_on=0.050, time_off=1):
# blinking will start at the next first period
# because turning the led on now might look funny because we don't know
# when the next first period will start - the blink routine does all the
# timing so that will 'just work'
self.__ledmode = Led.LED_FLASHING
self.__time_on = time_on
self.__time_off = time_off
def off(self):
self.__ledmode = self.LED_OFF
# set the cycle times short so changes to ledmode are picked up quickly
self.__time_on = Led.FAST_CYCLE
self.__time_off = Led.FAST_CYCLE
# could turn the LED off immediately, might make for a short flicker on if was blinking
def on(self):
self.__ledmode = self.LED_ON
# set the cycle times short so changes to ledmode are picked up quickly
self.__time_on = Led.FAST_CYCLE
self.__time_off = Led.FAST_CYCLE
# could turn the LED on immediately, might make for a short flicker off if was blinking
def reset(self):
# set the semaphore so the thread will exit after sleep has completed
self.pin_stop.set()
# wait for the thread to exit
self.__thread.join()
# now clean up the GPIO
GPIO.cleanup()
我不确定它是否有用,但我想到了(使用 gpiozero
)
from gpiozero import LED
import time
import threading
class LEDplus():
def __init__(self,pinnumber):
self.led = LED(pinnumber)
self.__loop = True
self.__threading = threading.Thread(target=self.__blink)
def on(self,):
self.__loop = False
self.maybejoin()
self.led.on()
def off(self, ):
self.__loop = False
self.maybejoin()
self.led.off()
def maybejoin(self,):
if self.__threading.isAlive():
self.__threading.join()
def blink(self, pitch):
self.__threading = threading.Thread(target=self.__blink, args=(pitch, ))
self.__threading.start()
def __blink(self, pitch=.25):
self.__loop = True
while self.__loop:
self.led.toggle()
time.sleep(pitch/2)
self.led.off()
green = LEDplus(18)
green.blink(1)
在 上,只是缺少 class 的私有方法。在下面添加了一些修复。 __turnledon()
不需要参数 - 它可以访问已经存储在初始化中的 self.__led_pin
。
############################################################################
# below here are private methods
def __turnledon(self):
GPIO.output(self.__led_pin, GPIO.LOW)
def __turnledoff(self):
GPIO.output(self.__led_pin , GPIO.HIGH)
# this does all the work
# If blinking, there are two sleeps in each loop
# if on or off, there is only one sleep to ensure quick response to blink()
def __blink_pin(self):
while not self.pin_stop.is_set():
# the first period is when the LED will be on if blinking
if self.__ledmode == BlinkerLed.LED_ON or self.__ledmode == BlinkerLed.LED_FLASHING:
self.__turnledon()
else:
self.__turnledoff()
# this is the first sleep - the 'on' time when blinking
time.sleep(self.__time_on)
# only if blinking, turn led off and do a second sleep for the off time
if self.__ledmode == BlinkerLed.LED_FLASHING:
self.__turnledoff()
# do an extra check that the stop semaphore hasn't been set before the off-time sleep
if not self.pin_stop.is_set():
# this is the second sleep - off time when blinking
time.sleep(self.__time_off)
不幸的是,我在全方位尝试解决这个问题后没有任何结果后来到这里。
我的想法:
我需要一个名为 Led 的 class,它在构造函数中只需接受一个 GPIO 引脚并提供以下方法:
- 亮起
- 关灯
- 闪烁
我是做什么的:
我是这样构建的class:
import RPi.GPIO as GPIO
import time
import threading
from threading import Thread
class Led(Thread):
def __init__(self, led_pin):
Thread.__init__(self)
self.pin_stop = threading.Event()
self.__led_pin = led_pin
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.__led_pin, GPIO.OUT)
def low(self, pin):
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, GPIO.LOW)
def blink(self, time_on=0.050, time_off=1):
pin = threading.Thread(name='ledblink',target=self.__blink_pin, args=(time_on, time_off, self.pin_stop))
pin.start()
def __blink_pin(self, time_on, time_off, pin_stop):
while not pin_stop.is_set():
GPIO.output(self.__led_pin, GPIO.HIGH)
time.sleep(time_on)
GPIO.output(self.__led_pin, GPIO.LOW)
time.sleep(time_off)
def __stop(self):
self.pin_stop.set()
def reset(self):
GPIO.cleanup()
def off(self):
self.__stop()
def on(self):
self.__stop()
GPIO.output(self.__led_pin, GPIO.LOW)
GPIO.output(self.__led_pin, GPIO.HIGH)
闪烁方法负责无限期闪烁 LED,直到调用 Off 或 On 方法。
和运行这个简单的代码:
from classes.leds import Led
import time
from random import randint
Led16 = Led(16)
def main():
while True:
if (randint(0, 1) == 1):
Led16.blink()
else:
Led16.off()
time.sleep(2)
if __name__ == "__main__":
main()
发生了什么:
每次调用一个方法时,Led 对象似乎都会产生一个新线程,从而使 GPIO 线在多个线程之间共享。
我的愿望是什么:
我想保持异步闪烁 LED(显然)并控制 Led16() 对象状态,也许每次我调用它的方法时都不需要创建新线程,但在达到这一点时我有点困惑。
感谢帮助我了解如何实现这个目标。
您正在创建大量线程,因为您的 blink()
每次调用都会创建一个新线程,而旧线程不会停止。
我想线程有几个选项:
只创建一次线程 - 例如在
__init__()
- 它会在闪烁的时间间隔内连续运行(即大部分时间都在睡觉)读取实例变量并设置 LED相应地。要更改 LED 状态,blink()
、on()
和off()
通过将此实例变量设置为 on/off/blinking. 来控制 LED
在闪烁例程中,如果线程已经运行要么不创建新线程,要么停止旧线程(并等待它完成)然后开始一个新的。
你必须处理的事情是你希望行为是这样的:
- 如果 LED 灯熄灭,那么一旦
on()
或blink()
被调用 LED 灯就会亮起 - 如果 LED 正在闪烁并且
blink()
再次被调用,则闪烁 on/off 序列不会受到干扰 - 如果闪烁并调用
off()
如果它已经开始 运行 完成,我希望开启循环,即 LED 不应立即关闭,因为这可能会很短看起来很奇怪的闪光灯。
创建新线程的问题是等待旧线程完成,在 __init__()
中仅创建一次线程并持续 运行 感觉最简单。当 LED 打开或关闭时,时间段会缩短(到值 FAST_CYCLE
),以便当 LED 关闭或打开时它会快速反应,因为 sleep()
时间很短。
关于您的代码的其他几点:
- 我认为您不需要让 class 继承自
Thread
- 您正在pin=...
行中创建一个新线程。 - 如果您在编写代码时添加注释,通常会使阅读代码时更容易理解发生了什么。
- 如果您保留对线程的引用(即
self.pin = threading.Thread
而不是pin = threading.Thread
),那么在reset()
中您可以使用join()
来确保它之前已经退出你继续 - 由于闪烁时间段可能会改变并且线程必须使用最新的值,所以每次都使用 self 来读取它们而不是将它们作为参数传递给
__blink_pin()
例程,如果这样做你可能也可以使用 self 来获取 pin_stop 信号量。
像这样(未经测试):
import RPi.GPIO as GPIO
import time
import threading
from threading import Thread
class Led(object):
LED_OFF = 0
LED_ON = 1
LED_FLASHING = 2
# the short time sleep to use when the led is on or off to ensure the led responds quickly to changes to blinking
FAST_CYCLE = 0.05
def __init__(self, led_pin):
# create the semaphore used to make thread exit
self.pin_stop = threading.Event()
# the pin for the LED
self.__led_pin = led_pin
# initialise the pin and turn the led off
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.__led_pin, GPIO.OUT)
# the mode for the led - off/on/flashing
self.__ledmode = Led.LED_OFF
# make sure the LED is off (this also initialises the times for the thread)
self.off()
# create the thread, keep a reference to it for when we need to exit
self.__thread = threading.Thread(name='ledblink',target=self.__blink_pin)
# start the thread
self.__thread.start()
def blink(self, time_on=0.050, time_off=1):
# blinking will start at the next first period
# because turning the led on now might look funny because we don't know
# when the next first period will start - the blink routine does all the
# timing so that will 'just work'
self.__ledmode = Led.LED_FLASHING
self.__time_on = time_on
self.__time_off = time_off
def off(self):
self.__ledmode = LED_OFF
# set the cycle times short so changes to ledmode are picked up quickly
self.__time_on = Led.FAST_CYCLE
self.__time_off = Led.FAST_CYCLE
# could turn the LED off immediately, might make for a short flicker on if was blinking
def on(self):
self.__ledmode = LED_ON
# set the cycle times short so changes to ledmode are picked up quickly
self.__time_on = Led.FAST_CYCLE
self.__time_off = Led.FAST_CYCLE
# could turn the LED on immediately, might make for a short flicker off if was blinking
def reset(self):
# set the semaphore so the thread will exit after sleep has completed
self.pin_stop.set()
# wait for the thread to exit
self.__thread.join()
# now clean up the GPIO
GPIO.cleanup()
############################################################################
# below here are private methods
def __turnledon(self, pin):
GPIO.output(pin, GPIO.LOW)
def __turnledoff(self, pin):
GPIO.output(pin, GPIO.HIGH)
# this does all the work
# If blinking, there are two sleeps in each loop
# if on or off, there is only one sleep to ensure quick response to blink()
def __blink_pin(self):
while not self.pin_stop.is_set():
# the first period is when the LED will be on if blinking
if self.__ledmode == Led.LED_ON or self.__ledmode == Led.LED_FLASHING:
self.__turnledon()
else:
self.__turnledoff()
# this is the first sleep - the 'on' time when blinking
time.sleep(self.__time_on)
# only if blinking, turn led off and do a second sleep for the off time
if self.__ledmode == Led.LED_FLASHING:
self.__turnledoff()
# do an extra check that the stop semaphore hasn't been set before the off-time sleep
if not self.pin_stop.is_set():
# this is the second sleep - off time when blinking
time.sleep(self.__time_off)
对于将来需要它的人,我对建议的代码做了一些小的调整,并且似乎按预期工作得很好。
再次感谢@barny
import RPi.GPIO as GPIO
import time
import threading
class Led(object):
LED_OFF = 0
LED_ON = 1
LED_FLASHING = 2
# the short time sleep to use when the led is on or off to ensure the led responds quickly to changes to blinking
FAST_CYCLE = 0.05
def __init__(self, led_pin):
# create the semaphore used to make thread exit
self.pin_stop = threading.Event()
# the pin for the LED
self.__led_pin = led_pin
# initialise the pin and turn the led off
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.__led_pin, GPIO.OUT)
# the mode for the led - off/on/flashing
self.__ledmode = Led.LED_OFF
# make sure the LED is off (this also initialises the times for the thread)
self.off()
# create the thread, keep a reference to it for when we need to exit
self.__thread = threading.Thread(name='ledblink',target=self.__blink_pin)
# start the thread
self.__thread.start()
def blink(self, time_on=0.050, time_off=1):
# blinking will start at the next first period
# because turning the led on now might look funny because we don't know
# when the next first period will start - the blink routine does all the
# timing so that will 'just work'
self.__ledmode = Led.LED_FLASHING
self.__time_on = time_on
self.__time_off = time_off
def off(self):
self.__ledmode = self.LED_OFF
# set the cycle times short so changes to ledmode are picked up quickly
self.__time_on = Led.FAST_CYCLE
self.__time_off = Led.FAST_CYCLE
# could turn the LED off immediately, might make for a short flicker on if was blinking
def on(self):
self.__ledmode = self.LED_ON
# set the cycle times short so changes to ledmode are picked up quickly
self.__time_on = Led.FAST_CYCLE
self.__time_off = Led.FAST_CYCLE
# could turn the LED on immediately, might make for a short flicker off if was blinking
def reset(self):
# set the semaphore so the thread will exit after sleep has completed
self.pin_stop.set()
# wait for the thread to exit
self.__thread.join()
# now clean up the GPIO
GPIO.cleanup()
我不确定它是否有用,但我想到了(使用 gpiozero
)
from gpiozero import LED
import time
import threading
class LEDplus():
def __init__(self,pinnumber):
self.led = LED(pinnumber)
self.__loop = True
self.__threading = threading.Thread(target=self.__blink)
def on(self,):
self.__loop = False
self.maybejoin()
self.led.on()
def off(self, ):
self.__loop = False
self.maybejoin()
self.led.off()
def maybejoin(self,):
if self.__threading.isAlive():
self.__threading.join()
def blink(self, pitch):
self.__threading = threading.Thread(target=self.__blink, args=(pitch, ))
self.__threading.start()
def __blink(self, pitch=.25):
self.__loop = True
while self.__loop:
self.led.toggle()
time.sleep(pitch/2)
self.led.off()
green = LEDplus(18)
green.blink(1)
在 __turnledon()
不需要参数 - 它可以访问已经存储在初始化中的 self.__led_pin
。
############################################################################
# below here are private methods
def __turnledon(self):
GPIO.output(self.__led_pin, GPIO.LOW)
def __turnledoff(self):
GPIO.output(self.__led_pin , GPIO.HIGH)
# this does all the work
# If blinking, there are two sleeps in each loop
# if on or off, there is only one sleep to ensure quick response to blink()
def __blink_pin(self):
while not self.pin_stop.is_set():
# the first period is when the LED will be on if blinking
if self.__ledmode == BlinkerLed.LED_ON or self.__ledmode == BlinkerLed.LED_FLASHING:
self.__turnledon()
else:
self.__turnledoff()
# this is the first sleep - the 'on' time when blinking
time.sleep(self.__time_on)
# only if blinking, turn led off and do a second sleep for the off time
if self.__ledmode == BlinkerLed.LED_FLASHING:
self.__turnledoff()
# do an extra check that the stop semaphore hasn't been set before the off-time sleep
if not self.pin_stop.is_set():
# this is the second sleep - off time when blinking
time.sleep(self.__time_off)