停止第三方功能
Stopping a third party function
这是一个复杂项目的一部分,我会尽量简化它。
我有一个 class 获取可调用对象并执行它,可调用对象可以 运行 持续任意时间。如果我收到一个信号(可以使用 Signal 或我观察到的任何其他标志)终止,我想当场终止可调用函数的执行(当然不退出进程)
class FooRunner(object):
def goo(self, foo):
try:
foo()
except:
pass
def on_stop_signal(self):
pass
在 single-threaded 信号上而不是 Windows 上的 运行(即任何 Unix 风格),您可以为此使用 signal.alarm。
查看文档中的第一个示例 - 它或多或少是您所要求的:
https://docs.python.org/2/library/signal.html
如果有人需要这个,这里有一个代码示例(需要注意的一件事 signal.signal
只能从主线程调用):
#!/usr/bin/python
import time
import signal
import threading
class MyException(Exception):
pass
class FooRunner(object):
def goo(self, foo):
try:
signal.signal(signal.SIGALRM, self.on_stop_signal)
foo()
except MyException:
print('caugt alarm exception')
def on_stop_signal(self, *args):
print('alarm triggered')
raise MyException()
def sample_foo():
time.sleep(30)
def stop_it():
signal.alarm(3)
print('alarm was set for 3 seconds')
if __name__ == "__main__":
print('starting')
fr = FooRunner()
t = threading.Thread(target=stop_it)
t.start()
fr.goo(sample_foo)
谢谢@jsbueno
这是一个复杂项目的一部分,我会尽量简化它。
我有一个 class 获取可调用对象并执行它,可调用对象可以 运行 持续任意时间。如果我收到一个信号(可以使用 Signal 或我观察到的任何其他标志)终止,我想当场终止可调用函数的执行(当然不退出进程)
class FooRunner(object):
def goo(self, foo):
try:
foo()
except:
pass
def on_stop_signal(self):
pass
在 single-threaded 信号上而不是 Windows 上的 运行(即任何 Unix 风格),您可以为此使用 signal.alarm。
查看文档中的第一个示例 - 它或多或少是您所要求的: https://docs.python.org/2/library/signal.html
如果有人需要这个,这里有一个代码示例(需要注意的一件事 signal.signal
只能从主线程调用):
#!/usr/bin/python
import time
import signal
import threading
class MyException(Exception):
pass
class FooRunner(object):
def goo(self, foo):
try:
signal.signal(signal.SIGALRM, self.on_stop_signal)
foo()
except MyException:
print('caugt alarm exception')
def on_stop_signal(self, *args):
print('alarm triggered')
raise MyException()
def sample_foo():
time.sleep(30)
def stop_it():
signal.alarm(3)
print('alarm was set for 3 seconds')
if __name__ == "__main__":
print('starting')
fr = FooRunner()
t = threading.Thread(target=stop_it)
t.start()
fr.goo(sample_foo)
谢谢@jsbueno