不要使用 Pool python 打印堆栈跟踪
Do not print stack-trace using Pool python
我同时使用 Pool
到 运行 多个命令。我不想在用户中断脚本时打印堆栈跟踪。
这是我的脚本结构:
def worker(some_element):
try:
cmd_res = Popen(SOME_COMMAND, stdout=PIPE, stderr=PIPE).communicate()
except (KeyboardInterrupt, SystemExit):
pass
except Exception, e:
print str(e)
return
#deal with cmd_res...
pool = Pool()
try:
pool.map(worker, some_list, chunksize = 1)
except KeyboardInterrupt:
pool.terminate()
print 'bye!'
通过在 KeyboardInterrupt
引发时调用 pool.terminated()
,我预计不会打印堆栈跟踪,但它不起作用,我 有时 类似于:
^CProcess PoolWorker-6:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
task = get()
File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
racquire()
KeyboardInterrupt
Process PoolWorker-1:
Process PoolWorker-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
Traceback (most recent call last):
...
bye!
你知道我如何隐藏这个吗?
谢谢。
您的子进程将收到 KeyboardInterrupt 异常和来自 terminate()
的异常。
因为子进程接收到 KeyboardInterrupt,父进程中的一个简单的 join()
——而不是 terminate()
——就足够了。
当您实例化 Pool
时,它会创建 cpu_count()
(在我的机器上,8)python 个进程等待您的 worker()
。请注意,他们还没有 运行,他们正在等待命令。当他们不执行您的代码时,他们也不会处理 KeyboardInterrupt。如果您指定 Pool(processes=2)
并发送中断,您可以看到他们在做什么。您可以使用进程号来修复它,但我认为您无法在所有情况下处理它。
我个人不建议使用 multiprocessing.Pool 来启动其他进程。为此启动几个 python 进程太过分了。更有效的方法是使用线程(参见 threading.Thread
、Queue.Queue
)。但在这种情况下,您需要自己实现线程池。不过这并不难。
根据 y0prst 的建议,我使用 threading.Thread
而不是 Pool
。
这是一个工作示例,它使用 ImageMagick 栅格化一组矢量(我知道我可以为此使用 mogrify
,这只是一个示例)。
#!/usr/bin/python
from os.path import abspath
from os import listdir
from threading import Thread
from subprocess import Popen, PIPE
RASTERISE_CALL = "magick %s %s"
INPUT_DIR = './tests_in/'
def get_vectors(dir):
'''Return a list of svg files inside the `dir` directory'''
return [abspath(dir+f).replace(' ', '\ ') for f in listdir(dir) if f.endswith('.svg')]
class ImageMagickError(Exception):
'''Custom error for ImageMagick fails calls'''
def __init__(self, value): self.value = value
def __str__(self): return repr(self.value)
class Rasterise(Thread):
'''Rasterizes a given vector.'''
def __init__(self, svg):
self.stdout = None
self.stderr = None
Thread.__init__(self)
self.svg = svg
def run(self):
p = Popen((RASTERISE_CALL % (self.svg, self.svg + '.png')).split(), shell=False, stdout=PIPE, stderr=PIPE)
self.stdout, self.stderr = p.communicate()
if self.stderr is not '':
raise ImageMagickError, 'can not rasterize ' + self.svg + ': ' + self.stderr
threads = []
def join_threads():
'''Joins all the threads.'''
for t in threads:
try:
t.join()
except(KeyboardInterrupt, SystemExit):
pass
#Rasterizes all the vectors in INPUT_DIR.
for f in get_vectors(INPUT_DIR):
t = Rasterise(f)
try:
print 'rasterize ' + f
t.start()
except (KeyboardInterrupt, SystemExit):
join_threads()
except ImageMagickError:
print 'Opps, IM can not rasterize ' + f + '.'
continue
threads.append(t)
# wait for all threads to end
join_threads()
print ('Finished!')
请告诉我您是否认为有更 pythonic 的方法来做到这一点,或者如果它可以优化,我将编辑我的答案。
在您的情况下,您甚至不需要池进程或线程。然后使用 try-catch 使 KeyboardInterrupts 静音变得更容易。
当您的 Python 代码执行可以从并行化中获益的 CPU 消耗计算时,池进程很有用。
当您的 Python 代码执行可以并行 运行 的复杂阻塞 I/O 时,线程很有用。您只想并行执行多个程序并等待结果。当您使用 Pool
时,您创建的进程除了启动其他进程并等待它们终止外什么都不做。
最简单的解决方案是并行创建所有进程,然后对每个进程调用 .communicate()
:
try:
processes = []
# Start all processes at once
for element in some_list:
processes.append(Popen(SOME_COMMAND, stdout=PIPE, stderr=PIPE))
# Fetch their results sequentially
for process in processes:
cmd_res = process.communicate()
# Process your result here
except KeyboardInterrupt:
for process in processes:
try:
process.terminate()
except OSError:
pass
这适用于 STDOUT 和 STDERR 上的输出不太大的情况。否则,当 communicate()
之外的另一个进程当前 运行ning 为 PIPE 缓冲区产生过多输出(通常约为 1-8 kB)时,它将被 OS 挂起,直到 communicate()
在挂起的进程上调用。在那种情况下,您需要一个更复杂的解决方案:
异步I/O
从Python 3.4开始你可以使用asyncio
模块进行单线程伪多线程:
import asyncio
from asyncio.subprocess import PIPE
loop = asyncio.get_event_loop()
@asyncio.coroutine
def worker(some_element):
process = yield from asyncio.create_subprocess_exec(*SOME_COMMAND, stdout=PIPE)
try:
cmd_res = yield from process.communicate()
except KeyboardInterrupt:
process.terminate()
return
try:
pass # Process your result here
except KeyboardInterrupt:
return
# Start all workers
workers = []
for element in some_list:
w = worker(element)
workers.append(w)
asyncio.async(w)
# Run until everything complete
loop.run_until_complete(asyncio.wait(workers))
您应该能够使用例如限制并发进程的数量asyncio.Semaphore
如果需要的话。
我同时使用 Pool
到 运行 多个命令。我不想在用户中断脚本时打印堆栈跟踪。
这是我的脚本结构:
def worker(some_element):
try:
cmd_res = Popen(SOME_COMMAND, stdout=PIPE, stderr=PIPE).communicate()
except (KeyboardInterrupt, SystemExit):
pass
except Exception, e:
print str(e)
return
#deal with cmd_res...
pool = Pool()
try:
pool.map(worker, some_list, chunksize = 1)
except KeyboardInterrupt:
pool.terminate()
print 'bye!'
通过在 KeyboardInterrupt
引发时调用 pool.terminated()
,我预计不会打印堆栈跟踪,但它不起作用,我 有时 类似于:
^CProcess PoolWorker-6:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
task = get()
File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
racquire()
KeyboardInterrupt
Process PoolWorker-1:
Process PoolWorker-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
Traceback (most recent call last):
...
bye!
你知道我如何隐藏这个吗?
谢谢。
您的子进程将收到 KeyboardInterrupt 异常和来自 terminate()
的异常。
因为子进程接收到 KeyboardInterrupt,父进程中的一个简单的 join()
——而不是 terminate()
——就足够了。
当您实例化 Pool
时,它会创建 cpu_count()
(在我的机器上,8)python 个进程等待您的 worker()
。请注意,他们还没有 运行,他们正在等待命令。当他们不执行您的代码时,他们也不会处理 KeyboardInterrupt。如果您指定 Pool(processes=2)
并发送中断,您可以看到他们在做什么。您可以使用进程号来修复它,但我认为您无法在所有情况下处理它。
我个人不建议使用 multiprocessing.Pool 来启动其他进程。为此启动几个 python 进程太过分了。更有效的方法是使用线程(参见 threading.Thread
、Queue.Queue
)。但在这种情况下,您需要自己实现线程池。不过这并不难。
根据 y0prst 的建议,我使用 threading.Thread
而不是 Pool
。
这是一个工作示例,它使用 ImageMagick 栅格化一组矢量(我知道我可以为此使用 mogrify
,这只是一个示例)。
#!/usr/bin/python
from os.path import abspath
from os import listdir
from threading import Thread
from subprocess import Popen, PIPE
RASTERISE_CALL = "magick %s %s"
INPUT_DIR = './tests_in/'
def get_vectors(dir):
'''Return a list of svg files inside the `dir` directory'''
return [abspath(dir+f).replace(' ', '\ ') for f in listdir(dir) if f.endswith('.svg')]
class ImageMagickError(Exception):
'''Custom error for ImageMagick fails calls'''
def __init__(self, value): self.value = value
def __str__(self): return repr(self.value)
class Rasterise(Thread):
'''Rasterizes a given vector.'''
def __init__(self, svg):
self.stdout = None
self.stderr = None
Thread.__init__(self)
self.svg = svg
def run(self):
p = Popen((RASTERISE_CALL % (self.svg, self.svg + '.png')).split(), shell=False, stdout=PIPE, stderr=PIPE)
self.stdout, self.stderr = p.communicate()
if self.stderr is not '':
raise ImageMagickError, 'can not rasterize ' + self.svg + ': ' + self.stderr
threads = []
def join_threads():
'''Joins all the threads.'''
for t in threads:
try:
t.join()
except(KeyboardInterrupt, SystemExit):
pass
#Rasterizes all the vectors in INPUT_DIR.
for f in get_vectors(INPUT_DIR):
t = Rasterise(f)
try:
print 'rasterize ' + f
t.start()
except (KeyboardInterrupt, SystemExit):
join_threads()
except ImageMagickError:
print 'Opps, IM can not rasterize ' + f + '.'
continue
threads.append(t)
# wait for all threads to end
join_threads()
print ('Finished!')
请告诉我您是否认为有更 pythonic 的方法来做到这一点,或者如果它可以优化,我将编辑我的答案。
在您的情况下,您甚至不需要池进程或线程。然后使用 try-catch 使 KeyboardInterrupts 静音变得更容易。
当您的 Python 代码执行可以从并行化中获益的 CPU 消耗计算时,池进程很有用。
当您的 Python 代码执行可以并行 运行 的复杂阻塞 I/O 时,线程很有用。您只想并行执行多个程序并等待结果。当您使用 Pool
时,您创建的进程除了启动其他进程并等待它们终止外什么都不做。
最简单的解决方案是并行创建所有进程,然后对每个进程调用 .communicate()
:
try:
processes = []
# Start all processes at once
for element in some_list:
processes.append(Popen(SOME_COMMAND, stdout=PIPE, stderr=PIPE))
# Fetch their results sequentially
for process in processes:
cmd_res = process.communicate()
# Process your result here
except KeyboardInterrupt:
for process in processes:
try:
process.terminate()
except OSError:
pass
这适用于 STDOUT 和 STDERR 上的输出不太大的情况。否则,当 communicate()
之外的另一个进程当前 运行ning 为 PIPE 缓冲区产生过多输出(通常约为 1-8 kB)时,它将被 OS 挂起,直到 communicate()
在挂起的进程上调用。在那种情况下,您需要一个更复杂的解决方案:
异步I/O
从Python 3.4开始你可以使用asyncio
模块进行单线程伪多线程:
import asyncio
from asyncio.subprocess import PIPE
loop = asyncio.get_event_loop()
@asyncio.coroutine
def worker(some_element):
process = yield from asyncio.create_subprocess_exec(*SOME_COMMAND, stdout=PIPE)
try:
cmd_res = yield from process.communicate()
except KeyboardInterrupt:
process.terminate()
return
try:
pass # Process your result here
except KeyboardInterrupt:
return
# Start all workers
workers = []
for element in some_list:
w = worker(element)
workers.append(w)
asyncio.async(w)
# Run until everything complete
loop.run_until_complete(asyncio.wait(workers))
您应该能够使用例如限制并发进程的数量asyncio.Semaphore
如果需要的话。