Python 使用多进程重定向标准输出(进度条)

Python redirect stdout (progressbar) using multiprocess

我对进度条模块非常满意,我经常将它与 StdOut 重定向功能一起使用。最近,我开始使用 (pathos) 多处理,但我无法将两者结合起来工作。

我也有一些键盘中断的问题,我读到这是由a bug in Python2引起的。我添加了我用来处理的代码,以防它与这个问题相关。

此外,我注意到使用不同的地图函数可以解决很多问题。我正在使用 imap 因为我想将中间结果写入 csv 文件,当然还要显示进度条。

我自己尝试了 StdOut,并尝试了互联网上的一些建议。然而,我总是在两种不愉快的情况下结束。

或者:

  1. StdOut 未被重定向,进度条在每个打印语句后重复。
  2. StdOut 被重定向,但未显示 worker 的输出。

下面是演示我的问题的一些玩具代码:

import time, signal, multiprocessing
import progressbar


def do_work(number):
    if not number % 500:
        print 'Special log occasion ...'
    time.sleep(0.1)

def example(redirect_stdout):
    workers = multiprocessing.cpu_count()
    num_tasks = 1000
    pbar = progressbar.ProgressBar(widgets=[progressbar.Bar()], max_value=num_tasks, redirect_stdout=redirect_stdout)
    pbar.start()

    # Start a with SIGINT turned of, so that workers can be interrupted
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = multiprocessing.Pool(processes=workers)
    signal.signal(signal.SIGINT, original_sigint_handler)


    for i, _ in enumerate(pool.imap(do_work, xrange(num_tasks)), 1):
        pbar.update(i)

    pool.close()
    pool.join()
    pbar.finish()

print "Case1: Progressbar without redirecting output:"
example(False)
print "\nCase1: Progressbar without redirecting output:"
example(True)

输出:

Case1: Progresspar without redirecing output:
Special log occasion ...
|######################                       |
Special log occasion ...
|#############################################|


Case2: Progresspar with redirecing output:
|#############################################|

使用多个进程写入同一个输出流总是容易出现同步问题,或者更糟的是,overwritten/missing 数据。幸运的是,解决这个问题并不难 :)

# vim: set fileencoding=utf-8
import six
import sys
import time
import signal
import multiprocessing
import progressbar


def do_work(number):
    if not number % 50:
        print 'Special log occasion ...'
        sys.stdout.flush()
    time.sleep(0.1)


class IOQueue(six.StringIO):

    '''
    Very poor and simple IO wrapper which only functions for simple print
    statements
    '''

    def __init__(self, queue, *args, **kwargs):
        six.StringIO.__init__(self, *args, **kwargs)
        self.queue = queue

    def write(self, value):
        self.queue.put(value)


def example(redirect_stdout):
    workers = multiprocessing.cpu_count()
    num_tasks = 1000
    pbar = progressbar.ProgressBar(
        widgets=[progressbar.Bar()],
        max_value=num_tasks,
        redirect_stdout=redirect_stdout,
    )
    # Start a with SIGINT turned of, so that workers can be interrupted
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)

    stdout_queue = multiprocessing.Queue()

    def initializer(queue):
        sys.stdout = IOQueue(queue)

    pool = multiprocessing.Pool(
        processes=workers, initializer=initializer, initargs=[stdout_queue])
    signal.signal(signal.SIGINT, original_sigint_handler)

    for i, _ in enumerate(pool.imap(do_work, xrange(num_tasks)), 1):
        while not stdout_queue.empty():
            sys.stdout.write(stdout_queue.get())

        pbar.update(i)

    pool.close()
    pool.join()
    pbar.finish()

example(True)

上面的代码让所有工作人员将标准输出数据写入多处理队列,该队列在更新进度条之前写入常规标准输出。