如何在 Python 中使用多处理定期调用函数?

How to periodically call a function using multiprocessing in Python?

我想调用一个函数,该函数使用多处理库(需要非阻塞)定期检查总线上的值。有一种方法可以使用线程库来完成,但它不使用多进程。

我看过使用线程库实现此类功能的示例代码,但我想使用多处理库实现同样的功能。多处理状态的官方文档:

"Note multiprocessing contains no analogues of threading.active_count(), threading.enumerate(), threading.settrace(), threading.setprofile(), threading.Timer"

但是,在我看到的线程库示例中,它们使用 threading.Timer。 multiprocessing有类似的功能吗?

import time, threading
def foo():
    print(time.ctime())
    threading.Timer(10, foo).start()

foo()

#output:
#Thu Dec 22 14:46:08 2011
#Thu Dec 22 14:46:18 2011
#Thu Dec 22 14:46:28 2011
#Thu Dec 22 14:46:38 2011

Executing periodic actions in Python

以上代码是与线程库一起使用的示例。另外,我想知道这是否是不好的做法,因为线程永远不会终止 (.join())。

基本上,多处理模块中没有定时器class。应该是,但是当他们从 pyprocessing 迁移到 multiprocessing 时,他们似乎没有包括那部分。这里有解释 .

您可以像 dano 在

中发布的那样手动为多处理库创建一个工作计时器
from multiprocessing import Process, Event

class Timer(Process):
    def __init__(self, interval, function, args=[], kwargs={}):
        super(Timer, self).__init__()
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()

在此处查看完整问题:Why no Timer class in Python's multiprocessing module?

要将 Nosvan 的答案扩展为真正的周期性计时器(好吧,它会漂移几毫秒),您只需像这样扩展 Timer class:

from typing import Callable


class PeriodicTimer(Timer):
    def __init__(self, interval: int, function: Callable):
        super(PeriodicTimer, self).__init__(interval, function)

    def run(self):
        while not self.finished_event.is_set():
            # function callback could set the stop event
            self.function(*self.args, **self.kwargs)
            self.finished_event.wait(self.timeout)

我可以建议修改 Timer class 以便在构造函数中没有可变参数:

from multiprocessing import Process
from multiprocessing import Event
from typing import Callable


class Timer(Process):
    def __init__(self, timeout: int, function: Callable, args=None, kwargs=None):
        super(Timer, self).__init__()
        self.timeout = timeout
        self.function = function
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.finished_event = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished_event.set()

    def run(self):
        self.finished_event.wait(self.timeout)
        if not self.finished_event.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished_event.set()

这里详尽解释了为什么要避免可变默认参数:

https://web.archive.org/web/20200221224620/http://effbot.org/zone/default-values.htm