Python 使用多进程重定向标准输出(进度条)
Python redirect stdout (progressbar) using multiprocess
我对进度条模块非常满意,我经常将它与 StdOut 重定向功能一起使用。最近,我开始使用 (pathos) 多处理,但我无法将两者结合起来工作。
我也有一些键盘中断的问题,我读到这是由a bug in Python2引起的。我添加了我用来处理的代码,以防它与这个问题相关。
此外,我注意到使用不同的地图函数可以解决很多问题。我正在使用 imap
因为我想将中间结果写入 csv 文件,当然还要显示进度条。
我自己尝试了 StdOut,并尝试了互联网上的一些建议。然而,我总是在两种不愉快的情况下结束。
或者:
- StdOut 未被重定向,进度条在每个打印语句后重复。
- StdOut 被重定向,但未显示 worker 的输出。
下面是演示我的问题的一些玩具代码:
import time, signal, multiprocessing
import progressbar
def do_work(number):
if not number % 500:
print 'Special log occasion ...'
time.sleep(0.1)
def example(redirect_stdout):
workers = multiprocessing.cpu_count()
num_tasks = 1000
pbar = progressbar.ProgressBar(widgets=[progressbar.Bar()], max_value=num_tasks, redirect_stdout=redirect_stdout)
pbar.start()
# Start a with SIGINT turned of, so that workers can be interrupted
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = multiprocessing.Pool(processes=workers)
signal.signal(signal.SIGINT, original_sigint_handler)
for i, _ in enumerate(pool.imap(do_work, xrange(num_tasks)), 1):
pbar.update(i)
pool.close()
pool.join()
pbar.finish()
print "Case1: Progressbar without redirecting output:"
example(False)
print "\nCase1: Progressbar without redirecting output:"
example(True)
输出:
Case1: Progresspar without redirecing output:
Special log occasion ...
|###################### |
Special log occasion ...
|#############################################|
Case2: Progresspar with redirecing output:
|#############################################|
使用多个进程写入同一个输出流总是容易出现同步问题,或者更糟的是,overwritten/missing 数据。幸运的是,解决这个问题并不难 :)
# vim: set fileencoding=utf-8
import six
import sys
import time
import signal
import multiprocessing
import progressbar
def do_work(number):
if not number % 50:
print 'Special log occasion ...'
sys.stdout.flush()
time.sleep(0.1)
class IOQueue(six.StringIO):
'''
Very poor and simple IO wrapper which only functions for simple print
statements
'''
def __init__(self, queue, *args, **kwargs):
six.StringIO.__init__(self, *args, **kwargs)
self.queue = queue
def write(self, value):
self.queue.put(value)
def example(redirect_stdout):
workers = multiprocessing.cpu_count()
num_tasks = 1000
pbar = progressbar.ProgressBar(
widgets=[progressbar.Bar()],
max_value=num_tasks,
redirect_stdout=redirect_stdout,
)
# Start a with SIGINT turned of, so that workers can be interrupted
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
stdout_queue = multiprocessing.Queue()
def initializer(queue):
sys.stdout = IOQueue(queue)
pool = multiprocessing.Pool(
processes=workers, initializer=initializer, initargs=[stdout_queue])
signal.signal(signal.SIGINT, original_sigint_handler)
for i, _ in enumerate(pool.imap(do_work, xrange(num_tasks)), 1):
while not stdout_queue.empty():
sys.stdout.write(stdout_queue.get())
pbar.update(i)
pool.close()
pool.join()
pbar.finish()
example(True)
上面的代码让所有工作人员将标准输出数据写入多处理队列,该队列在更新进度条之前写入常规标准输出。
我对进度条模块非常满意,我经常将它与 StdOut 重定向功能一起使用。最近,我开始使用 (pathos) 多处理,但我无法将两者结合起来工作。
我也有一些键盘中断的问题,我读到这是由a bug in Python2引起的。我添加了我用来处理的代码,以防它与这个问题相关。
此外,我注意到使用不同的地图函数可以解决很多问题。我正在使用 imap
因为我想将中间结果写入 csv 文件,当然还要显示进度条。
我自己尝试了 StdOut,并尝试了互联网上的一些建议。然而,我总是在两种不愉快的情况下结束。
或者:
- StdOut 未被重定向,进度条在每个打印语句后重复。
- StdOut 被重定向,但未显示 worker 的输出。
下面是演示我的问题的一些玩具代码:
import time, signal, multiprocessing
import progressbar
def do_work(number):
if not number % 500:
print 'Special log occasion ...'
time.sleep(0.1)
def example(redirect_stdout):
workers = multiprocessing.cpu_count()
num_tasks = 1000
pbar = progressbar.ProgressBar(widgets=[progressbar.Bar()], max_value=num_tasks, redirect_stdout=redirect_stdout)
pbar.start()
# Start a with SIGINT turned of, so that workers can be interrupted
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = multiprocessing.Pool(processes=workers)
signal.signal(signal.SIGINT, original_sigint_handler)
for i, _ in enumerate(pool.imap(do_work, xrange(num_tasks)), 1):
pbar.update(i)
pool.close()
pool.join()
pbar.finish()
print "Case1: Progressbar without redirecting output:"
example(False)
print "\nCase1: Progressbar without redirecting output:"
example(True)
输出:
Case1: Progresspar without redirecing output:
Special log occasion ...
|###################### |
Special log occasion ...
|#############################################|
Case2: Progresspar with redirecing output:
|#############################################|
使用多个进程写入同一个输出流总是容易出现同步问题,或者更糟的是,overwritten/missing 数据。幸运的是,解决这个问题并不难 :)
# vim: set fileencoding=utf-8
import six
import sys
import time
import signal
import multiprocessing
import progressbar
def do_work(number):
if not number % 50:
print 'Special log occasion ...'
sys.stdout.flush()
time.sleep(0.1)
class IOQueue(six.StringIO):
'''
Very poor and simple IO wrapper which only functions for simple print
statements
'''
def __init__(self, queue, *args, **kwargs):
six.StringIO.__init__(self, *args, **kwargs)
self.queue = queue
def write(self, value):
self.queue.put(value)
def example(redirect_stdout):
workers = multiprocessing.cpu_count()
num_tasks = 1000
pbar = progressbar.ProgressBar(
widgets=[progressbar.Bar()],
max_value=num_tasks,
redirect_stdout=redirect_stdout,
)
# Start a with SIGINT turned of, so that workers can be interrupted
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
stdout_queue = multiprocessing.Queue()
def initializer(queue):
sys.stdout = IOQueue(queue)
pool = multiprocessing.Pool(
processes=workers, initializer=initializer, initargs=[stdout_queue])
signal.signal(signal.SIGINT, original_sigint_handler)
for i, _ in enumerate(pool.imap(do_work, xrange(num_tasks)), 1):
while not stdout_queue.empty():
sys.stdout.write(stdout_queue.get())
pbar.update(i)
pool.close()
pool.join()
pbar.finish()
example(True)
上面的代码让所有工作人员将标准输出数据写入多处理队列,该队列在更新进度条之前写入常规标准输出。