在 Python 线程化期间更新 Tkinter 标签
Update Tkinter label during threading in Python
我目前正在尝试对由 Raspberry Pi 控制的机器人手臂进行编程。
到目前为止一切正常,除了一件事,我已经用谷歌搜索并尝试了很多小时但找不到有效的解决方案。
为了机器人手臂的运动,需要 运行 所有电机 "simultaneously" 带螺纹(工作正常)。
我遇到的问题是,我需要更新一个标签,该标签在完成运动后立即显示轴(电机)的当前角度,但其他电机仍在 运行ning(线程)。
经过大量研究,我认为我通过使用队列和 Tkinters 后处理方法找到了解决方案。但它仍然不起作用,因为标签文本仅在所有线程终止后才会更新。
我写了一个示例代码,我想在其中获取电机 "one" 的标签更新,它将在电机 "two"(500 次迭代)之前完成其 for 循环(100 次迭代)。我希望标签会在第一个电机达到目标时立即更新,而第二个电机仍在 运行ing。
但是虽然我用了after-method,但还是要等到motor 2完成再更新label。
希望你能帮助我!
from tkinter import *
import threading
import time
from queue import *
class StepperMotors:
def __init__(self, root):
self.root = root
self.start_btn = Button(root, text="Start", command=lambda:self.start_movement())
self.start_btn.config(width = 10)
self.start_btn.grid(row=1,column=1)
self.label_one = Label(root, text='')
self.label_one.config(width = 10)
self.label_one.grid(row=2, column=1)
self.label_two = Label(root, text='')
self.label_two.config(width = 10)
self.label_two.grid(row=3, column=1)
def start_movement(self):
self.thread_queue = Queue()
self.root.after(100, self.wait_for_finish)
thread_one = threading.Thread(target=self.motor_actuation, args=(1,100))
thread_two = threading.Thread(target=self.motor_actuation, args=(2,500))
thread_one.start()
thread_two.start()
thread_one.join()
thread_two.join()
def motor_actuation(self, motor, iterations):
for i in range(iterations):
i = i+1
update_text = str(motor) + " " + str(i) + "\n"
print(update_text)
time.sleep(0.01)
self.thread_queue.put(update_text)
def wait_for_finish(self):
try:
self.text = self.thread_queue.get()
self.label_one.config(text=self.text)
except self.thread_queue.empty():
self.root.after(100, self.wait_for_finish)
if __name__ == "__main__":
root = Tk()
root.title("test")
stepper = StepperMotors(root)
root.mainloop()
最好使用非阻塞的守护线程。
此外,最好有一个separation of concerns:机器人(或机械臂)可以是一个有自己生命周期的对象:守护线程。同上,您可以定义一个 "LabelUpdater" 来读取机器人的状态并更新标签。
让我们定义一个机器人:
- 它是在应用程序初始化时创建的,当用户单击 "Start" 按钮时是 运行,
- 机器人在应用级多线程队列中移动并报告其角度,
class Robot(threading.Thread):
def __init__(self, name: str, label_queue: queue.Queue, end_pos: int):
super().__init__(name=name)
self.daemon = True
self.label_queue = label_queue
self.end_pos = end_pos
def run(self) -> None:
for angle in range(self.end_pos):
self.label_queue.put(angle)
time.sleep(0.01)
让我们定义一个 LabelUpdater:
- 它是在应用程序初始化时创建的,并且永远 运行(它可以观察机器人,即使它不是 运行ning)。
- 它读取机器人队列(例如每秒一次以避免闪烁)并更新标签
class LabelUpdater(threading.Thread):
def __init__(self, name: str, label_queue: queue.Queue, root_app: tkinter.Tk, variable: tkinter.Variable):
super().__init__(name=name)
self.daemon = True
self.label_queue = label_queue
self.root_app = root_app
self.variable = variable
def run(self) -> None:
# run forever
while True:
# wait a second please
time.sleep(1)
# consume all the queue and keep only the last message
last_msg = None
while True:
try:
msg = self.label_queue.get(block=False)
except queue.Empty:
break
last_msg = msg
self.label_queue.task_done()
if last_msg:
self.variable.set(last_msg)
那么,主应用程序应该定义:
- 2 个多线程队列:每个标签一个,
- 2
tkinter.StringVar
将更新的变量,
- 机器人和标签更新器,
- 两个更新程序已启动,并将运行永远。
class StepperMotors:
def __init__(self, root):
self.root = root
self.label_one_queue = queue.Queue()
self.label_two_queue = queue.Queue()
self.start_btn = tkinter.Button(root, text="Start", command=lambda: self.start_movement())
self.start_btn.config(width=10)
self.start_btn.grid(row=1, column=1)
self.text_one = tkinter.StringVar()
self.text_one.set("one")
self.label_one = tkinter.Label(root, textvariable=self.text_one)
self.label_one.config(width=10)
self.label_one.grid(row=2, column=1)
self.text_two = tkinter.StringVar()
self.text_two.set("two")
self.label_two = tkinter.Label(root, textvariable=self.text_two)
self.label_two.config(width=10)
self.label_two.grid(row=3, column=1)
self.robot_one = Robot("robot_one", self.label_one_queue, 100)
self.robot_two = Robot("robot_two", self.label_two_queue, 500)
self.updater_one = LabelUpdater("updater_one", self.label_one_queue, self.root, self.text_one)
self.updater_two = LabelUpdater("updater_two", self.label_two_queue, self.root, self.text_two)
self.updater_one.start()
self.updater_two.start()
def start_movement(self):
self.robot_one.start()
self.robot_two.start()
当然,您需要一个标志或其他东西来检查每个机器人是否已经 运行ning。
我目前正在尝试对由 Raspberry Pi 控制的机器人手臂进行编程。
到目前为止一切正常,除了一件事,我已经用谷歌搜索并尝试了很多小时但找不到有效的解决方案。
为了机器人手臂的运动,需要 运行 所有电机 "simultaneously" 带螺纹(工作正常)。
我遇到的问题是,我需要更新一个标签,该标签在完成运动后立即显示轴(电机)的当前角度,但其他电机仍在 运行ning(线程)。
经过大量研究,我认为我通过使用队列和 Tkinters 后处理方法找到了解决方案。但它仍然不起作用,因为标签文本仅在所有线程终止后才会更新。
我写了一个示例代码,我想在其中获取电机 "one" 的标签更新,它将在电机 "two"(500 次迭代)之前完成其 for 循环(100 次迭代)。我希望标签会在第一个电机达到目标时立即更新,而第二个电机仍在 运行ing。
但是虽然我用了after-method,但还是要等到motor 2完成再更新label。
希望你能帮助我!
from tkinter import *
import threading
import time
from queue import *
class StepperMotors:
def __init__(self, root):
self.root = root
self.start_btn = Button(root, text="Start", command=lambda:self.start_movement())
self.start_btn.config(width = 10)
self.start_btn.grid(row=1,column=1)
self.label_one = Label(root, text='')
self.label_one.config(width = 10)
self.label_one.grid(row=2, column=1)
self.label_two = Label(root, text='')
self.label_two.config(width = 10)
self.label_two.grid(row=3, column=1)
def start_movement(self):
self.thread_queue = Queue()
self.root.after(100, self.wait_for_finish)
thread_one = threading.Thread(target=self.motor_actuation, args=(1,100))
thread_two = threading.Thread(target=self.motor_actuation, args=(2,500))
thread_one.start()
thread_two.start()
thread_one.join()
thread_two.join()
def motor_actuation(self, motor, iterations):
for i in range(iterations):
i = i+1
update_text = str(motor) + " " + str(i) + "\n"
print(update_text)
time.sleep(0.01)
self.thread_queue.put(update_text)
def wait_for_finish(self):
try:
self.text = self.thread_queue.get()
self.label_one.config(text=self.text)
except self.thread_queue.empty():
self.root.after(100, self.wait_for_finish)
if __name__ == "__main__":
root = Tk()
root.title("test")
stepper = StepperMotors(root)
root.mainloop()
最好使用非阻塞的守护线程。
此外,最好有一个separation of concerns:机器人(或机械臂)可以是一个有自己生命周期的对象:守护线程。同上,您可以定义一个 "LabelUpdater" 来读取机器人的状态并更新标签。
让我们定义一个机器人:
- 它是在应用程序初始化时创建的,当用户单击 "Start" 按钮时是 运行,
- 机器人在应用级多线程队列中移动并报告其角度,
class Robot(threading.Thread):
def __init__(self, name: str, label_queue: queue.Queue, end_pos: int):
super().__init__(name=name)
self.daemon = True
self.label_queue = label_queue
self.end_pos = end_pos
def run(self) -> None:
for angle in range(self.end_pos):
self.label_queue.put(angle)
time.sleep(0.01)
让我们定义一个 LabelUpdater:
- 它是在应用程序初始化时创建的,并且永远 运行(它可以观察机器人,即使它不是 运行ning)。
- 它读取机器人队列(例如每秒一次以避免闪烁)并更新标签
class LabelUpdater(threading.Thread):
def __init__(self, name: str, label_queue: queue.Queue, root_app: tkinter.Tk, variable: tkinter.Variable):
super().__init__(name=name)
self.daemon = True
self.label_queue = label_queue
self.root_app = root_app
self.variable = variable
def run(self) -> None:
# run forever
while True:
# wait a second please
time.sleep(1)
# consume all the queue and keep only the last message
last_msg = None
while True:
try:
msg = self.label_queue.get(block=False)
except queue.Empty:
break
last_msg = msg
self.label_queue.task_done()
if last_msg:
self.variable.set(last_msg)
那么,主应用程序应该定义:
- 2 个多线程队列:每个标签一个,
- 2
tkinter.StringVar
将更新的变量, - 机器人和标签更新器,
- 两个更新程序已启动,并将运行永远。
class StepperMotors:
def __init__(self, root):
self.root = root
self.label_one_queue = queue.Queue()
self.label_two_queue = queue.Queue()
self.start_btn = tkinter.Button(root, text="Start", command=lambda: self.start_movement())
self.start_btn.config(width=10)
self.start_btn.grid(row=1, column=1)
self.text_one = tkinter.StringVar()
self.text_one.set("one")
self.label_one = tkinter.Label(root, textvariable=self.text_one)
self.label_one.config(width=10)
self.label_one.grid(row=2, column=1)
self.text_two = tkinter.StringVar()
self.text_two.set("two")
self.label_two = tkinter.Label(root, textvariable=self.text_two)
self.label_two.config(width=10)
self.label_two.grid(row=3, column=1)
self.robot_one = Robot("robot_one", self.label_one_queue, 100)
self.robot_two = Robot("robot_two", self.label_two_queue, 500)
self.updater_one = LabelUpdater("updater_one", self.label_one_queue, self.root, self.text_one)
self.updater_two = LabelUpdater("updater_two", self.label_two_queue, self.root, self.text_two)
self.updater_one.start()
self.updater_two.start()
def start_movement(self):
self.robot_one.start()
self.robot_two.start()
当然,您需要一个标志或其他东西来检查每个机器人是否已经 运行ning。