使用 Python 进行线程化
Threading with Python
完成python新手...
我正在使用 Arduino pyfirmata 包,我正在尝试做一些非常简单的事情。
根据 python 的用户输入,我希望 LED 闪烁或不闪烁。
我的问题是 python 程序只要求用户输入一次,但我希望它始终要求输入,以便用户可以随时更改功能。
我试过使用线程包但没有成功...也许有更简单的方法,但我对编码完全陌生,所以我不知道任何其他方法。接受建议!!
这是我的代码,
import pyfirmata
import threading
import time
board = pyfirmata.Arduino('/dev/cu.usbmodem14101')
def flash():
for i in range(1000):
board.digital[13].write(1)
time.sleep(1)
board.digital[13].write(0)
time.sleep(1)
def stop():
board.digital[13].write(0)
while True:
runMode = input("Run or Stop? ")
if runMode == "Run":
x = threading.Thread(target=flash(), args=(1,))
x.start()
# x.join()
elif runMode == "Stop":
x = threading.Thread(target=stop(), args=(1,))
x.start()
#x.join()
如果你只想杀死线程,你可以使用 mulitiprocessing
multiprocessing.Process 可以 p.terminate()
p = Process(target=flash, args=(,))
while True:
runMode = input("Run or Stop? ")
if runMode == "Run":
p.start()
elif runMode == "Stop":
p.terminate()
然而,不建议仅仅杀死线程,因为如果进程正在处理关键资源或依赖于其他线程,它可能会导致错误,请参阅此处以获得更好的解释 Is there any way to kill a Thread?
这里描述的一个更好的选择是使用标志来处理你的闪烁,它们允许线程之间的简单通信
from threading import Event
e = event()
def check_for_stop(e):
while not e.isSet():
flash()
print("Flashing Ended")
while True:
runMode = input("Run or Stop? ")
if runMode == "Run":
x = threading.Thread(target=check_for_stop, args=(e,))
x.start()
# x.join()
elif runMode == "Stop":
e.set() #set flag true
e.clear() #reset flag
这里是关于事件对象的更多信息的文档https://docs.python.org/2.0/lib/event-objects.html
我还没有测试过这段代码只是一个例子,如果它不能立即工作,我们深表歉意
编辑:再次查看您的函数,您会想在闪烁期间检查标志,这是我的错误道歉,所以您的闪烁函数看起来像
def flash():
while e.isSet():
board.digital[13].write(1)
time.sleep(1)
board.digital[13].write(0)
time.sleep(1)
并且您可以像以前一样将其传递到线程中
x = threading.Thread(target=flash(), args=(1,))
x.start()
您的代码有误。
您应该通过以下方式创建线程:
x = threading.Thread(target=flash)
注意:你给了输入'flash()'因此在主线程中执行该方法。而且您的方法没有任何参数,因此您可以删除参数值
您可以通过创建自己的 Thread
子class 以面向对象的方式来实现,例如下面的 Flasher
class。
这种方法的优点之一是扩展 Flasher
class 并使其控制连接到不同输出的 LED 或允许闪光之间的延迟相对容易在创建时指定。执行前者将允许多个实例同时 运行。
import pyfirmata
import threading
import time
OFF, ON = False, True
class Flasher(threading.Thread):
DELAY = 1
def __init__(self):
super().__init__()
self.daemon = True
self.board = pyfirmata.Arduino('/dev/cu.usbmodem14101')
self.flashing = False
self.LED_state = OFF
def turn_LED_on(self):
self.board.digital[13].write(1)
self.LED_state = ON
def turn_LED_off(self):
self.board.digital[13].write(0)
self.LED_state = OFF
def run(self):
while True:
if self.flashing:
if self.LED_state == ON:
self.turn_LED_off()
else:
self.turn_LED_on()
time.sleep(self.DELAY)
def start_flashing(self):
if self.LED_state == OFF:
self.turn_LED_on()
self.flashing = True
def stop_flashing(self):
if self.LED_state == ON:
self.turn_LED_off()
self.flashing = False
flasher = Flasher()
flasher.start()
while True:
runMode = input("Run or Stop? ").strip().lower()
if runMode == "run":
flasher.start_flashing()
elif runMode == "stop":
flasher.stop_flashing()
else:
print('Unknown response ignored')
完成python新手... 我正在使用 Arduino pyfirmata 包,我正在尝试做一些非常简单的事情。
根据 python 的用户输入,我希望 LED 闪烁或不闪烁。
我的问题是 python 程序只要求用户输入一次,但我希望它始终要求输入,以便用户可以随时更改功能。
我试过使用线程包但没有成功...也许有更简单的方法,但我对编码完全陌生,所以我不知道任何其他方法。接受建议!!
这是我的代码,
import pyfirmata
import threading
import time
board = pyfirmata.Arduino('/dev/cu.usbmodem14101')
def flash():
for i in range(1000):
board.digital[13].write(1)
time.sleep(1)
board.digital[13].write(0)
time.sleep(1)
def stop():
board.digital[13].write(0)
while True:
runMode = input("Run or Stop? ")
if runMode == "Run":
x = threading.Thread(target=flash(), args=(1,))
x.start()
# x.join()
elif runMode == "Stop":
x = threading.Thread(target=stop(), args=(1,))
x.start()
#x.join()
如果你只想杀死线程,你可以使用 mulitiprocessing multiprocessing.Process 可以 p.terminate()
p = Process(target=flash, args=(,))
while True:
runMode = input("Run or Stop? ")
if runMode == "Run":
p.start()
elif runMode == "Stop":
p.terminate()
然而,不建议仅仅杀死线程,因为如果进程正在处理关键资源或依赖于其他线程,它可能会导致错误,请参阅此处以获得更好的解释 Is there any way to kill a Thread?
这里描述的一个更好的选择是使用标志来处理你的闪烁,它们允许线程之间的简单通信
from threading import Event
e = event()
def check_for_stop(e):
while not e.isSet():
flash()
print("Flashing Ended")
while True:
runMode = input("Run or Stop? ")
if runMode == "Run":
x = threading.Thread(target=check_for_stop, args=(e,))
x.start()
# x.join()
elif runMode == "Stop":
e.set() #set flag true
e.clear() #reset flag
这里是关于事件对象的更多信息的文档https://docs.python.org/2.0/lib/event-objects.html
我还没有测试过这段代码只是一个例子,如果它不能立即工作,我们深表歉意
编辑:再次查看您的函数,您会想在闪烁期间检查标志,这是我的错误道歉,所以您的闪烁函数看起来像
def flash():
while e.isSet():
board.digital[13].write(1)
time.sleep(1)
board.digital[13].write(0)
time.sleep(1)
并且您可以像以前一样将其传递到线程中
x = threading.Thread(target=flash(), args=(1,))
x.start()
您的代码有误。
您应该通过以下方式创建线程:
x = threading.Thread(target=flash)
注意:你给了输入'flash()'因此在主线程中执行该方法。而且您的方法没有任何参数,因此您可以删除参数值
您可以通过创建自己的 Thread
子class 以面向对象的方式来实现,例如下面的 Flasher
class。
这种方法的优点之一是扩展 Flasher
class 并使其控制连接到不同输出的 LED 或允许闪光之间的延迟相对容易在创建时指定。执行前者将允许多个实例同时 运行。
import pyfirmata
import threading
import time
OFF, ON = False, True
class Flasher(threading.Thread):
DELAY = 1
def __init__(self):
super().__init__()
self.daemon = True
self.board = pyfirmata.Arduino('/dev/cu.usbmodem14101')
self.flashing = False
self.LED_state = OFF
def turn_LED_on(self):
self.board.digital[13].write(1)
self.LED_state = ON
def turn_LED_off(self):
self.board.digital[13].write(0)
self.LED_state = OFF
def run(self):
while True:
if self.flashing:
if self.LED_state == ON:
self.turn_LED_off()
else:
self.turn_LED_on()
time.sleep(self.DELAY)
def start_flashing(self):
if self.LED_state == OFF:
self.turn_LED_on()
self.flashing = True
def stop_flashing(self):
if self.LED_state == ON:
self.turn_LED_off()
self.flashing = False
flasher = Flasher()
flasher.start()
while True:
runMode = input("Run or Stop? ").strip().lower()
if runMode == "run":
flasher.start_flashing()
elif runMode == "stop":
flasher.stop_flashing()
else:
print('Unknown response ignored')