如何从 python 模块无限循环 运行 多个 .py 文件?
how to run multiple .py files with infinite loop from a python module?
我有两个文件。我们称它为 file_X.py 和 file_Y.py。两者都有无限循环,不断从 COM 端口读取数据。我有一个带有两个按钮的 tkinter 模块,用于启动 file_X 和 file_Y。因此,如果我单击按钮 A,我希望 file_X 到 运行 并且按钮 B 将启动 file_Y。我怎样才能 运行 这些文件并行并让它们在各自的命令提示符终端中显示数据?
我尝试使用 运行py 和 os.system。 os.system 会抛出一个错误,即使这两个文件的模块本身都运行良好。另一方面,运行py 不允许我在第一个模块 运行ning 时点击另一个按钮。
Tkinter 模块:
import tkinter as tk
import time
import runpy
root = tk.Tk()
root. title("App")
root.geometry('700x500')
v = tk.IntVar()
v.set(-1)
button_labels = [
(" Device 1 "),
(" Device 2 ")]
def ShowChoice():
choice = v.get() + 1
if(choice == 1):
runpy.run_module('file_X', run_name='__main__')
elif(choice == 2):
runpy.run_module('file_Y', run_name='__main__')
tk.Label(root,
text="""Choose the device you want to launch:""",
font = 'Arial 20 bold',
justify = tk.LEFT,
height = 6,
padx = 20).pack()
for val, button_label in enumerate(button_labels):
tk.Radiobutton(root,
text = button_label,
font = 'Times 12 bold',
indicatoron = 0,
bg = 'cornflower blue',
width = 40,
padx = 20,
pady = 5,
variable=v,
command=ShowChoice,
value=val).pack(anchor=tk.S)
root.mainloop()
file_X 和 file_Y 具有几乎相同的代码,但连接到不同的 COM 端口并具有不同的字符串修改。
import serial
import time
import csv
try:
ser = serial.Serial("COM4",
baudrate=2400,
bytesize=serial.EIGHTBITS,
parity =serial.PARITY_ODD)
except:
print("Device not detected")
def Reader():
global ser
try:
data = ser.readline().decode('utf-8')
data = str(data).replace("\r\n","")
data = data.replace("\x000","")
return data
except:
return "Data Unavailable"
def Start():
date_now = time.strftime('%d.%m.%y')
time_now = time.strftime('%H.%M.%S')
file_name = date_now + '__' + time_now + '.csv'
with open(file_name, 'w+') as f:
csv_file = csv.writer(f)
csv_file.writerow(['DATE','TIME','VALUE'])
while True:
date_now = time.strftime('%d/%m/%y')
time_now = time.strftime('%H:%M:%S')
data = Reader()
csv_file.writerow([date_now, time_now, data])
print([date_now, time_now, data])
if __name__ =='__main__':
Start()
您可以使用线程、多处理或子进程模块。
这是一个演示线程模块的快速示例。
import threading, time
def Start(name=''):
cnt=0
while(cnt<10):
cnt+=1
print "This is thread %s" % name
time.sleep(1)
thread1 = threading.Thread(target=Start, name='Thread-1', args=('Serial1',))
thread2 = threading.Thread(target=Start, name='Thread-2', args=('Serial2',))
thread2.start()
thread1.start()
while (thread1.isAlive() and thread2.isAlive()):
time.sleep(2)
print "Running Threads : %s" % [thread.name for thread in threading.enumerate()]
print "done"
按照建议,您可以导入 file_X 和 file_Y 并使用 Start() 函数作为要由 运行 调用的目标(可调用对象)为每个线程创建一个线程() 每个线程的方法。
import file_X, file_Y
thread1 = threading.Thread(target=file_X.Start, name='COM1')
thread2 = threading.Thread(target=file_Y.Start, name='COM2')
thread1.start()
thread2.start()
多处理模块类似于线程。
或者 运行 file_X 和 file_Y 作为使用子流程模块的子流程。
NEW <<<<
Here is a solution using threading. I've only tested it with one port.
import threading
import time
import serial
import sys, os.path
import csv
def OpenSerialPort(port=""):
print ("Open port %s" % port)
serPort = None
try:
serPort = serial.Serial(port,
baudrate=2400,
bytesize=serial.EIGHTBITS,
parity =serial.PARITY_ODD)
except serial.SerialException as msg:
print( "Error opening serial port %s" % msg)
except:
exctype, errorMsg = sys.exc_info()[:2]
print ("%s %s" % (errorMsg, exctype))
return serPort
def Reader(file_name, serialPort, stopped):
print ("Start reading serial port %s." % serialPort.name)
serialPort.timeout = 1.0
while not stopped.is_set():
serData = ''
try:
#print "Reading port..."
serData = serialPort.readline()
except:
exctype, errorMsg = sys.exc_info()[:2]
print ("Error reading port - %s" % errorMsg)
stopped.set()
break
if len(serData) > 0:
serData = serData.decode('utf-8')
serData = str(serData).replace("\r\n","")
serData = serData.replace("\x000","")
Log_Data(file_name, serData)
#else:
# print("Reader() no Data")
serialPort.close()
print ("Reader finished. Closed %s" % serialPort.name)
def Init_Log(portName=''):
#Create log file
portName = os.path.basename(portName)
file_name = time.strftime('%d.%m.%y__%H.%M.%S') + "__%s.csv" % portName
with open(file_name, 'w') as f:
csv_file = csv.writer(f)
csv_file.writerow(['DATE','TIME','VALUE'])
return file_name
def Log_Data(file_name='', dataString=''):
date_now = time.strftime('%d/%m/%y')
time_now = time.strftime('%H:%M:%S')
with open(file_name, 'a') as f:
csv_file = csv.writer(f)
csv_file.writerow([date_now, time_now, dataString])
print([date_now, time_now, dataString])
if __name__ == "__main__":
stopped = threading.Event() # Create stopped event to notify all threads when it is time to stop.
#Open COM3 ports
portName = 'COM3'
serialPort_1 = OpenSerialPort(portName)
if serialPort_1 == None:
sys.exit(1)
file_name_1 = Init_Log(portName) #Create log file
p1 = threading.Thread(target=Reader, args=(file_name_1, serialPort_1, stopped,))
#Open COM4 ports
portName = 'COM4'
serialPort_2 = OpenSerialPort(portName)
if serialPort_2 == None:
sys.exit(1)
#Create log file
file_name_2 = Init_Log(portName)
p2 = threading.Thread(target=Reader, args=(file_name_2, serialPort_2, stopped,))
#Start port reader threads
p1.start()
p2.start()
#This is just a test loop that does nothing for awhile.
loopcnt = 20
while (loopcnt > 0) and (not stopped.is_set()):
loopcnt -= 1
print ("main() %d" % loopcnt)
try:
time.sleep(1)
except KeyboardInterrupt: #Capture Ctrl-C
print ("Captured Ctrl-C")
loopcnt=0
stopped.set()
stopped.set()
print ("Stopped")
p1.join()
p2.join()
serialPort_1.close()
serialPort_2.close()
print ("Done")
我有两个文件。我们称它为 file_X.py 和 file_Y.py。两者都有无限循环,不断从 COM 端口读取数据。我有一个带有两个按钮的 tkinter 模块,用于启动 file_X 和 file_Y。因此,如果我单击按钮 A,我希望 file_X 到 运行 并且按钮 B 将启动 file_Y。我怎样才能 运行 这些文件并行并让它们在各自的命令提示符终端中显示数据?
我尝试使用 运行py 和 os.system。 os.system 会抛出一个错误,即使这两个文件的模块本身都运行良好。另一方面,运行py 不允许我在第一个模块 运行ning 时点击另一个按钮。
Tkinter 模块:
import tkinter as tk
import time
import runpy
root = tk.Tk()
root. title("App")
root.geometry('700x500')
v = tk.IntVar()
v.set(-1)
button_labels = [
(" Device 1 "),
(" Device 2 ")]
def ShowChoice():
choice = v.get() + 1
if(choice == 1):
runpy.run_module('file_X', run_name='__main__')
elif(choice == 2):
runpy.run_module('file_Y', run_name='__main__')
tk.Label(root,
text="""Choose the device you want to launch:""",
font = 'Arial 20 bold',
justify = tk.LEFT,
height = 6,
padx = 20).pack()
for val, button_label in enumerate(button_labels):
tk.Radiobutton(root,
text = button_label,
font = 'Times 12 bold',
indicatoron = 0,
bg = 'cornflower blue',
width = 40,
padx = 20,
pady = 5,
variable=v,
command=ShowChoice,
value=val).pack(anchor=tk.S)
root.mainloop()
file_X 和 file_Y 具有几乎相同的代码,但连接到不同的 COM 端口并具有不同的字符串修改。
import serial
import time
import csv
try:
ser = serial.Serial("COM4",
baudrate=2400,
bytesize=serial.EIGHTBITS,
parity =serial.PARITY_ODD)
except:
print("Device not detected")
def Reader():
global ser
try:
data = ser.readline().decode('utf-8')
data = str(data).replace("\r\n","")
data = data.replace("\x000","")
return data
except:
return "Data Unavailable"
def Start():
date_now = time.strftime('%d.%m.%y')
time_now = time.strftime('%H.%M.%S')
file_name = date_now + '__' + time_now + '.csv'
with open(file_name, 'w+') as f:
csv_file = csv.writer(f)
csv_file.writerow(['DATE','TIME','VALUE'])
while True:
date_now = time.strftime('%d/%m/%y')
time_now = time.strftime('%H:%M:%S')
data = Reader()
csv_file.writerow([date_now, time_now, data])
print([date_now, time_now, data])
if __name__ =='__main__':
Start()
您可以使用线程、多处理或子进程模块。
这是一个演示线程模块的快速示例。
import threading, time
def Start(name=''):
cnt=0
while(cnt<10):
cnt+=1
print "This is thread %s" % name
time.sleep(1)
thread1 = threading.Thread(target=Start, name='Thread-1', args=('Serial1',))
thread2 = threading.Thread(target=Start, name='Thread-2', args=('Serial2',))
thread2.start()
thread1.start()
while (thread1.isAlive() and thread2.isAlive()):
time.sleep(2)
print "Running Threads : %s" % [thread.name for thread in threading.enumerate()]
print "done"
按照建议,您可以导入 file_X 和 file_Y 并使用 Start() 函数作为要由 运行 调用的目标(可调用对象)为每个线程创建一个线程() 每个线程的方法。
import file_X, file_Y
thread1 = threading.Thread(target=file_X.Start, name='COM1')
thread2 = threading.Thread(target=file_Y.Start, name='COM2')
thread1.start()
thread2.start()
多处理模块类似于线程。
或者 运行 file_X 和 file_Y 作为使用子流程模块的子流程。
NEW <<<< Here is a solution using threading. I've only tested it with one port.
import threading
import time
import serial
import sys, os.path
import csv
def OpenSerialPort(port=""):
print ("Open port %s" % port)
serPort = None
try:
serPort = serial.Serial(port,
baudrate=2400,
bytesize=serial.EIGHTBITS,
parity =serial.PARITY_ODD)
except serial.SerialException as msg:
print( "Error opening serial port %s" % msg)
except:
exctype, errorMsg = sys.exc_info()[:2]
print ("%s %s" % (errorMsg, exctype))
return serPort
def Reader(file_name, serialPort, stopped):
print ("Start reading serial port %s." % serialPort.name)
serialPort.timeout = 1.0
while not stopped.is_set():
serData = ''
try:
#print "Reading port..."
serData = serialPort.readline()
except:
exctype, errorMsg = sys.exc_info()[:2]
print ("Error reading port - %s" % errorMsg)
stopped.set()
break
if len(serData) > 0:
serData = serData.decode('utf-8')
serData = str(serData).replace("\r\n","")
serData = serData.replace("\x000","")
Log_Data(file_name, serData)
#else:
# print("Reader() no Data")
serialPort.close()
print ("Reader finished. Closed %s" % serialPort.name)
def Init_Log(portName=''):
#Create log file
portName = os.path.basename(portName)
file_name = time.strftime('%d.%m.%y__%H.%M.%S') + "__%s.csv" % portName
with open(file_name, 'w') as f:
csv_file = csv.writer(f)
csv_file.writerow(['DATE','TIME','VALUE'])
return file_name
def Log_Data(file_name='', dataString=''):
date_now = time.strftime('%d/%m/%y')
time_now = time.strftime('%H:%M:%S')
with open(file_name, 'a') as f:
csv_file = csv.writer(f)
csv_file.writerow([date_now, time_now, dataString])
print([date_now, time_now, dataString])
if __name__ == "__main__":
stopped = threading.Event() # Create stopped event to notify all threads when it is time to stop.
#Open COM3 ports
portName = 'COM3'
serialPort_1 = OpenSerialPort(portName)
if serialPort_1 == None:
sys.exit(1)
file_name_1 = Init_Log(portName) #Create log file
p1 = threading.Thread(target=Reader, args=(file_name_1, serialPort_1, stopped,))
#Open COM4 ports
portName = 'COM4'
serialPort_2 = OpenSerialPort(portName)
if serialPort_2 == None:
sys.exit(1)
#Create log file
file_name_2 = Init_Log(portName)
p2 = threading.Thread(target=Reader, args=(file_name_2, serialPort_2, stopped,))
#Start port reader threads
p1.start()
p2.start()
#This is just a test loop that does nothing for awhile.
loopcnt = 20
while (loopcnt > 0) and (not stopped.is_set()):
loopcnt -= 1
print ("main() %d" % loopcnt)
try:
time.sleep(1)
except KeyboardInterrupt: #Capture Ctrl-C
print ("Captured Ctrl-C")
loopcnt=0
stopped.set()
stopped.set()
print ("Stopped")
p1.join()
p2.join()
serialPort_1.close()
serialPort_2.close()
print ("Done")