如何在 Python 中的进程之间传递堆栈跟踪?
How to pass stacktrace between processes in Python?
我正在尝试创建一个 python 装饰器,它接受一个带有 args 和 kwargs 的函数,在一个新进程中执行它,关闭它并 returns 无论函数返回什么,包括提高同样的例外,如果有的话。
目前,我的装饰器可以很好地处理函数,如果它们没有引发异常,但无法提供回溯。我如何将它传递回父进程?
from functools import wraps
from multiprocessing import Process, Queue
import sys
def process_wrapper(func):
@wraps(func)
def wrapper(*args, **kwargs):
# queue for communicating between parent and child processes
q = Queue()
def func_to_q(_q: Queue, *_args, **_kwargs):
# do the same as func, but put result into the queue. Also put
# there an exception if any.
try:
_res = func(*_args, **_kwargs)
_q.put(_res)
except:
_q.put(sys.exc_info())
# start another process and wait for it to join
p = Process(target=func_to_q, args=(q, )+args, kwargs=kwargs)
p.start()
p.join()
# get result from the queue and return it, or raise if it's an exception
res = q.get(False)
if isinstance(res, tuple) and isinstance(res[0], Exception):
raise res[1].with_traceback(res[2])
else:
return res
return wrapper
if __name__ == '__main__':
@process_wrapper
def ok():
return 'ok'
@process_wrapper
def trouble():
def inside():
raise UserWarning
inside()
print(ok())
print(trouble())
我希望结果类似于:
ok
Traceback (most recent call last):
File "/temp.py", line 47, in <module>
print(trouble())
File "/temp.py", line 44, in trouble
inside()
File "/temp.py", line 43, in inside
raise UserWarning
UserWarning
Process finished with exit code 1
但子进程似乎无法将堆栈跟踪放入队列,我得到以下信息:
ok
Traceback (most recent call last):
File "/temp.py", line 47, in <module>
print(trouble())
File "/temp.py", line 26, in wrapper
res = q.get(False)
File "/usr/lib/python3.6/multiprocessing/queues.py", line 107, in get
raise Empty
queue.Empty
Process finished with exit code 1
此外,如果子项仅将异常本身放入队列 _q.put(sys.exc_info()[1])
,则父项从那里获取异常并引发但使用新的堆栈跟踪(注意缺少对 inside()
的调用):
ok
Traceback (most recent call last):
File "/temp.py", line 47, in <module>
print(trouble())
File "/temp.py", line 28, in wrapper
raise res
UserWarning
Process finished with exit code 1
查看 multiprocessing.pool.py
和 stringification-hack 以向父级发送异常。您可以从那里使用 multiprocessing.pool.ExceptionWithTraceback
。
这些代码足以演示基本原理:
from multiprocessing import Process, Queue
from multiprocessing.pool import ExceptionWithTraceback
def worker(outqueue):
try:
result = (True, 1 / 0) # will raise ZeroDivisionError
except Exception as e:
e = ExceptionWithTraceback(e, e.__traceback__)
result = (False, e)
outqueue.put(result)
if __name__ == '__main__':
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
success, value = q.get()
p.join()
if success:
print(value)
else:
raise value # raise again
输出:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/...", line 7, in worker
result = (True, 1 / 0) # will raise ZeroDivisionError
ZeroDivisionError: division by zero
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/...", line 23, in <module>
raise value
ZeroDivisionError: division by zero
Process finished with exit code 1
我正在尝试创建一个 python 装饰器,它接受一个带有 args 和 kwargs 的函数,在一个新进程中执行它,关闭它并 returns 无论函数返回什么,包括提高同样的例外,如果有的话。
目前,我的装饰器可以很好地处理函数,如果它们没有引发异常,但无法提供回溯。我如何将它传递回父进程?
from functools import wraps
from multiprocessing import Process, Queue
import sys
def process_wrapper(func):
@wraps(func)
def wrapper(*args, **kwargs):
# queue for communicating between parent and child processes
q = Queue()
def func_to_q(_q: Queue, *_args, **_kwargs):
# do the same as func, but put result into the queue. Also put
# there an exception if any.
try:
_res = func(*_args, **_kwargs)
_q.put(_res)
except:
_q.put(sys.exc_info())
# start another process and wait for it to join
p = Process(target=func_to_q, args=(q, )+args, kwargs=kwargs)
p.start()
p.join()
# get result from the queue and return it, or raise if it's an exception
res = q.get(False)
if isinstance(res, tuple) and isinstance(res[0], Exception):
raise res[1].with_traceback(res[2])
else:
return res
return wrapper
if __name__ == '__main__':
@process_wrapper
def ok():
return 'ok'
@process_wrapper
def trouble():
def inside():
raise UserWarning
inside()
print(ok())
print(trouble())
我希望结果类似于:
ok
Traceback (most recent call last):
File "/temp.py", line 47, in <module>
print(trouble())
File "/temp.py", line 44, in trouble
inside()
File "/temp.py", line 43, in inside
raise UserWarning
UserWarning
Process finished with exit code 1
但子进程似乎无法将堆栈跟踪放入队列,我得到以下信息:
ok
Traceback (most recent call last):
File "/temp.py", line 47, in <module>
print(trouble())
File "/temp.py", line 26, in wrapper
res = q.get(False)
File "/usr/lib/python3.6/multiprocessing/queues.py", line 107, in get
raise Empty
queue.Empty
Process finished with exit code 1
此外,如果子项仅将异常本身放入队列 _q.put(sys.exc_info()[1])
,则父项从那里获取异常并引发但使用新的堆栈跟踪(注意缺少对 inside()
的调用):
ok
Traceback (most recent call last):
File "/temp.py", line 47, in <module>
print(trouble())
File "/temp.py", line 28, in wrapper
raise res
UserWarning
Process finished with exit code 1
查看 multiprocessing.pool.py
和 stringification-hack 以向父级发送异常。您可以从那里使用 multiprocessing.pool.ExceptionWithTraceback
。
这些代码足以演示基本原理:
from multiprocessing import Process, Queue
from multiprocessing.pool import ExceptionWithTraceback
def worker(outqueue):
try:
result = (True, 1 / 0) # will raise ZeroDivisionError
except Exception as e:
e = ExceptionWithTraceback(e, e.__traceback__)
result = (False, e)
outqueue.put(result)
if __name__ == '__main__':
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
success, value = q.get()
p.join()
if success:
print(value)
else:
raise value # raise again
输出:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/...", line 7, in worker
result = (True, 1 / 0) # will raise ZeroDivisionError
ZeroDivisionError: division by zero
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/...", line 23, in <module>
raise value
ZeroDivisionError: division by zero
Process finished with exit code 1