不要使用 Pool python 打印堆栈跟踪

Do not print stack-trace using Pool python

我同时使用 Pool 到 运行 多个命令。我不想在用户中断脚本时打印堆栈跟踪。

这是我的脚本结构:

def worker(some_element):
    try:
        cmd_res = Popen(SOME_COMMAND, stdout=PIPE, stderr=PIPE).communicate()
    except (KeyboardInterrupt, SystemExit):
        pass
    except Exception, e:
        print str(e)
        return

    #deal with cmd_res...

pool = Pool()
try:
    pool.map(worker, some_list, chunksize = 1)
except KeyboardInterrupt:
    pool.terminate()
    print 'bye!'

通过在 KeyboardInterrupt 引发时调用 pool.terminated(),我预计不会打印堆栈跟踪,但它不起作用,我 有时 类似于:

^CProcess PoolWorker-6:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
    racquire()
KeyboardInterrupt
Process PoolWorker-1:
Process PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
Traceback (most recent call last):

...
bye!

你知道我如何隐藏这个吗?

谢谢。

您的子进程将收到 KeyboardInterrupt 异常和来自 terminate() 的异常。

因为子进程接收到 KeyboardInterrupt,父进程中的一个简单的 join()——而不是 terminate()——就足够了。

当您实例化 Pool 时,它会创建 cpu_count()(在我的机器上,8)python 个进程等待您的 worker()。请注意,他们还没有 运行,他们正在等待命令。当他们不执行您的代码时,他们也不会处理 KeyboardInterrupt。如果您指定 Pool(processes=2) 并发送中断,您可以看到他们在做什么。您可以使用进程号来修复它,但我认为您无法在所有情况下处理它。

我个人不建议使用 multiprocessing.Pool 来启动其他进程。为此启动几个 python 进程太过分了。更有效的方法是使用线程(参见 threading.ThreadQueue.Queue)。但在这种情况下,您需要自己实现线程池。不过这并不难。

根据 y0prst 的建议,我使用 threading.Thread 而不是 Pool

这是一个工作示例,它使用 ImageMagick 栅格化一组矢量(我知道我可以为此使用 mogrify,这只是一个示例)。

#!/usr/bin/python

from os.path import abspath
from os import listdir
from threading import Thread
from subprocess import Popen, PIPE

RASTERISE_CALL = "magick %s %s"
INPUT_DIR = './tests_in/'

def get_vectors(dir):
    '''Return a list of svg files inside the `dir` directory'''
    return [abspath(dir+f).replace(' ', '\ ') for f in listdir(dir) if f.endswith('.svg')]

class ImageMagickError(Exception):
    '''Custom error for ImageMagick fails calls'''
    def __init__(self, value): self.value = value
    def __str__(self): return repr(self.value)

class Rasterise(Thread):
    '''Rasterizes a given vector.'''
    def __init__(self, svg):
        self.stdout = None
        self.stderr = None
        Thread.__init__(self)
        self.svg = svg

    def run(self):
        p = Popen((RASTERISE_CALL % (self.svg, self.svg + '.png')).split(), shell=False, stdout=PIPE, stderr=PIPE)
        self.stdout, self.stderr = p.communicate()
        if self.stderr is not '':
            raise ImageMagickError, 'can not rasterize ' + self.svg + ': ' + self.stderr

threads = []

def join_threads():
    '''Joins all the threads.'''
    for t in threads:
        try:
            t.join()
        except(KeyboardInterrupt, SystemExit):
            pass

#Rasterizes all the vectors in INPUT_DIR.
for f in get_vectors(INPUT_DIR):
    t = Rasterise(f)

    try:
        print 'rasterize ' + f
        t.start()
    except (KeyboardInterrupt, SystemExit):
        join_threads()
    except ImageMagickError:
        print 'Opps, IM can not rasterize ' + f + '.'
        continue

    threads.append(t)

# wait for all threads to end
join_threads()

print ('Finished!')

请告诉我您是否认为有更 pythonic 的方法来做到这一点,或者如果它可以优化,我将编辑我的答案。

在您的情况下,您甚至不需要池进程或线程。然后使用 try-catch 使 KeyboardInterrupts 静音变得更容易。

当您的 Python 代码执行可以从并行化中获益的 CPU 消耗计算时,池进程很有用。 当您的 Python 代码执行可以并行 运行 的复杂阻塞 I/O 时,线程很有用。您只想并行执行多个程序并等待结果。当您使用 Pool 时,您创建的进程除了启动其他进程并等待它们终止外什么都不做。

最简单的解决方案是并行创建所有进程,然后对每个进程调用 .communicate()

try:
    processes = []
    # Start all processes at once
    for element in some_list:
        processes.append(Popen(SOME_COMMAND, stdout=PIPE, stderr=PIPE))
    # Fetch their results sequentially
    for process in processes:
        cmd_res = process.communicate()
        # Process your result here
except KeyboardInterrupt:
    for process in processes:
        try:
            process.terminate()
        except OSError:
            pass

这适用于 STDOUT 和 STDERR 上的输出不太大的情况。否则,当 communicate() 之外的另一个进程当前 运行ning 为 PIPE 缓冲区产生过多输出(通常约为 1-8 kB)时,它将被 OS 挂起,直到 communicate() 在挂起的进程上调用。在那种情况下,您需要一个更复杂的解决方案:

异步I/O

从Python 3.4开始你可以使用asyncio模块进行单线程伪多线程:

import asyncio
from asyncio.subprocess import PIPE

loop = asyncio.get_event_loop()

@asyncio.coroutine
def worker(some_element):
    process = yield from asyncio.create_subprocess_exec(*SOME_COMMAND, stdout=PIPE)
    try:
        cmd_res = yield from process.communicate()
    except KeyboardInterrupt:
        process.terminate()
        return
    try:
        pass # Process your result here
    except KeyboardInterrupt:
        return

# Start all workers
workers = []
for element in some_list:
    w = worker(element)
    workers.append(w)
    asyncio.async(w)

# Run until everything complete
loop.run_until_complete(asyncio.wait(workers))

您应该能够使用例如限制并发进程的数量asyncio.Semaphore 如果需要的话。