Python 中的超时功能
Timeout function in Python
我想在 Python (3.x) 中有一个函数,它强制脚本本身终止,例如:
i_time_value = 10
mytimeout(i_time_value ) # Terminate the script if not in i_time_value seconds
for i in range(10):
print("go")
time.sleep(2)
其中 "mytimeout" 是我需要的功能:如果脚本未终止,它会在 "arg" 秒内终止脚本。
我见过为函数设置超时的好解决方案 here or here,但我不希望函数超时,而是脚本。
还有:
- 我知道我可以将我的脚本放在 function 中或使用子进程之类的东西并将其与
超时,我试过了并且有效,但我想要更简单的东西。
- 它必须与 Unix & Windows 兼容。
- 该函数必须是通用的,即:它可以添加到任何脚本中
一行(导入除外)
- 我需要一个函数,而不是 'how to put a timeout in a script'。
我会用这样的东西。
import sys
import time
import threading
def set_timeout(event):
event.set()
event = threading.Event()
i_time_value = 2
t = threading.Timer(i_time_value, set_timeout, [event])
t.start()
for i in range(10):
print("go")
if event.is_set():
print('Timed Out!')
sys.exit()
time.sleep(2)
一点点谷歌搜索this answer:
import multiprocessing as MP
from sys import exc_info
from time import clock
DEFAULT_TIMEOUT = 60
################################################################################
def timeout(limit=None):
if limit is None:
limit = DEFAULT_TIMEOUT
if limit <= 0:
raise ValueError()
def wrapper(function):
return _Timeout(function, limit)
return wrapper
class TimeoutError(Exception): pass
################################################################################
def _target(queue, function, *args, **kwargs):
try:
queue.put((True, function(*args, **kwargs)))
except:
queue.put((False, exc_info()[1]))
class _Timeout:
def __init__(self, function, limit):
self.__limit = limit
self.__function = function
self.__timeout = clock()
self.__process = MP.Process()
self.__queue = MP.Queue()
def __call__(self, *args, **kwargs):
self.cancel()
self.__queue = MP.Queue(1)
args = (self.__queue, self.__function) + args
self.__process = MP.Process(target=_target, args=args, kwargs=kwargs)
self.__process.daemon = True
self.__process.start()
self.__timeout = self.__limit + clock()
def cancel(self):
if self.__process.is_alive():
self.__process.terminate()
@property
def ready(self):
if self.__queue.full():
return True
elif not self.__queue.empty():
return True
elif self.__timeout < clock():
self.cancel()
else:
return False
@property
def value(self):
if self.ready is True:
flag, load = self.__queue.get()
if flag:
return load
raise load
raise TimeoutError()
def __get_limit(self):
return self.__limit
def __set_limit(self, value):
if value <= 0:
raise ValueError()
self.__limit = value
limit = property(__get_limit, __set_limit)
可能是 Python 2.x,但转换起来应该不是很难。
signal is not Windows compatible.
您可以在 Windows 上发送 一些 信号,例如:
os.kill(os.getpid(), signal.CTRL_C_EVENT) # send Ctrl+C to itself
您可以使用 threading.Timer
稍后调用函数:
from threading import Timer
def kill_yourself(delay):
t = Timer(delay, kill_yourself_now)
t.daemon = True # no need to kill yourself if we're already dead
t.start()
其中 kill_yourself_now()
:
import os
import signal
import sys
def kill_yourself_now():
sig = signal.CTRL_C_EVENT if sys.platform == 'win32' else signal.SIGINT
os.kill(os.getpid(), sig) # raise KeyboardInterrupt in the main thread
如果您的脚本启动了其他进程,请参阅:how to kill child process(es) when parent dies? See also, How to terminate a python subprocess launched with shell=True -- 它演示了如何终止进程树。
我想在 Python (3.x) 中有一个函数,它强制脚本本身终止,例如:
i_time_value = 10
mytimeout(i_time_value ) # Terminate the script if not in i_time_value seconds
for i in range(10):
print("go")
time.sleep(2)
其中 "mytimeout" 是我需要的功能:如果脚本未终止,它会在 "arg" 秒内终止脚本。
我见过为函数设置超时的好解决方案 here or here,但我不希望函数超时,而是脚本。
还有:
- 我知道我可以将我的脚本放在 function 中或使用子进程之类的东西并将其与 超时,我试过了并且有效,但我想要更简单的东西。
- 它必须与 Unix & Windows 兼容。
- 该函数必须是通用的,即:它可以添加到任何脚本中 一行(导入除外)
- 我需要一个函数,而不是 'how to put a timeout in a script'。
我会用这样的东西。
import sys
import time
import threading
def set_timeout(event):
event.set()
event = threading.Event()
i_time_value = 2
t = threading.Timer(i_time_value, set_timeout, [event])
t.start()
for i in range(10):
print("go")
if event.is_set():
print('Timed Out!')
sys.exit()
time.sleep(2)
一点点谷歌搜索this answer:
import multiprocessing as MP
from sys import exc_info
from time import clock
DEFAULT_TIMEOUT = 60
################################################################################
def timeout(limit=None):
if limit is None:
limit = DEFAULT_TIMEOUT
if limit <= 0:
raise ValueError()
def wrapper(function):
return _Timeout(function, limit)
return wrapper
class TimeoutError(Exception): pass
################################################################################
def _target(queue, function, *args, **kwargs):
try:
queue.put((True, function(*args, **kwargs)))
except:
queue.put((False, exc_info()[1]))
class _Timeout:
def __init__(self, function, limit):
self.__limit = limit
self.__function = function
self.__timeout = clock()
self.__process = MP.Process()
self.__queue = MP.Queue()
def __call__(self, *args, **kwargs):
self.cancel()
self.__queue = MP.Queue(1)
args = (self.__queue, self.__function) + args
self.__process = MP.Process(target=_target, args=args, kwargs=kwargs)
self.__process.daemon = True
self.__process.start()
self.__timeout = self.__limit + clock()
def cancel(self):
if self.__process.is_alive():
self.__process.terminate()
@property
def ready(self):
if self.__queue.full():
return True
elif not self.__queue.empty():
return True
elif self.__timeout < clock():
self.cancel()
else:
return False
@property
def value(self):
if self.ready is True:
flag, load = self.__queue.get()
if flag:
return load
raise load
raise TimeoutError()
def __get_limit(self):
return self.__limit
def __set_limit(self, value):
if value <= 0:
raise ValueError()
self.__limit = value
limit = property(__get_limit, __set_limit)
可能是 Python 2.x,但转换起来应该不是很难。
signal is not Windows compatible.
您可以在 Windows 上发送 一些 信号,例如:
os.kill(os.getpid(), signal.CTRL_C_EVENT) # send Ctrl+C to itself
您可以使用 threading.Timer
稍后调用函数:
from threading import Timer
def kill_yourself(delay):
t = Timer(delay, kill_yourself_now)
t.daemon = True # no need to kill yourself if we're already dead
t.start()
其中 kill_yourself_now()
:
import os
import signal
import sys
def kill_yourself_now():
sig = signal.CTRL_C_EVENT if sys.platform == 'win32' else signal.SIGINT
os.kill(os.getpid(), sig) # raise KeyboardInterrupt in the main thread
如果您的脚本启动了其他进程,请参阅:how to kill child process(es) when parent dies? See also, How to terminate a python subprocess launched with shell=True -- 它演示了如何终止进程树。