检查 Timer.cancel 是否在单元测试中被调用

Check if Timer.cancel is called in unit test

我正在使用 threading.Timer 包在 x 秒后执行一个方法。但是,在某些情况下,我想更早地执行此方法并取消计时器(因此它不会被调用两次)。 如何对此进行单元测试?

我想知道计时器是否已经停止以便不再调用该方法。我现在使用下面的代码,不幸的是 is_alive still returns True

from threading import Timer

Class X():
    def __init__(self, timeout):
        self.timer = Timer(timeout, self.some_method)
        self.timer.start()

    def some_method(self):
        # Do something

    def other_method(self):
        self.timer.cancel()
        self.some_method()

import unittest

Class TestX(unittest.TestCase):
    def test_cancel_timer(self):
        x = X(1000)
        x.other_method()
        self.assertFalse(x.timer.is_alive())

形成文档 is_alive 方法 returns 在 run 操作期间为真;

Return whether the thread is alive. This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.

cancel 方法的文档说明如下;

Stop the timer, and cancel the execution of the timer’s action. This will only work if the timer is still in its waiting stage.

这是否意味着 cancel 方法不会停止 run 操作?或者在 运行 方法之后仍然处于灰色区域并且 returns 因此是正确的?

使用 timer.is_alive() 你只是检查 timer-thread 本身是否存在,所以如果你想“检查 timer.cancel() 是否被调用”,你正在测试错误

Does this mean that the cancel method does not stop the run action?

它不会停止 run()-功能,对吧。 timer.cancel() 只是在 Event 对象中设置一个标志,该对象由 run 检查。您可以测试标志是否设置为:

self.assertTrue(x.timer.finished.is_set())

不幸的是,检查取消不足以防止重复执行,因为 run 可能已经通过了检查,就像您在源代码中看到的那样:

# threading.py (Python 3.7.1):

class Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=None, kwargs=None)
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet."""
        self.finished.set()

    def run(self):
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()

需要更多的努力来确保独特的执行。我在我的回答 .

中写了一个可能的解决方案