我如何使用 PyQt 线程长时间休眠?

How do I sleep for long periods with PyQt threads?

我有一些特定的对象,它们需要以各自不断变化的时间间隔一次又一次地执行某个特定的函数,直到它们自己决定结束为止。

例如,一个对象可能需要等待 30 秒,运行,等待 60 秒,运行,等待 10 秒,运行……以此类推。这样的对象可能有 30 到 120 个,它们运行的都是完全相同的函数。

我原本认为,只要有一个按确切时间间隔休眠的函数就可以解决我的问题;但是(如果我错了请纠正我)我记得线程池在任何给定时间只能运行一定数量的线程(对我来说是 12 个)。如何绕过这个限制?

class Thing(object):
    """An object that has to be refreshed again and again at varying intervals."""

    def getCurrentPeriod(self):
        """Return the number of seconds to wait before the next refresh."""
        # Stand-in for some ever-changing period of time.
        return random.randint(5, 30)

    def refresh(self):
        """Run the long task, which is disk and network intensive."""
        doThings()

    def waitRefresh(self):
        """
        Sleep for the current period, refresh, then report whether to go again.

        :return: The result of ``self.needRefresh()`` - a boolean telling whether
                 this object needs to restart. How to reschedule (i.e. where to
                 connect the worker's emit when it finishes so that this
                 *specific* Thing gets its waitRefresh called again) is the
                 open question.
        """
        # Blocks the calling thread until the whole period has passed.
        time.sleep(self.getCurrentPeriod())
        self.refresh()
        return self.needRefresh()

class App(QMainWindow):
    """Main window that hands each Thing's waitRefresh to a thread pool."""

    def __init__(self, *args, **kwargs):
        """Create the thread pool and queue one worker per initial Thing."""
        # super() must name this class (App), not some other window class.
        super(App, self).__init__(*args, **kwargs)

        self.threadpool = QThreadPool()

        # Add initial objects to pool (other portions of app may add more over time)
        for thing in self.acquireThings():
            worker = Worker(thing.waitRefresh)
            self.threadpool.start(worker)

除了 WorkerSignals 类和 QRunnable 子类之外,这个示例包含了我通常的做法。该示例解决的是同一个问题,但(很可能)效率低下。

编辑:新的示例——一个完整可运行的例子,展示了 time.sleep 并不会让出线程、使其他线程得以工作。我觉得 async 可能是唯一的实现方式,但有没有快速的修复方法,让我不必改动整个应用程序?

Here's what it looks like when you try to sleep more than 12 threads.

当我决定实际尝试 QTimer 类时,最终的解决方案出现了。也许还有更优化的解决方案,但这个方案似乎满足了所有要求,尽管它简单得令人担忧。

import random
import time
import traceback

from functools import partial
from PyQt5.QtCore import *
from PyQt5.QtGui import QFont
from PyQt5.QtWidgets import *


class WorkerSignals(QObject):
    """
    Holds the signals that a Worker emits while it runs.
    """
    # Emitted with no arguments once the Worker is done.
    finished = pyqtSignal()
    # Emitted with the integer ID of the thread the Worker is about to refresh.
    starting = pyqtSignal(int)
    # Emitted with the refresh result tuple: the result and the ID.
    result = pyqtSignal(tuple)

class Worker(QRunnable):
    """
    Runnable that reports when it starts, what its callable returned and when it is finished.
    Built to wrap Thread.refresh().
    """

    def __init__(self, fn, thread_id, *args, **kwargs):
        """
        :param fn: The callable to execute in run().
        :param thread_id: ID emitted through the 'starting' signal.
        :param args: Positional arguments forwarded to fn.
        :param kwargs: Keyword arguments forwarded to fn.
        """
        super().__init__()

        self.signals = WorkerSignals()
        # Constructor arguments are kept so run() can use them later.
        self.fn, self.id = fn, thread_id
        self.args, self.kwargs = args, kwargs

    @pyqtSlot()
    def run(self):
        """
        Call the stored callable and emit whatever it returns through 'result'.
        Any error is printed as a traceback; 'finished' is emitted in every case.
        """
        signals = self.signals
        try:
            # The worker has actually been picked up and is about to work.
            signals.starting.emit(self.id)
            outcome = self.fn(*self.args, **self.kwargs)
            # Work is done; hand the return value to the listeners.
            signals.result.emit(outcome)
        except BaseException:
            traceback.print_exc()
        finally:
            signals.finished.emit()


class Thread(object):
    """
    Stand-in for a database-backed object that needs periodic refreshing.

    Ground rules for a Thread object:
    The next timestamp cannot be stored on the object (it's a database object, and creating sessions
    over and over simply to read/write the access time is not believed to be good practice).
    Keeping the ID and the active flag on the object is allowed.
    """
    # Last ID handed out by nextID(); starts at -1 so the first ID is 0.
    i = -1

    def __init__(self):
        self.refreshes = 0
        self.active = True
        self.id = Thread.nextID()

    def refresh(self) -> tuple:
        """
        'Refreshes' a thread: counts the refresh, sleeps for 2 to 5 seconds and then randomly decides
        whether the object gets deactivated. The chance of deactivation drops with every refresh,
        but never below 0.1.

        :return: A tuple of the active flag (bool) and the thread's ID (for identifying it later)
        """

        # Represents my SQL Alchemy Model's refresh() function
        self.refreshes += 1
        time.sleep(random.randint(2, 5))

        deactivation_chance = max(0.1, 1.0 - ((self.refreshes + 5) / 10))
        if random.random() <= deactivation_chance:
            self.active = False
        return self.active, self.id

    @staticmethod
    def getRefreshTime() -> float:
        """
        Gives the amount of time to wait before a thread should be refreshed.
        Must NOT be used to decide whether the thread is still active.

        :return: The waiting period, a random float between 10 and 300.
        """

        return random.uniform(10, 300)

    @staticmethod
    def nextID() -> int:
        """
        Hands out integer thread IDs in sequence so that no ID is issued twice.

        :return: Integer Thread ID
        """
        next_id = Thread.i + 1
        Thread.i = next_id
        return next_id

    def __repr__(self):
        return f'Thread(id={self.id} active={self.active})'


class MainWindow(QMainWindow):
    """
    GUI containing a Label, Button and ListWidget showing all the active sleeping/working threads.
    Manages a threadpool, a number of background singleshot timers, etc.
    """

    def __init__(self, *args, **kwargs):
        """Build the widgets, create the thread pool and schedule the first batch of threads."""
        super(MainWindow, self).__init__(*args, **kwargs)

        # Widgets Setup
        layout = QVBoxLayout()
        self.list = QListWidget()
        self.l = QLabel("Total Active: 0")
        self.button = QPushButton("Refresh List")
        self.button.pressed.connect(self.refreshList)
        self.button.setDisabled(True)
        layout.addWidget(self.l)
        layout.addWidget(self.button)
        layout.addWidget(self.list)
        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)
        self.show()

        # Periodically add threads to the pool.
        self.poolTimer = QTimer()
        self.poolTimer.setInterval(5_000)
        self.poolTimer.timeout.connect(self.addThreads)

        # Threading Setup
        self.threadpool = QThreadPool()
        print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())

        # active: thread ID -> True while the thread is being refreshed by a worker.
        # threads: thread ID -> Thread instance, for every thread that is still alive.
        self.active, self.threads = {}, {}
        # Add a number of threads to start with.
        for _ in range(random.randint(5, 16)):
            self.setupThread(Thread())
        self.poolTimer.start()

    def refreshList(self):
        """
        Refreshes the ListWidget in the GUI with all the active/sleeping/working threads.
        """
        self.list.clear()
        bold = QFont()
        bold.setBold(True)

        active = 0
        for thread in self.threads.values():
            item = QListWidgetItem(
                f'Thread {thread.id}/{thread.refreshes}')
            # Bold a thread if it's working
            if self.active[thread.id]:
                active += 1
                item.setFont(bold)
            self.list.addItem(item)
        self.l.setText(f'Total Active: {active}/{len(self.threads)}')

    def refreshResult(self, result) -> None:
        """
        When a thread is finished, the result determines its next course of action, which is either
        to return to the pool again, or delete itself.

        :param result: A tuple containing the result (bool) and the connected Thread ID.
        """
        self.active[result[1]] = False
        if result[0]:
            print(f'Restarting Thread {result[1]}')
            self.setupThread(self.threads[result[1]]) # Add by ID, which would normally be a database GET
        else:
            print(f'Thread {result[1]} shutting down.')
            del self.active[result[1]]
            del self.threads[result[1]]
        self.refreshList()

    def updateActivity(self, thread_id) -> None:
        """
        Connected to the starting signal, helps signal when a thread is actually being refreshed.

        :param thread_id: The Thread ID
        """
        print(f'Thread {thread_id} is now active/working.')
        self.active[thread_id] = True

    def refresh(self, thread):
        """
        Adds a new worker to the threadpool to be refreshed.
        Can't be considered a real start to the thread.refresh function, as the pool only runs a limited
        number of workers at any time (see the maxThreadCount() value printed in __init__).
        The 'starting' signal can tell us when a specific thread is actually being refreshed, and is represented
        as a Bold element in the list.

        :param thread: A thread instance.
        """
        print(f'Adding Thread {thread.id} to the pool.')
        worker = Worker(thread.refresh, thread_id=thread.id)
        worker.signals.result.connect(self.refreshResult)
        worker.signals.starting.connect(self.updateActivity)
        self.threadpool.start(worker)
        self.refreshList()

    def setupThread(self, thread) -> None:
        """
        Registers a thread as sleeping and schedules a single-shot timer that will hand it to
        refresh() once its waiting period is over.

        :param thread: A thread instance.
        """
        self.active[thread.id] = False
        self.threads[thread.id] = thread
        period = thread.getRefreshTime()
        # singleShot is a static function, so no QTimer instance has to be created for it.
        # Its interval is an integer number of milliseconds, so the float period is converted
        # explicitly instead of relying on an implicit float -> int conversion.
        QTimer.singleShot(int(period * 1000), partial(self.refresh, thread=thread))
        print(f'Thread {thread.id} will start in {period} seconds.')
        self.refreshList()

    def addThreads(self):
        """
        Adds a number of threads to the pool. Called automatically by poolTimer every 5 seconds.
        Tops the number of known threads up to roughly 30 (30 plus or minus 5).
        """

        add = max(0, 30 + random.randint(-5, 5) - len(self.threads))
        if add > 0:
            print(f'Adding {add} thread{"s" if add > 1 else ""}.')
            for _ in range(add):
                self.setupThread(Thread())

# Start-up: the application object is created first, then the main window
# (whose __init__ calls show()), and finally the Qt event loop is entered.
app = QApplication([])
window = MainWindow()
app.exec_()

当一个线程被请求时,会创建一个定时器,并通过 singleShot 在到期时触发一个额外的函数,由该函数把线程加入线程池。这个线程池最多可以同时处理 12 个正在刷新('refreshing')的线程,并且信号让 GUI 能在状态发生变化时进行更新。

数以千计的 'Thread' 对象可能正在等待,似乎 singleShot 能够在需要时将它们添加到池中。

信号有助于区分线程处于 sleeping、working 还是 active 状态(而 inactive 的线程对象会立即被删除)。

我能想到的关于这个程序的唯一警告是:

1) QThread 实现能打败它吗?

2) 一旦 QTimer 到期并触发了对应的函数,这些 QTimer 会怎样?它们会被正确地垃圾回收(GC),还是会留在后台继续消耗资源?