Timeout function in Python

I want a function in Python (3.x) that forces the script itself to terminate, for example:

import time

i_time_value = 10
mytimeout(i_time_value)   # terminate the script if it has not finished within i_time_value seconds
for i in range(10):
    print("go")
    time.sleep(2)

where "mytimeout" is the function I need: it terminates the script after "arg" seconds if the script has not already finished.

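I have seen good solutions for putting a timeout on a function here or here, but I don't want a timeout for a function; I want one for the whole script.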

Also:

I would use something like this.

import sys
import time
import threading

def set_timeout(event):
    event.set()

event = threading.Event()
i_time_value = 2

t = threading.Timer(i_time_value, set_timeout, [event])
t.start()

for i in range(10):
    print("go")

    # the timer only sets the flag; the loop has to check it and exit itself
    if event.is_set():
        print('Timed Out!')
        sys.exit()

    time.sleep(2)
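
The loop above can only exit at the points where it checks the event, so a long blocking call between two checks delays the exit. Below is a minimal sketch of a variant that needs no check in the loop, assuming a hard exit is acceptable. The name mytimeout is borrowed from the question, and the on_timeout parameter is a hypothetical hook added here so the behaviour can be tried without killing the interpreter. It uses os._exit because sys.exit() called from the timer thread would only end that thread, not the script.

```python
import os
import threading


def mytimeout(seconds, on_timeout=None):
    """Arm a one-shot watchdog that fires after `seconds`.

    `on_timeout` is a hypothetical hook for this sketch. When it is
    omitted, the whole process is ended with exit status 1.
    """
    if on_timeout is None:
        def on_timeout():
            # os._exit ends the process immediately: no cleanup handlers
            # run and stdio buffers are not flushed.
            os._exit(1)

    timer = threading.Timer(seconds, on_timeout)
    timer.daemon = True   # do not keep the script alive just for the timer
    timer.start()
    return timer          # call .cancel() on it to disarm the watchdog
```

With this in place, calling mytimeout(10) at the top of the script from the question would end it after roughly 10 seconds if it is still running. Since os._exit skips cleanup, output that has not been flushed may be lost.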

A little googling turned up this answer:

import multiprocessing as MP
from sys import exc_info
from time import monotonic as clock  # time.clock() was removed in Python 3.8

DEFAULT_TIMEOUT = 60

################################################################################

def timeout(limit=None):
    if limit is None:
        limit = DEFAULT_TIMEOUT
    if limit <= 0:
        raise ValueError()
    def wrapper(function):
        return _Timeout(function, limit)
    return wrapper

class TimeoutError(Exception): pass

################################################################################

def _target(queue, function, *args, **kwargs):
    try:
        queue.put((True, function(*args, **kwargs)))
    except:
        queue.put((False, exc_info()[1]))

class _Timeout:

    def __init__(self, function, limit):
        self.__limit = limit
        self.__function = function
        self.__timeout = clock()
        self.__process = MP.Process()
        self.__queue = MP.Queue()

    def __call__(self, *args, **kwargs):
        self.cancel()
        self.__queue = MP.Queue(1)
        args = (self.__queue, self.__function) + args
        self.__process = MP.Process(target=_target, args=args, kwargs=kwargs)
        self.__process.daemon = True
        self.__process.start()
        self.__timeout = self.__limit + clock()

    def cancel(self):
        if self.__process.is_alive():
            self.__process.terminate()

    @property
    def ready(self):
        if self.__queue.full():
            return True
        elif not self.__queue.empty():
            return True
        elif self.__timeout < clock():
            self.cancel()
            return False  # timed out: previously fell through and returned None
        else:
            return False

    @property
    def value(self):
        if self.ready is True:
            flag, load = self.__queue.get()
            if flag:
                return load
            raise load
        raise TimeoutError()

    def __get_limit(self):
        return self.__limit

    def __set_limit(self, value):
        if value <= 0:
            raise ValueError()
        self.__limit = value

    limit = property(__get_limit, __set_limit)

It may be written for Python 2.x, but it should not be hard to convert.

signal is not Windows compatible.

You can send some signals on Windows, e.g.:

os.kill(os.getpid(), signal.CTRL_C_EVENT) # send Ctrl+C to itself

You could use threading.Timer to call a function at a later time:

from threading import Timer

def kill_yourself(delay):
    t = Timer(delay, kill_yourself_now)
    t.daemon = True # no need to kill yourself if we're already dead
    t.start()

where kill_yourself_now() is:

import os
import signal
import sys

def kill_yourself_now():
    sig = signal.CTRL_C_EVENT if sys.platform == 'win32' else signal.SIGINT
    os.kill(os.getpid(), sig) # raise KeyboardInterrupt in the main thread
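
To show how these pieces might fit together with the loop from the question, here is a small sketch. The run_loop function and its parameters are hypothetical, and kill_yourself is changed slightly to return the timer so that it can be cancelled. Because the signal surfaces as KeyboardInterrupt in the main thread, the script gets a chance to clean up before exiting. This assumes Python's default SIGINT handler is installed.

```python
import os
import signal
import sys
import time
from threading import Timer


def kill_yourself_now():
    # Deliver Ctrl+C (Windows) or SIGINT (elsewhere) to this very process.
    sig = signal.CTRL_C_EVENT if sys.platform == 'win32' else signal.SIGINT
    os.kill(os.getpid(), sig)


def kill_yourself(delay):
    t = Timer(delay, kill_yourself_now)
    t.daemon = True
    t.start()
    return t  # returned so the caller can cancel it (added for this sketch)


def run_loop(delay, steps=10, pause=2):
    """Hypothetical driver: True if the loop finished, False if it timed out."""
    timer = kill_yourself(delay)
    try:
        for i in range(steps):
            print("go")
            time.sleep(pause)
        return True
    except KeyboardInterrupt:
        print("Timed Out!")
        return False
    finally:
        timer.cancel()  # disarm the timer if the loop ended first
```

Calling run_loop(10) from the main thread would mirror the script in the question: the loop needs about 20 seconds, so it is interrupted after roughly 10.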

If your script starts other processes, see: how to kill child process(es) when parent dies? See also How to terminate a python subprocess launched with shell=True, which demonstrates how to kill a process tree.