如果子线程抛出异常,则中断主调用线程
Break Main Calling Thread If Child Thread Throws An Exception
我正在使用 threading.Thread 和 t.start() 以及可调用对象列表来执行长 运行 多线程处理。我的主线程被阻塞,直到所有线程都完成。但是,如果其中一个 Callable 抛出异常并终止其他线程,我希望 t.start() 立即 return。
使用 t.join() 检查线程是否已执行未提供有关因异常导致的失败的信息。
代码如下:
import json
import requests
class ThreadServices:
def __init__(self):
self.obj = ""
def execute_services(self, arg1, arg2):
try:
result = call_some_process(arg1, arg2) #some method
#save results somewhere
except Exception, e:
# raise exception
print e
def invoke_services(self, stubs):
"""
Thread Spanning Function
"""
try:
p1 = "" #some value
p2 = "" #some value
# Call service 1
t1 = threading.Thread(target=self.execute_services, args=(a, b,)
# Start thread
t1.start()
# Block till thread completes execution
t1.join()
thread_pool = list()
for stub in stubs:
# Start parallel execution of threads
t = threading.Thread(target=self.execute_services,
args=(p1, p2))
t.start()
thread_pool.append(t)
for thread in thread_pool:
# Block till all the threads complete execution: Wait for all
the parallel tasks to complete
thread.join()
# Start another process thread
t2 = threading.Thread(target=self.execute_services,
args=(p1, p2)
t2.start()
# Block till this thread completes execution
t2.join()
requests.post(url, data= json.dumps({status_code=200}))
except Exception, e:
print e
requests.post(url, data= json.dumps({status_code=500}))
# Don't return anything as this function is invoked as a thread from
# main calling function
class Service(ThreadServices):
"""
Service Class
"""
def main_thread(self, request, context):
"""
Main Thread:Invokes Task Execution Sequence in ThreadedService
:param request:
:param context:
:return:
"""
try:
main_thread = threading.Thread(target=self.invoke_services,
args=(request,))
main_thread.start()
return True
except Exception, e:
return False
当我调用 Service().main_thread(request, context) 并且执行 t1 时出现一些异常,我需要在 main_thread 和 return False 中引发它。我如何为这个结构实现它。谢谢!!
一方面,你把事情复杂化了。我会这样做:
from thread import start_new_thread as thread
from time import sleep
class Task:
"""One thread per task.
This you should do with subclassing threading.Thread().
This is just conceptual example.
"""
def __init__ (self, func, args=(), kwargs={}):
self.func = func
self.args = args
self.kwargs = kwargs
self.error = None
self.done = 0
self.result = None
def _run (self):
self.done = 0
self.error = None
self.result = None
# So this is what you should do in subclassed Thread():
try: self.result = self.func(*self.args, **self.kwargs)
except Exception, e:
self.error = e
self.done = 1
def start (self):
thread(self._run,())
def wait (self, retrexc=1):
"""Used in place of threading.Thread.join(), but it returns the result of the function self.func() and manages errors.."""
while not self.done: sleep(0.001)
if self.error:
if retrexc: return self.error
raise self.error
return self.result
# And this is how you should use your pool:
def do_something (tasknr):
print tasknr-20
if tasknr%7==0: raise Exception, "Dummy exception!"
return tasknr**120/82.0
pool = []
for task in xrange(20, 50):
t = Task(do_something, (task,))
pool.append(t)
# And only then wait for each one:
results = []
for task in pool:
results.append(task.wait())
print results
这样您就可以让 task.wait() 引发错误。线程已经停止了。因此,您需要做的就是在完成后从池或整个池中删除它们的引用。您甚至可以:
results = []
for task in pool:
try: results.append(task.wait(0))
except Exception, e:
print task.args, "Error:", str(e)
print results
现在,不要严格使用它(我的意思是 Task() class),因为它需要添加很多东西才能真正使用。
只需 subclass threading.Thread() 并通过覆盖 运行() 和 join() 或添加新函数如 wait() 来实现类似的概念。
我正在使用 threading.Thread 和 t.start() 以及可调用对象列表来执行长 运行 多线程处理。我的主线程被阻塞,直到所有线程都完成。但是,如果其中一个 Callable 抛出异常并终止其他线程,我希望 t.start() 立即 return。
使用 t.join() 检查线程是否已执行未提供有关因异常导致的失败的信息。
代码如下:
import json
import requests
class ThreadServices:
def __init__(self):
self.obj = ""
def execute_services(self, arg1, arg2):
try:
result = call_some_process(arg1, arg2) #some method
#save results somewhere
except Exception, e:
# raise exception
print e
def invoke_services(self, stubs):
"""
Thread Spanning Function
"""
try:
p1 = "" #some value
p2 = "" #some value
# Call service 1
t1 = threading.Thread(target=self.execute_services, args=(a, b,)
# Start thread
t1.start()
# Block till thread completes execution
t1.join()
thread_pool = list()
for stub in stubs:
# Start parallel execution of threads
t = threading.Thread(target=self.execute_services,
args=(p1, p2))
t.start()
thread_pool.append(t)
for thread in thread_pool:
# Block till all the threads complete execution: Wait for all
the parallel tasks to complete
thread.join()
# Start another process thread
t2 = threading.Thread(target=self.execute_services,
args=(p1, p2)
t2.start()
# Block till this thread completes execution
t2.join()
requests.post(url, data= json.dumps({status_code=200}))
except Exception, e:
print e
requests.post(url, data= json.dumps({status_code=500}))
# Don't return anything as this function is invoked as a thread from
# main calling function
class Service(ThreadServices):
"""
Service Class
"""
def main_thread(self, request, context):
"""
Main Thread:Invokes Task Execution Sequence in ThreadedService
:param request:
:param context:
:return:
"""
try:
main_thread = threading.Thread(target=self.invoke_services,
args=(request,))
main_thread.start()
return True
except Exception, e:
return False
当我调用 Service().main_thread(request, context) 并且执行 t1 时出现一些异常,我需要在 main_thread 和 return False 中引发它。我如何为这个结构实现它。谢谢!!
一方面,你把事情复杂化了。我会这样做:
from thread import start_new_thread as thread
from time import sleep
class Task:
"""One thread per task.
This you should do with subclassing threading.Thread().
This is just conceptual example.
"""
def __init__ (self, func, args=(), kwargs={}):
self.func = func
self.args = args
self.kwargs = kwargs
self.error = None
self.done = 0
self.result = None
def _run (self):
self.done = 0
self.error = None
self.result = None
# So this is what you should do in subclassed Thread():
try: self.result = self.func(*self.args, **self.kwargs)
except Exception, e:
self.error = e
self.done = 1
def start (self):
thread(self._run,())
def wait (self, retrexc=1):
"""Used in place of threading.Thread.join(), but it returns the result of the function self.func() and manages errors.."""
while not self.done: sleep(0.001)
if self.error:
if retrexc: return self.error
raise self.error
return self.result
# And this is how you should use your pool:
def do_something (tasknr):
print tasknr-20
if tasknr%7==0: raise Exception, "Dummy exception!"
return tasknr**120/82.0
pool = []
for task in xrange(20, 50):
t = Task(do_something, (task,))
pool.append(t)
# And only then wait for each one:
results = []
for task in pool:
results.append(task.wait())
print results
这样您就可以让 task.wait() 引发错误。线程已经停止了。因此,您需要做的就是在完成后从池或整个池中删除它们的引用。您甚至可以:
results = []
for task in pool:
try: results.append(task.wait(0))
except Exception, e:
print task.args, "Error:", str(e)
print results
现在,不要严格使用它(我的意思是 Task() class),因为它需要添加很多东西才能真正使用。
只需 subclass threading.Thread() 并通过覆盖 运行() 和 join() 或添加新函数如 wait() 来实现类似的概念。