使用多个线程多次调用一个方法

Calling a Method several times with several Threads

我想要一个 LED 灯闪烁,而我的 Raspberry 正在做一些工作。我在我的 Python 脚本中为 LED 使用线程。

初始代码:

import RPi.GPIO
import time
import threading

pinLED = 10
pinButton = 12

GPIO.setmode(GPIO.BOARD)
GPIO.setup(pinLED, GPIO.OUT)
GPIO.setup(pinButton, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.output(pinLED, 0)

线程方法:

working = False
def flash():
    status = 0
    while working:
        time.sleep(0.5)
        if status == 0:
            GPIO.output(pinLED, 1)
            status = 1
        else:
            GPIO.output(pinLED, 0)
            status = 0|
    GPIO.output(pinLED, 0)

逻辑:

try:
    while True:
        time.sleep(0.02) #found out that 0.02 is a good value to get every tiny button push and save resources

        if GPIO.input(pinButton) == 1:
            t = threading.Thread(target=flash)
            working = True
            t.start()

            time.sleep(5) #work would be here
            working = False
            t.join()

except Exception as e:
    print(e)
finally:
    GPIO.cleanup()

当我启动脚本并第一次按下按钮时,一切正常并且 LED 灯在闪烁。但是当我第二次按下按钮时,没有重新启动脚本,led 没有闪烁。我打印了一些调试消息并发现 t.start() 被调用了,但由于某种原因它什么也没做,也没有抛出异常。每次我再次按下按钮时,LED 不应该开始闪烁吗?

我没有发现 logic 错误,我确认它有效,但使用了以下更改:

  1. if __name__ == '__main__': 内部启动 main_thread
    我建议也将所有 GPIO 调用移动到此 block 中。

避免将启动时执行的代码放在 if __name__ == '__main__':

之外

From the docs: Safe importing of main module One should protect the “entry point” of the program by using if __name__ == '__main__':

  1. working = False 之后添加了 join(),这保证了线程在再次启动之前已经终止。

    working = False  
    t.join()  
    

我建议将 def flash() 更改为以下内容:
使用 threading.Event() 而不是 global Instance 并将其与 pinLED 一起传递。 这概括了 def flash(...) 并允许它与不同的 pinLED 一起使用,甚至是并行的。 将 status 定义为 threading.local() 线程安全,因此不同线程的实例值将不同。
例如:

def flash(pinLED, flashing):
    status = threading.local()
    status.LED = False
    while flashing.is_set():
        status.LED = not status.LED
        GPIO.output(pinLED, int(status.LED))
        time.sleep(0.5)

    GPIO.output(pinLED, 0)

main_thread 的更改:

def main_thread():
    flashing = threading.Event()
    flashing.clear()

    try:
        while True:
            time.sleep(0.02)  
            if GPIO.input(pinButton) == 1:
                t = threading.Thread(target=flash, args=(pinLED, flashing,))
                flashing.set()
                t.start()

                time.sleep(2)  # work would be here

                flashing.clear()
                t.join()
    ...  

测试 Python:3.4.2