使用 Python 进行线程化

Threading with Python

完成python新手... 我正在使用 Arduino pyfirmata 包,我正在尝试做一些非常简单的事情。

根据 python 的用户输入,我希望 LED 闪烁或不闪烁。

我的问题是 python 程序只要求用户输入一次,但我希望它始终要求输入,以便用户可以随时更改功能。

我试过使用线程包但没有成功...也许有更简单的方法,但我对编码完全陌生,所以我不知道任何其他方法。接受建议!!

这是我的代码,

import pyfirmata
import threading
import time

board = pyfirmata.Arduino('/dev/cu.usbmodem14101')

def flash():
    for i in range(1000):
        board.digital[13].write(1)
        time.sleep(1)
        board.digital[13].write(0)
        time.sleep(1)

def stop():
    board.digital[13].write(0)


while True:
    runMode = input("Run or Stop? ")

    if runMode == "Run":
        x = threading.Thread(target=flash(), args=(1,))
        x.start()
        # x.join()

    elif runMode == "Stop":
        x = threading.Thread(target=stop(), args=(1,))
        x.start()
        #x.join()

如果你只想杀死线程,你可以使用 mulitiprocessing multiprocessing.Process 可以 p.terminate()

p = Process(target=flash, args=(,))
while True:
    runMode = input("Run or Stop? ")
    if runMode == "Run":
        p.start()
    elif runMode == "Stop":
        p.terminate()

然而,不建议仅仅杀死线程,因为如果进程正在处理关键资源或依赖于其他线程,它可能会导致错误,请参阅此处以获得更好的解释 Is there any way to kill a Thread?

这里描述的一个更好的选择是使用标志来处理你的闪烁,它们允许线程之间的简单通信

from threading import Event

e = event()

def check_for_stop(e):
    while not e.isSet():
         flash()
    print("Flashing Ended")

while True:
    runMode = input("Run or Stop? ")

    if runMode == "Run":
        x = threading.Thread(target=check_for_stop, args=(e,))
        x.start()
        # x.join()

    elif runMode == "Stop":
        e.set() #set flag true
        e.clear() #reset flag

这里是关于事件对象的更多信息的文档https://docs.python.org/2.0/lib/event-objects.html

我还没有测试过这段代码只是一个例子,如果它不能立即工作,我们深表歉意

编辑:再次查看您的函数,您会想在闪烁期间检查标志,这是我的错误道歉,所以您的闪烁函数看起来像

def flash():
    while e.isSet():
        board.digital[13].write(1)
        time.sleep(1)
        board.digital[13].write(0)
        time.sleep(1)

并且您可以像以前一样将其传递到线程中

x = threading.Thread(target=flash(), args=(1,))
x.start()

您的代码有误。

您应该通过以下方式创建线程:

x = threading.Thread(target=flash)

注意:你给了输入'flash()'因此在主线程中执行该方法。而且您的方法没有任何参数,因此您可以删除参数值

您可以通过创建自己的 Thread 子class 以面向对象的方式来实现,例如下面的 Flasher class。

这种方法的优点之一是扩展 Flasher class 并使其控制连接到不同输出的 LED 或允许闪光之间的延迟相对容易在创建时指定。执行前者将允许多个实例同时 运行。

import pyfirmata
import threading
import time

OFF, ON = False, True

class Flasher(threading.Thread):
    DELAY = 1

    def __init__(self):
        super().__init__()
        self.daemon = True
        self.board = pyfirmata.Arduino('/dev/cu.usbmodem14101')
        self.flashing = False
        self.LED_state = OFF

    def turn_LED_on(self):
        self.board.digital[13].write(1)
        self.LED_state = ON

    def turn_LED_off(self):
        self.board.digital[13].write(0)
        self.LED_state = OFF

    def run(self):
        while True:
            if self.flashing:
                if self.LED_state == ON:
                    self.turn_LED_off()
                else:
                    self.turn_LED_on()
            time.sleep(self.DELAY)

    def start_flashing(self):
        if self.LED_state == OFF:
            self.turn_LED_on()
        self.flashing = True

    def stop_flashing(self):
        if self.LED_state == ON:
            self.turn_LED_off()
        self.flashing = False


flasher = Flasher()
flasher.start()

while True:
    runMode = input("Run or Stop? ").strip().lower()

    if runMode == "run":
        flasher.start_flashing()
    elif runMode == "stop":
        flasher.stop_flashing()
    else:
        print('Unknown response ignored')