使用 for 循环生成的多个 QProcess 终止 QThread
Terminating QThread with multiple QProcess generated with for loop
我最近问了一个 关于如何防止 QThread 在通过使用嵌套循环生成 运行 多个 QProcess 实例时冻结我的 GUI。提供的解决方案非常有效!我能够完成我的程序,并且运行顺利!我最近添加了一项功能,允许用户在此过程中的任何时候停止 QThread。停止功能的工作原理是没有调用 QProcesse 的新实例;然而,仍有许多未完成的 QProcess 实例仍在处理中。有没有办法确定何时不再有 QProcess 运行 的任何实例并通知用户?我熟悉 QProcess 方法 waitForFinished(),但我不确定如何将其应用于使用嵌套循环生成的 QProcess 的多个实例。
这是模拟我的实际程序的数据处理方面的代码示例:
# This Python file uses the following encoding: utf-8
import sys
import os
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import test
import time
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.ui=test.Ui_test()
self.ui.setupUi(self)
self.ui.pushButton_startThread.clicked.connect(self.startTestThread)
self.ui.pushButton_Update.clicked.connect(self.stopThread)
def startTestThread(self):
self.xValue = self.ui.lineEdit_x.text() #Represents number of batches
self.yValue = self.ui.lineEdit_y.text() #Represents number of images per batch
self.runTest = testThread(self.xValue, self.yValue, self) # Creates an instance of testThread
self.runTest.start() # Starts the instance of testThread
def stopThread(self):
self.runTest.stop()
self.ui.lineEdit_x.setText("")
self.ui.lineEdit_y.setText("")
class testThread(QThread):
def __init__(self, xValue, yValue, parent=None):
super().__init__(parent)
self.xValue = xValue
self.yValue = yValue
def __del__(self):
self.wait()
def run(self):
processd = []
for x in range(int(self.xValue)): # For loop to iterate througeach batch
pass
for y in range(
int(self.yValue)): # For loop to iterate through each image in each batch
time.sleep(.1)
QProcess.startDetached(os.path.dirname(os.path.realpath(__file__)) + r"\test.bat") # Runs test.bat
def stop(self):
self.terminate()
self.wait()
print("\nStopping thread")
if __name__ == "__main__":
app = QApplication(sys.argv)
app.setStyle("Fusion")
window = MainWindow()
window.show()
sys.exit(app.exec_())
test.bat
@ECHO OFF
ECHO Processing Batched Images.
TIMEOUT /T 15
ECHO Process Finished.
如前所述,停止 QThread 是有效的;但是,我希望能够确定所有 QProcess 实例何时完成。这将允许程序通知用户他们可以使用新参数重新启动进程。目前,我不得不告诉我的实验室伙伴等到没有更多的输出文件被写入,然后再尝试一组新的参数。
任何帮助将不胜感激。感谢您的宝贵时间!
虽然不优雅,但我想出了一种方法来确定 QProcess 的多个实例在终止 QThread 后何时完成:
- 我添加了一个批次计数器,每次 QProcess 实例排队时该计数器递增 1。
- 一旦线程被取消,就会用当前的批次计数器发出一个信号。
- 此信号连接到一个方法,该方法将确定 QProcess 的所有实例何时完成处理。
- 对于我的程序,QProcess 的每个实例都会生成一个唯一的输出文件。
- 使用while循环,可以确定所有QProcess实例完成的时刻。
这是代码示例:
# This Python file uses the following encoding: utf-8
import sys
import os
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import test
import time
from glob import glob1
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.ui=test.Ui_test()
self.ui.setupUi(self)
self.ui.pushButton_startThread.clicked.connect(self.startTestThread)
self.ui.pushButton_Update.clicked.connect(self.stopThread)
def startTestThread(self):
self.xValue = self.ui.lineEdit_x.text() #Represents number of batches
self.yValue = self.ui.lineEdit_y.text() #Represents number of images per batch
self.runTest = testThread(self.xValue, self.yValue, self) # Creates an instance of testThread
self.runTest.signals.batchesSubmitted.connect(self.testThreadFinished)
self.runTest.start() # Starts the instance of testThread
def stopThread(self):
self.runTest.stop()
@pyqtSlot(int)
def testThreadFinished(self, batchesSubmitted):
while len(glob1(outputFilePath + fileName, "*.txt")) < batchesSubmitted: #Compare the number of *.txt files in the output directory to the number of isntances of QProcess called
time.sleep(1)
self.ui.lineEdit_x.setText("")
self.ui.lineEdit_y.setText("")
print("All instance of QProcess are finished!")
class testThreadSignals(QObject):
batchesSubmitted = pyqtSignal(int)
class testThread(QThread):
def __init__(self, xValue, yValue, parent=None):
super().__init__(parent)
self.xValue = xValue
self.yValue = yValue
self.signals = testThreadSignals()
def run(self):
self.batchCounter = 0
for x in range(int(self.xValue)): # For loop to iterate througeach batch
pass
for y in range(
int(self.yValue)): # For loop to iterate through each image in each batch
time.sleep(.1)
QProcess.startDetached(os.path.dirname(os.path.realpath(__file__)) + r"\test.bat") # Runs test.bat
self.batchCounter += 1 #Counts the successful number *.txt files written
@pyqtSlot(int)
def stop(self):
self.terminate()
self.threadactive = False
self.wait()
self.signals.batchesSubmitted.emit(self.batchCounter)
if __name__ == "__main__":
app = QApplication(sys.argv)
app.setStyle("Fusion")
window = MainWindow()
window.show()
sys.exit(app.exec_())
test.bat:
@ECHO OFF
ECHO Processing Batched Images.
TIMEOUT /T 15
REM write a unique text file
ECHO Process Finished.
我能够毫无问题地将上述 python 代码应用到我的程序中。
我最近问了一个
这是模拟我的实际程序的数据处理方面的代码示例:
# This Python file uses the following encoding: utf-8
import sys
import os
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import test
import time
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.ui=test.Ui_test()
self.ui.setupUi(self)
self.ui.pushButton_startThread.clicked.connect(self.startTestThread)
self.ui.pushButton_Update.clicked.connect(self.stopThread)
def startTestThread(self):
self.xValue = self.ui.lineEdit_x.text() #Represents number of batches
self.yValue = self.ui.lineEdit_y.text() #Represents number of images per batch
self.runTest = testThread(self.xValue, self.yValue, self) # Creates an instance of testThread
self.runTest.start() # Starts the instance of testThread
def stopThread(self):
self.runTest.stop()
self.ui.lineEdit_x.setText("")
self.ui.lineEdit_y.setText("")
class testThread(QThread):
def __init__(self, xValue, yValue, parent=None):
super().__init__(parent)
self.xValue = xValue
self.yValue = yValue
def __del__(self):
self.wait()
def run(self):
processd = []
for x in range(int(self.xValue)): # For loop to iterate througeach batch
pass
for y in range(
int(self.yValue)): # For loop to iterate through each image in each batch
time.sleep(.1)
QProcess.startDetached(os.path.dirname(os.path.realpath(__file__)) + r"\test.bat") # Runs test.bat
def stop(self):
self.terminate()
self.wait()
print("\nStopping thread")
if __name__ == "__main__":
app = QApplication(sys.argv)
app.setStyle("Fusion")
window = MainWindow()
window.show()
sys.exit(app.exec_())
test.bat
@ECHO OFF
ECHO Processing Batched Images.
TIMEOUT /T 15
ECHO Process Finished.
如前所述,停止 QThread 是有效的;但是,我希望能够确定所有 QProcess 实例何时完成。这将允许程序通知用户他们可以使用新参数重新启动进程。目前,我不得不告诉我的实验室伙伴等到没有更多的输出文件被写入,然后再尝试一组新的参数。
任何帮助将不胜感激。感谢您的宝贵时间!
虽然不优雅,但我想出了一种方法来确定 QProcess 的多个实例在终止 QThread 后何时完成:
- 我添加了一个批次计数器,每次 QProcess 实例排队时该计数器递增 1。
- 一旦线程被取消,就会用当前的批次计数器发出一个信号。
- 此信号连接到一个方法,该方法将确定 QProcess 的所有实例何时完成处理。
- 对于我的程序,QProcess 的每个实例都会生成一个唯一的输出文件。
- 使用while循环,可以确定所有QProcess实例完成的时刻。
这是代码示例:
# This Python file uses the following encoding: utf-8
import sys
import os
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
import test
import time
from glob import glob1
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.ui=test.Ui_test()
self.ui.setupUi(self)
self.ui.pushButton_startThread.clicked.connect(self.startTestThread)
self.ui.pushButton_Update.clicked.connect(self.stopThread)
def startTestThread(self):
self.xValue = self.ui.lineEdit_x.text() #Represents number of batches
self.yValue = self.ui.lineEdit_y.text() #Represents number of images per batch
self.runTest = testThread(self.xValue, self.yValue, self) # Creates an instance of testThread
self.runTest.signals.batchesSubmitted.connect(self.testThreadFinished)
self.runTest.start() # Starts the instance of testThread
def stopThread(self):
self.runTest.stop()
@pyqtSlot(int)
def testThreadFinished(self, batchesSubmitted):
while len(glob1(outputFilePath + fileName, "*.txt")) < batchesSubmitted: #Compare the number of *.txt files in the output directory to the number of isntances of QProcess called
time.sleep(1)
self.ui.lineEdit_x.setText("")
self.ui.lineEdit_y.setText("")
print("All instance of QProcess are finished!")
class testThreadSignals(QObject):
batchesSubmitted = pyqtSignal(int)
class testThread(QThread):
def __init__(self, xValue, yValue, parent=None):
super().__init__(parent)
self.xValue = xValue
self.yValue = yValue
self.signals = testThreadSignals()
def run(self):
self.batchCounter = 0
for x in range(int(self.xValue)): # For loop to iterate througeach batch
pass
for y in range(
int(self.yValue)): # For loop to iterate through each image in each batch
time.sleep(.1)
QProcess.startDetached(os.path.dirname(os.path.realpath(__file__)) + r"\test.bat") # Runs test.bat
self.batchCounter += 1 #Counts the successful number *.txt files written
@pyqtSlot(int)
def stop(self):
self.terminate()
self.threadactive = False
self.wait()
self.signals.batchesSubmitted.emit(self.batchCounter)
if __name__ == "__main__":
app = QApplication(sys.argv)
app.setStyle("Fusion")
window = MainWindow()
window.show()
sys.exit(app.exec_())
test.bat:
@ECHO OFF
ECHO Processing Batched Images.
TIMEOUT /T 15
REM write a unique text file
ECHO Process Finished.
我能够毫无问题地将上述 python 代码应用到我的程序中。