在 Python 中使用 click.progressbar 和多处理

Using click.progressbar with multiprocessing in Python

我有一个很大的列表需要处理,这需要一些时间,所以我把它分成 4 个部分,并用一些功能对每个部分进行多处理。 运行 使用 4 个内核仍然需要一些时间,所以我想我会在函数中添加一些进度条,以便它可以告诉我每个处理器在处理列表时的位置。

我的梦想是拥有这样的东西:

erasing close atoms, cpu0  [######..............................]  13%
erasing close atoms, cpu1  [#######.............................]  15%
erasing close atoms, cpu2  [######..............................]  13%
erasing close atoms, cpu3  [######..............................]  14%

随着函数循环的进行,每个条都在移动。但是相反,我得到了连续的流量:

等,填写我的终端window。

这是调用函数的主要 python 脚本:

from eraseCloseAtoms import *
from readPDB import *
import multiprocessing as mp
from vectorCalc import *

prot, cell = readPDB('file')
atoms = vectorCalc(cell)

output = mp.Queue()

# setup mp to erase grid atoms that are too close to the protein (dmin = 2.5A)
cpuNum = 4
tasks = len(atoms)
rangeSet = [tasks / cpuNum for i in range(cpuNum)]
for i in range(tasks % cpuNum):
    rangeSet[i] += 1

rangeSet = np.array(rangeSet)

processes = []
for c in range(cpuNum):
    na, nb = (int(np.sum(rangeSet[:c] + 1)), int(np.sum(rangeSet[:c + 1])))
    processes.append(mp.Process(target=eraseCloseAtoms, args=(prot, atoms[na:nb], cell, 2.7, 2.5, output)))

for p in processes:
    p.start()

results = [output.get() for p in processes]

for p in processes:
    p.join()

atomsNew = results[0] + results[1] + results[2] + results[3]

下面是函数eraseCloseAtoms():

import numpy as np
import click


def eraseCloseAtoms(protein, atoms, cell, spacing=2, dmin=1.4, output=None):
    print 'just need to erase close atoms'

    if dmin > spacing:
        print 'the spacing needs to be larger than dmin'
        return

    grid = [int(cell[0] / spacing), int(cell[1] / spacing), int(cell[2] / spacing)]

    selected = list(atoms)
    with click.progressbar(length=len(atoms), label='erasing close atoms') as bar:
        for i, atom in enumerate(atoms):
            bar.update(i)
            erased = False
            coord = np.array(atom[6])

            for ix in [-1, 0, 1]:
                if erased:
                    break
                for iy in [-1, 0, 1]:
                    if erased:
                        break
                    for iz in [-1, 0, 1]:
                        if erased:
                            break
                        for j in protein:
                            protCoord = np.array(protein[int(j)][6])
                            trueDist = getMinDist(protCoord, coord, cell, vectors)
                            if trueDist <= dmin:
                                selected.remove(atom)
                                erased = True
                                break
    if output is None:
        return selected
    else:
        output.put(selected)

我发现你的代码中有两个问题。

第一个解释了为什么您的进度条经常显示 100% 而不是实际进度。当我认为您想更新一步时,您正在调用 bar.update(i) 将栏的进度推进 i 步。更好的方法是将可迭代对象传递给 progressbar 函数并让它自动进行更新:

with click.progressbar(atoms, label='erasing close atoms') as bar:
    for atom in bar:
        erased = False
        coord = np.array(atom[6])

        # ...

但是,这仍然不适用于同时迭代的多个进程,由于您的代码存在第二个问题,每个进程都有自己的进度条。 click.progressbar documentation 声明以下限制:

No printing must happen or the progress bar will be unintentionally destroyed.

这意味着每当您的进度条之一更新自身时,它将打破所有其他活动进度条。

我认为没有简单的解决方法。交互式更新多行控制台输出非常困难(您基本上需要使用 curses 或类似的 "console GUI" 库,并得到 OS 的支持)。 click 模块没有那个能力,它只能更新当前行。您最大的希望可能是扩展 click.progressbar 设计以在列中输出多个条,例如:

CPU1: [######      ] 52%   CPU2: [###        ] 30%    CPU3: [########  ] 84%

这需要大量代码才能使其正常工作(尤其是当更新来自多个进程时),但这并非完全不切实际。

接受的答案说点击是不可能的,它需要 'non trivial amount of code to make it work'。

虽然这是真的,但还有另一个模块具有开箱即用的此功能:tqdm https://github.com/tqdm/tqdm 这正是您所需要的。

您可以在文档中做嵌套进度条https://github.com/tqdm/tqdm#nested-progress-bars

可能和你的梦想不一样,但是你可以使用imap_unorderedclick.progressbar来集成多处理。

import multiprocessing as mp
import click
import time


def proc(arg):
    time.sleep(arg)
    return True

def main():
    p = mp.Pool(4)
    args = range(4)
    results = p.imap_unordered(proc, args)
    with click.progressbar(results, length=len(args)) as bar:
        for result in bar:
            pass

if __name__ == '__main__:
    main()

如果您不介意只有一个进度条,那么类似这样的方法会起作用:

import click
import threading
import numpy as np

reallybiglist = []
numthreads = 4

def myfunc(listportion, bar):
    for item in listportion:
        # do a thing
        bar.update(1)

with click.progressbar(length=len(reallybiglist), show_pos=True) as bar:
    threads = []
    for listportion in np.split(reallybiglist, numthreads):
        thread = threading.Thread(target=myfunc, args=(listportion, bar))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()

对于任何稍后来到这里的人。我创建了这个似乎工作正常。它相当少地覆盖了 click.ProgressBar,尽管我不得不在方法底部只为几行代码覆盖整个方法。这是在重写之前使用 \x1b[1A\x1b[2K 清除进度条,因此可能取决于环境。

#!/usr/bin/env python
import time
from typing import Dict

import click
from click._termui_impl import ProgressBar as ClickProgressBar, BEFORE_BAR
from click._compat import term_len


class ProgressBar(ClickProgressBar):
    def render_progress(self, in_collection=False):
        # This is basically a copy of the default render_progress with the addition of in_collection
        # param which is only used at the very bottom to determine how to echo the bar
        from click.termui import get_terminal_size

        if self.is_hidden:
            return

        buf = []
        # Update width in case the terminal has been resized
        if self.autowidth:
            old_width = self.width
            self.width = 0
            clutter_length = term_len(self.format_progress_line())
            new_width = max(0, get_terminal_size()[0] - clutter_length)
            if new_width < old_width:
                buf.append(BEFORE_BAR)
                buf.append(" " * self.max_width)
                self.max_width = new_width
            self.width = new_width

        clear_width = self.width
        if self.max_width is not None:
            clear_width = self.max_width

        buf.append(BEFORE_BAR)
        line = self.format_progress_line()
        line_len = term_len(line)
        if self.max_width is None or self.max_width < line_len:
            self.max_width = line_len

        buf.append(line)
        buf.append(" " * (clear_width - line_len))
        line = "".join(buf)
        # Render the line only if it changed.

        if line != self._last_line and not self.is_fast():
            self._last_line = line
            click.echo(line, file=self.file, color=self.color, nl=in_collection)
            self.file.flush()
        elif in_collection:
            click.echo(self._last_line, file=self.file, color=self.color, nl=in_collection)
            self.file.flush()


class ProgressBarCollection(object):
    def __init__(self, bars: Dict[str, ProgressBar], bar_template=None, width=None):
        self.bars = bars
        if bar_template or width:
            for bar in self.bars.values():
                if bar_template:
                    bar.bar_template = bar_template
                if width:
                    bar.width = width

    def __enter__(self):
        self.render_progress()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.render_finish()

    def render_progress(self, clear=False):
        if clear:
            self._clear_bars()
        for bar in self.bars.values():
            bar.render_progress(in_collection=True)

    def render_finish(self):
        for bar in self.bars.values():
            bar.render_finish()

    def update(self, bar_name: str, n_steps: int):
        self.bars[bar_name].make_step(n_steps)
        self.render_progress(clear=True)

    def _clear_bars(self):
        for _ in range(0, len(self.bars)):
            click.echo('\x1b[1A\x1b[2K', nl=False)


def progressbar_collection(bars: Dict[str, ProgressBar]):
    return ProgressBarCollection(bars, bar_template="%(label)s  [%(bar)s]  %(info)s", width=36)


@click.command()
def cli():
    with click.progressbar(length=10, label='bar 0') as bar:
        for i in range(0, 10):
            time.sleep(1)
            bar.update(1)
    click.echo('------')
    with ProgressBar(iterable=None, length=10, label='bar 1', bar_template="%(label)s  [%(bar)s]  %(info)s") as bar:
        for i in range(0, 10):
            time.sleep(1)
            bar.update(1)
    click.echo('------')
    bar2 = ProgressBar(iterable=None, length=10, label='bar 2')
    bar3 = ProgressBar(iterable=None, length=10, label='bar 3')
    with progressbar_collection({'bar2': bar2, 'bar3': bar3}) as bar_collection:
        for i in range(0, 10):
            time.sleep(1)
            bar_collection.update('bar2', 1)
        for i in range(0, 10):
            time.sleep(1)
            bar_collection.update('bar3', 1)


if __name__ == "__main__":
    cli()