在 Python 中使用 click.progressbar 和多处理
Using click.progressbar with multiprocessing in Python
我有一个很大的列表需要处理，这需要一些时间，所以我把它分成 4 个部分，并用一个函数对每个部分进行多进程处理。即使使用 4 个内核运行，仍然需要不少时间，所以我想在函数中添加进度条，以便显示每个处理器处理到了列表中的哪个位置。
我的梦想是拥有这样的东西:
erasing close atoms, cpu0 [######..............................] 13%
erasing close atoms, cpu1 [#######.............................] 15%
erasing close atoms, cpu2 [######..............................] 13%
erasing close atoms, cpu3 [######..............................] 14%
随着函数循环的进行，每个进度条各自前进。但实际上我得到的是连续不断的输出：
……如此反复，填满了我的终端窗口。
这是调用函数的主要 python 脚本:
from eraseCloseAtoms import *
from readPDB import *
import multiprocessing as mp
from vectorCalc import *
def chunk_bounds(total, parts):
    """Split ``range(total)`` into ``parts`` contiguous half-open slices.

    The first ``total % parts`` slices get one extra item, so slice sizes
    differ by at most one and together cover every index exactly once.

    >>> chunk_bounds(10, 4)
    [(0, 3), (3, 6), (6, 8), (8, 10)]
    """
    base, extra = divmod(total, parts)
    bounds = []
    start = 0
    for c in range(parts):
        stop = start + base + (1 if c < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds


if __name__ == '__main__':
    # multiprocessing requires this guard on platforms that start children
    # with "spawn" (e.g. Windows): each child re-imports the main module and
    # must not re-run the work below.
    prot, cell = readPDB('file')
    atoms = vectorCalc(cell)
    output = mp.Queue()
    # setup mp to erase grid atoms that are too close to the protein (dmin = 2.5A)
    cpuNum = 4
    processes = [
        mp.Process(target=eraseCloseAtoms,
                   args=(prot, atoms[na:nb], cell, 2.7, 2.5, output))
        for na, nb in chunk_bounds(len(atoms), cpuNum)
    ]
    for p in processes:
        p.start()
    # Drain the queue before joining: a child that has put data on a queue
    # may not exit until that data is consumed, so join-first can deadlock.
    # NOTE: results arrive in completion order, not in chunk order.
    results = [output.get() for p in processes]
    for p in processes:
        p.join()
    # Concatenate however many per-worker lists were produced.
    atomsNew = [atom for chunk in results for atom in chunk]
下面是函数 eraseCloseAtoms()：
import numpy as np
import click
def eraseCloseAtoms(protein, atoms, cell, spacing=2, dmin=1.4, output=None):
    """Drop every grid atom whose distance to some protein atom is <= ``dmin``.

    :param protein: collection of protein atoms; each key ``j`` is read as
        ``protein[int(j)][6]`` (coordinates) — schema inferred from usage,
        confirm against readPDB.
    :param atoms: sequence of grid atoms; ``atom[6]`` is read as coordinates.
    :param cell: passed through to ``getMinDist``.
    :param spacing: grid spacing; must not be smaller than ``dmin``.
    :param dmin: distance threshold; atoms at distance <= dmin are removed.
    :param output: optional queue (e.g. ``multiprocessing.Queue``). When given,
        the result is ``put`` on it instead of being returned.
    :returns: list of kept atoms (original order) when ``output`` is None,
        otherwise None. On invalid arguments returns None and, if ``output``
        is given, puts None on it.
    """
    print('just need to erase close atoms')
    if dmin > spacing:
        print('the spacing needs to be larger than dmin')
        # A parent blocked in ``output.get()`` would wait forever if nothing
        # were put, so signal the failure through the queue.
        if output is not None:
            output.put(None)
        return

    def _is_close(coord):
        """Return True if ``coord`` is within ``dmin`` of any protein atom."""
        for j in protein:
            protCoord = np.array(protein[int(j)][6])
            # NOTE(review): ``vectors`` is not defined in this function or in
            # the visible module; it must come from elsewhere — confirm.
            # Periodic images are presumably handled inside getMinDist (it
            # receives ``cell``), so one scan over the protein suffices.
            if getMinDist(protCoord, coord, cell, vectors) <= dmin:
                return True
        return False

    selected = []
    # Iterating the bar itself advances it exactly one step per atom.
    with click.progressbar(atoms, label='erasing close atoms') as bar:
        for atom in bar:
            if not _is_close(np.array(atom[6])):
                selected.append(atom)

    if output is None:
        return selected
    output.put(selected)
我发现你的代码中有两个问题。
第一个解释了为什么您的进度条经常显示 100%
而不是实际进度。您调用的 bar.update(i) 会把进度条推进 i 步，而我认为您的本意是每次只推进一步（按现在的写法，第 i 次迭代后累计进度约为 i²/2，很快就会超过总长度）。更好的方法是将可迭代对象传递给 progressbar
函数并让它自动进行更新:
# Passing the iterable to click.progressbar lets the bar advance itself as
# items are consumed, instead of calling bar.update() by hand.
with click.progressbar(atoms, label='erasing close atoms') as bar:
    for atom in bar:
        erased = False
        coord = np.array(atom[6])
        # ...
但是，当多个进程同时迭代时这仍然无法正常工作，这就是您代码的第二个问题：每个进程都有自己的进度条。click.progressbar
的文档说明了以下限制：
No printing must happen or the progress bar will be unintentionally destroyed.
这意味着每当其中一个进度条更新自身时，都会破坏其他所有正在显示的进度条。
我认为没有简单的解决方法。交互式更新多行控制台输出非常困难(您基本上需要使用 curses 或类似的 "console GUI" 库,并得到 OS 的支持)。 click
模块没有那个能力,它只能更新当前行。您最大的希望可能是扩展 click.progressbar
设计以在列中输出多个条,例如:
CPU1: [###### ] 52% CPU2: [### ] 30% CPU3: [######## ] 84%
这需要大量代码才能使其正常工作(尤其是当更新来自多个进程时),但这并非完全不切实际。
被采纳的答案说用 click 很难做到，需要 'non trivial amount of code to make it work'。
虽然这是真的,但还有另一个模块具有开箱即用的此功能:tqdm
https://github.com/tqdm/tqdm 这正是您所需要的。
它支持嵌套进度条等功能，参见文档：https://github.com/tqdm/tqdm#nested-progress-bars
可能和你的梦想不一样,但是你可以使用imap_unordered
和click.progressbar
来集成多处理。
import multiprocessing as mp
import click
import time
def proc(arg):
    """Stand-in for real work: sleep ``arg`` seconds, then return True."""
    time.sleep(arg)
    return True


def main():
    """Run ``proc`` on a 4-worker pool and show a single click progress bar.

    ``imap_unordered`` yields each result as soon as a worker finishes it, so
    the bar (drawn only by the parent process) advances as tasks complete.
    """
    args = range(4)
    p = mp.Pool(4)
    try:
        results = p.imap_unordered(proc, args)
        # ``length`` is passed explicitly since the imap iterator does not
        # expose a length.
        with click.progressbar(results, length=len(args)) as bar:
            for result in bar:
                pass
    finally:
        # Shut the worker processes down once all results are consumed.
        p.close()
        p.join()


if __name__ == '__main__':
    main()
如果您不介意只有一个进度条,那么类似这样的方法会起作用:
import click
import threading
import numpy as np
reallybiglist = []
numthreads = 4
# Several threads update the same click bar; click does not document its
# ProgressBar as thread-safe, so serialize the position change and redraw.
_bar_lock = threading.Lock()


def myfunc(listportion, bar):
    """Process one slice of the list, advancing the shared ``bar`` per item."""
    for item in listportion:
        # do a thing
        with _bar_lock:
            bar.update(1)


with click.progressbar(length=len(reallybiglist), show_pos=True) as bar:
    threads = []
    # array_split (unlike split) accepts lengths not divisible by numthreads.
    for listportion in np.array_split(reallybiglist, numthreads):
        thread = threading.Thread(target=myfunc, args=(listportion, bar))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
写给之后来到这里的人：我写了下面这段代码，看起来运行良好。它对 click.ProgressBar
的改动很少，不过为了修改方法末尾的几行代码，我不得不覆盖整个 render_progress 方法。它在重绘之前使用 \x1b[1A\x1b[2K
（光标上移一行并清除该行）擦除旧的进度条，因此效果可能取决于终端环境。
#!/usr/bin/env python
import time
from typing import Dict
import click
from click._termui_impl import ProgressBar as ClickProgressBar, BEFORE_BAR
from click._compat import term_len
class ProgressBar(ClickProgressBar):
    """click ProgressBar that can also render as one line of a multi-bar block.

    Only ``render_progress`` is overridden; it follows click's own
    implementation and adds the ``in_collection`` flag.
    """

    def render_progress(self, in_collection=False):
        """Draw the bar.

        :param in_collection: when True the line ends with a newline and is
            re-echoed even if unchanged, so each bar of a collection occupies
            exactly one line per redraw.
        """
        # click.termui.get_terminal_size was removed in click 8.1; the stdlib
        # function returns the same (columns, lines) pair.
        import shutil

        if self.is_hidden:
            return

        buf = []
        # Update width in case the terminal has been resized
        if self.autowidth:
            old_width = self.width
            self.width = 0
            clutter_length = term_len(self.format_progress_line())
            new_width = max(0, shutil.get_terminal_size()[0] - clutter_length)
            if new_width < old_width:
                buf.append(BEFORE_BAR)
                buf.append(" " * self.max_width)
                self.max_width = new_width
            self.width = new_width

        clear_width = self.width
        if self.max_width is not None:
            clear_width = self.max_width

        buf.append(BEFORE_BAR)
        line = self.format_progress_line()
        line_len = term_len(line)
        if self.max_width is None or self.max_width < line_len:
            self.max_width = line_len

        buf.append(line)
        buf.append(" " * (clear_width - line_len))
        line = "".join(buf)
        # Render the line only if it changed.
        if line != self._last_line and not self.is_fast():
            self._last_line = line
            click.echo(line, file=self.file, color=self.color, nl=in_collection)
            self.file.flush()
        elif in_collection:
            # A collection erases and redraws every bar, so an unchanged bar
            # must still be echoed to keep its line.
            click.echo(self._last_line, file=self.file, color=self.color, nl=in_collection)
            self.file.flush()
class ProgressBarCollection(object):
    """Render several ProgressBar objects as a block of stacked lines.

    Each redraw first moves the cursor up over the previously drawn lines
    using the ANSI sequences ESC[1A (cursor up) and ESC[2K (erase line), so it
    only looks right on terminals that honour those codes.
    """

    def __init__(self, bars: Dict[str, ProgressBar], bar_template=None, width=None):
        """
        :param bars: bars keyed by name; drawn in the dict's iteration order.
        :param bar_template: if truthy, overrides every bar's ``bar_template``.
        :param width: if truthy, overrides every bar's ``width``.
        """
        self.bars = bars
        for bar in self.bars.values():
            if bar_template:
                bar.bar_template = bar_template
            if width:
                bar.width = width

    def __enter__(self):
        self.render_progress()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.render_finish()

    def render_progress(self, clear=False):
        """Draw every bar on its own line, erasing the previous block if ``clear``."""
        if clear:
            self._clear_bars()
        for bar in self.bars.values():
            bar.render_progress(in_collection=True)

    def render_finish(self):
        """Call ``render_finish`` on every bar."""
        for bar in self.bars.values():
            bar.render_finish()

    def update(self, bar_name: str, n_steps: int):
        """Advance bar ``bar_name`` by ``n_steps`` and redraw the whole block."""
        self.bars[bar_name].make_step(n_steps)
        self.render_progress(clear=True)

    def _clear_bars(self):
        """Erase the lines drawn by the previous ``render_progress`` call."""
        for bar in self.bars.values():
            # Hidden bars return before drawing anything, so there is no line
            # of theirs to erase; erase on the stream the bar draws to.
            if not bar.is_hidden:
                click.echo('\x1b[1A\x1b[2K', file=bar.file, nl=False)
def progressbar_collection(bars: Dict[str, ProgressBar],
                           bar_template="%(label)s [%(bar)s] %(info)s",
                           width=36):
    """Create a ProgressBarCollection with a shared template and bar width.

    :param bars: bars keyed by name, drawn top to bottom in iteration order.
    :param bar_template: template applied to every bar.
    :param width: bar width applied to every bar (default 36).
    """
    return ProgressBarCollection(bars, bar_template=bar_template, width=width)
@click.command()
def cli():
    """Demo: a stock click bar, one subclassed bar, then a two-bar collection."""
    steps = 10

    # 1) Plain click progress bar.
    with click.progressbar(length=steps, label='bar 0') as bar:
        for _ in range(steps):
            time.sleep(1)
            bar.update(1)
    click.echo('------')

    # 2) The subclass used on its own behaves like a regular bar.
    with ProgressBar(iterable=None, length=steps, label='bar 1',
                     bar_template="%(label)s [%(bar)s] %(info)s") as bar:
        for _ in range(steps):
            time.sleep(1)
            bar.update(1)
    click.echo('------')

    # 3) Two bars drawn together; fill the first completely, then the second.
    bars = {
        'bar2': ProgressBar(iterable=None, length=steps, label='bar 2'),
        'bar3': ProgressBar(iterable=None, length=steps, label='bar 3'),
    }
    with progressbar_collection(bars) as bar_collection:
        for name in ('bar2', 'bar3'):
            for _ in range(steps):
                time.sleep(1)
                bar_collection.update(name, 1)


if __name__ == "__main__":
    cli()
我有一个很大的列表需要处理，这需要一些时间，所以我把它分成 4 个部分，并用一个函数对每个部分进行多进程处理。即使使用 4 个内核运行，仍然需要不少时间，所以我想在函数中添加进度条，以便显示每个处理器处理到了列表中的哪个位置。
我的梦想是拥有这样的东西:
erasing close atoms, cpu0 [######..............................] 13%
erasing close atoms, cpu1 [#######.............................] 15%
erasing close atoms, cpu2 [######..............................] 13%
erasing close atoms, cpu3 [######..............................] 14%
随着函数循环的进行，每个进度条各自前进。但实际上我得到的是连续不断的输出：
……如此反复，填满了我的终端窗口。
这是调用函数的主要 python 脚本:
from eraseCloseAtoms import *
from readPDB import *
import multiprocessing as mp
from vectorCalc import *
def chunk_bounds(total, parts):
    """Split ``range(total)`` into ``parts`` contiguous half-open slices.

    The first ``total % parts`` slices get one extra item, so slice sizes
    differ by at most one and together cover every index exactly once.

    >>> chunk_bounds(10, 4)
    [(0, 3), (3, 6), (6, 8), (8, 10)]
    """
    base, extra = divmod(total, parts)
    bounds = []
    start = 0
    for c in range(parts):
        stop = start + base + (1 if c < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds


if __name__ == '__main__':
    # multiprocessing requires this guard on platforms that start children
    # with "spawn" (e.g. Windows): each child re-imports the main module and
    # must not re-run the work below.
    prot, cell = readPDB('file')
    atoms = vectorCalc(cell)
    output = mp.Queue()
    # setup mp to erase grid atoms that are too close to the protein (dmin = 2.5A)
    cpuNum = 4
    processes = [
        mp.Process(target=eraseCloseAtoms,
                   args=(prot, atoms[na:nb], cell, 2.7, 2.5, output))
        for na, nb in chunk_bounds(len(atoms), cpuNum)
    ]
    for p in processes:
        p.start()
    # Drain the queue before joining: a child that has put data on a queue
    # may not exit until that data is consumed, so join-first can deadlock.
    # NOTE: results arrive in completion order, not in chunk order.
    results = [output.get() for p in processes]
    for p in processes:
        p.join()
    # Concatenate however many per-worker lists were produced.
    atomsNew = [atom for chunk in results for atom in chunk]
下面是函数 eraseCloseAtoms()：
import numpy as np
import click
def eraseCloseAtoms(protein, atoms, cell, spacing=2, dmin=1.4, output=None):
    """Drop every grid atom whose distance to some protein atom is <= ``dmin``.

    :param protein: collection of protein atoms; each key ``j`` is read as
        ``protein[int(j)][6]`` (coordinates) — schema inferred from usage,
        confirm against readPDB.
    :param atoms: sequence of grid atoms; ``atom[6]`` is read as coordinates.
    :param cell: passed through to ``getMinDist``.
    :param spacing: grid spacing; must not be smaller than ``dmin``.
    :param dmin: distance threshold; atoms at distance <= dmin are removed.
    :param output: optional queue (e.g. ``multiprocessing.Queue``). When given,
        the result is ``put`` on it instead of being returned.
    :returns: list of kept atoms (original order) when ``output`` is None,
        otherwise None. On invalid arguments returns None and, if ``output``
        is given, puts None on it.
    """
    print('just need to erase close atoms')
    if dmin > spacing:
        print('the spacing needs to be larger than dmin')
        # A parent blocked in ``output.get()`` would wait forever if nothing
        # were put, so signal the failure through the queue.
        if output is not None:
            output.put(None)
        return

    def _is_close(coord):
        """Return True if ``coord`` is within ``dmin`` of any protein atom."""
        for j in protein:
            protCoord = np.array(protein[int(j)][6])
            # NOTE(review): ``vectors`` is not defined in this function or in
            # the visible module; it must come from elsewhere — confirm.
            # Periodic images are presumably handled inside getMinDist (it
            # receives ``cell``), so one scan over the protein suffices.
            if getMinDist(protCoord, coord, cell, vectors) <= dmin:
                return True
        return False

    selected = []
    # Iterating the bar itself advances it exactly one step per atom.
    with click.progressbar(atoms, label='erasing close atoms') as bar:
        for atom in bar:
            if not _is_close(np.array(atom[6])):
                selected.append(atom)

    if output is None:
        return selected
    output.put(selected)
我发现你的代码中有两个问题。
第一个解释了为什么您的进度条经常显示 100%
而不是实际进度。您调用的 bar.update(i) 会把进度条推进 i 步，而我认为您的本意是每次只推进一步（按现在的写法，第 i 次迭代后累计进度约为 i²/2，很快就会超过总长度）。更好的方法是将可迭代对象传递给 progressbar
函数并让它自动进行更新:
# Passing the iterable to click.progressbar lets the bar advance itself as
# items are consumed, instead of calling bar.update() by hand.
with click.progressbar(atoms, label='erasing close atoms') as bar:
    for atom in bar:
        erased = False
        coord = np.array(atom[6])
        # ...
但是，当多个进程同时迭代时这仍然无法正常工作，这就是您代码的第二个问题：每个进程都有自己的进度条。click.progressbar
的文档说明了以下限制：
No printing must happen or the progress bar will be unintentionally destroyed.
这意味着每当其中一个进度条更新自身时，都会破坏其他所有正在显示的进度条。
我认为没有简单的解决方法。交互式更新多行控制台输出非常困难(您基本上需要使用 curses 或类似的 "console GUI" 库,并得到 OS 的支持)。 click
模块没有那个能力,它只能更新当前行。您最大的希望可能是扩展 click.progressbar
设计以在列中输出多个条,例如:
CPU1: [###### ] 52% CPU2: [### ] 30% CPU3: [######## ] 84%
这需要大量代码才能使其正常工作(尤其是当更新来自多个进程时),但这并非完全不切实际。
被采纳的答案说用 click 很难做到，需要 'non trivial amount of code to make it work'。
虽然这是真的,但还有另一个模块具有开箱即用的此功能:tqdm https://github.com/tqdm/tqdm 这正是您所需要的。
它支持嵌套进度条等功能，参见文档：https://github.com/tqdm/tqdm#nested-progress-bars
可能和你的梦想不一样,但是你可以使用imap_unordered
和click.progressbar
来集成多处理。
import multiprocessing as mp
import click
import time
def proc(arg):
    """Stand-in for real work: sleep ``arg`` seconds, then return True."""
    time.sleep(arg)
    return True


def main():
    """Run ``proc`` on a 4-worker pool and show a single click progress bar.

    ``imap_unordered`` yields each result as soon as a worker finishes it, so
    the bar (drawn only by the parent process) advances as tasks complete.
    """
    args = range(4)
    p = mp.Pool(4)
    try:
        results = p.imap_unordered(proc, args)
        # ``length`` is passed explicitly since the imap iterator does not
        # expose a length.
        with click.progressbar(results, length=len(args)) as bar:
            for result in bar:
                pass
    finally:
        # Shut the worker processes down once all results are consumed.
        p.close()
        p.join()


if __name__ == '__main__':
    main()
如果您不介意只有一个进度条,那么类似这样的方法会起作用:
import click
import threading
import numpy as np
reallybiglist = []
numthreads = 4
# Several threads update the same click bar; click does not document its
# ProgressBar as thread-safe, so serialize the position change and redraw.
_bar_lock = threading.Lock()


def myfunc(listportion, bar):
    """Process one slice of the list, advancing the shared ``bar`` per item."""
    for item in listportion:
        # do a thing
        with _bar_lock:
            bar.update(1)


with click.progressbar(length=len(reallybiglist), show_pos=True) as bar:
    threads = []
    # array_split (unlike split) accepts lengths not divisible by numthreads.
    for listportion in np.array_split(reallybiglist, numthreads):
        thread = threading.Thread(target=myfunc, args=(listportion, bar))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
写给之后来到这里的人：我写了下面这段代码，看起来运行良好。它对 click.ProgressBar
的改动很少，不过为了修改方法末尾的几行代码，我不得不覆盖整个 render_progress 方法。它在重绘之前使用 \x1b[1A\x1b[2K
（光标上移一行并清除该行）擦除旧的进度条，因此效果可能取决于终端环境。
#!/usr/bin/env python
import time
from typing import Dict
import click
from click._termui_impl import ProgressBar as ClickProgressBar, BEFORE_BAR
from click._compat import term_len
class ProgressBar(ClickProgressBar):
    """click ProgressBar that can also render as one line of a multi-bar block.

    Only ``render_progress`` is overridden; it follows click's own
    implementation and adds the ``in_collection`` flag.
    """

    def render_progress(self, in_collection=False):
        """Draw the bar.

        :param in_collection: when True the line ends with a newline and is
            re-echoed even if unchanged, so each bar of a collection occupies
            exactly one line per redraw.
        """
        # click.termui.get_terminal_size was removed in click 8.1; the stdlib
        # function returns the same (columns, lines) pair.
        import shutil

        if self.is_hidden:
            return

        buf = []
        # Update width in case the terminal has been resized
        if self.autowidth:
            old_width = self.width
            self.width = 0
            clutter_length = term_len(self.format_progress_line())
            new_width = max(0, shutil.get_terminal_size()[0] - clutter_length)
            if new_width < old_width:
                buf.append(BEFORE_BAR)
                buf.append(" " * self.max_width)
                self.max_width = new_width
            self.width = new_width

        clear_width = self.width
        if self.max_width is not None:
            clear_width = self.max_width

        buf.append(BEFORE_BAR)
        line = self.format_progress_line()
        line_len = term_len(line)
        if self.max_width is None or self.max_width < line_len:
            self.max_width = line_len

        buf.append(line)
        buf.append(" " * (clear_width - line_len))
        line = "".join(buf)
        # Render the line only if it changed.
        if line != self._last_line and not self.is_fast():
            self._last_line = line
            click.echo(line, file=self.file, color=self.color, nl=in_collection)
            self.file.flush()
        elif in_collection:
            # A collection erases and redraws every bar, so an unchanged bar
            # must still be echoed to keep its line.
            click.echo(self._last_line, file=self.file, color=self.color, nl=in_collection)
            self.file.flush()
class ProgressBarCollection(object):
    """Render several ProgressBar objects as a block of stacked lines.

    Each redraw first moves the cursor up over the previously drawn lines
    using the ANSI sequences ESC[1A (cursor up) and ESC[2K (erase line), so it
    only looks right on terminals that honour those codes.
    """

    def __init__(self, bars: Dict[str, ProgressBar], bar_template=None, width=None):
        """
        :param bars: bars keyed by name; drawn in the dict's iteration order.
        :param bar_template: if truthy, overrides every bar's ``bar_template``.
        :param width: if truthy, overrides every bar's ``width``.
        """
        self.bars = bars
        for bar in self.bars.values():
            if bar_template:
                bar.bar_template = bar_template
            if width:
                bar.width = width

    def __enter__(self):
        self.render_progress()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.render_finish()

    def render_progress(self, clear=False):
        """Draw every bar on its own line, erasing the previous block if ``clear``."""
        if clear:
            self._clear_bars()
        for bar in self.bars.values():
            bar.render_progress(in_collection=True)

    def render_finish(self):
        """Call ``render_finish`` on every bar."""
        for bar in self.bars.values():
            bar.render_finish()

    def update(self, bar_name: str, n_steps: int):
        """Advance bar ``bar_name`` by ``n_steps`` and redraw the whole block."""
        self.bars[bar_name].make_step(n_steps)
        self.render_progress(clear=True)

    def _clear_bars(self):
        """Erase the lines drawn by the previous ``render_progress`` call."""
        for bar in self.bars.values():
            # Hidden bars return before drawing anything, so there is no line
            # of theirs to erase; erase on the stream the bar draws to.
            if not bar.is_hidden:
                click.echo('\x1b[1A\x1b[2K', file=bar.file, nl=False)
def progressbar_collection(bars: Dict[str, ProgressBar],
                           bar_template="%(label)s [%(bar)s] %(info)s",
                           width=36):
    """Create a ProgressBarCollection with a shared template and bar width.

    :param bars: bars keyed by name, drawn top to bottom in iteration order.
    :param bar_template: template applied to every bar.
    :param width: bar width applied to every bar (default 36).
    """
    return ProgressBarCollection(bars, bar_template=bar_template, width=width)
@click.command()
def cli():
    """Demo: a stock click bar, one subclassed bar, then a two-bar collection."""
    steps = 10

    # 1) Plain click progress bar.
    with click.progressbar(length=steps, label='bar 0') as bar:
        for _ in range(steps):
            time.sleep(1)
            bar.update(1)
    click.echo('------')

    # 2) The subclass used on its own behaves like a regular bar.
    with ProgressBar(iterable=None, length=steps, label='bar 1',
                     bar_template="%(label)s [%(bar)s] %(info)s") as bar:
        for _ in range(steps):
            time.sleep(1)
            bar.update(1)
    click.echo('------')

    # 3) Two bars drawn together; fill the first completely, then the second.
    bars = {
        'bar2': ProgressBar(iterable=None, length=steps, label='bar 2'),
        'bar3': ProgressBar(iterable=None, length=steps, label='bar 3'),
    }
    with progressbar_collection(bars) as bar_collection:
        for name in ('bar2', 'bar3'):
            for _ in range(steps):
                time.sleep(1)
                bar_collection.update(name, 1)


if __name__ == "__main__":
    cli()