如何在命令行中为池进程创建进度条?
How to create a progress bar in command line for pool processes?
我有几个脚本,我 运行 使用多处理池 我正在尝试根据已完成的脚本制作一个进度条。
我查了
但我不知道如何把已完成脚本的计数(counter)结合进去
import os
import sys
from multiprocessing import Pool


def run_process(process):
    """Run the given script in a separate Python interpreter process."""
    # Use sys.executable instead of a bare 'python' so the child runs under
    # the same interpreter as this script (a bare 'python' depends on PATH
    # and may pick a different/absent interpreter).
    os.system('{} {}'.format(sys.executable, process))


processes = ('script1.py', 'script2.py', 'script3.py', 'script4.py')

if __name__ == "__main__":
    pool = Pool(processes=2)
    pool.map(run_process, processes)
您可以使用 pool.apply_async()
来完成,因为它支持回调函数,可用于了解目标函数何时返回。
我使用@Greenstick 的 answer 来显示进度条,但我主要修改它以符合 PEP-8 编码指南,并将它放在一个名为 print_progress_bar
的单独模块中 — 见下文。
性能说明: 虽然 可以 使用 multiprocessing.Pool
来做到这一点——我强烈怀疑你问题中的代码是文章 How to run parallel processes 中内容的逐字副本——这样做效率极低,因为每个进程初始化自己的 Python 解释器的次数是真正需要的两倍:首先执行 run_process()
函数本身,然后再次执行 运行 脚本过程。
生成进程涉及相当多的开销。这种开销可以通过将 run_process()
作为当前进程中的单独线程来运行以减轻——线程要轻量得多。
切换到 ThreadPool
非常简单,只需更改行:
from multiprocessing import Pool
至
from multiprocessing.pool import ThreadPool as Pool
或者您可以使用 concurrent.futures.ThreadPoolExecutor
,如我的另一个回答所示。
import os
from multiprocessing import Pool
import subprocess
import sys

from print_progress_bar import print_progress_bar

progress_bar_kwargs = dict(prefix='Progress:', suffix='Complete', length=40)


def run_process(process):
    """Run the given script with the current Python interpreter."""
    os.system('{} {}'.format(sys.executable, process))


def callback(_):
    """Update process count and progress bar."""
    global process_count
    process_count += 1
    print_progress_bar(process_count, len(processes), **progress_bar_kwargs)


# To simplify testing just using one script multiple times.
processes = ('./mp_scripts/script1.py', './mp_scripts/script1.py',
             './mp_scripts/script1.py', './mp_scripts/script1.py')
process_count = 0

if __name__ == '__main__':
    print_progress_bar(0, len(processes), **progress_bar_kwargs)  # Print 0% progress.
    with Pool(processes=2) as pool:
        results = []
        for process in processes:
            r = pool.apply_async(run_process, (process,), {}, callback)
            results.append(r)
        # Block until every task has finished.  AsyncResult.wait() sleeps
        # instead of the original `while results:` polling loop, which
        # re-filtered the list as fast as possible and pinned a CPU core.
        for r in results:
            r.wait()
    print('Done')
print_progress_bar.py
:
# Based on Greenstick's terminal progress bar, reformatted to follow PEP 8.
def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100,
                       fill='█', print_end="\r"):
    """ Print iterations progress.
    Call in a loop to create terminal progress bar
    @params:
        iteration - Required : current iteration (Int)
        total     - Required : total iterations (Int)
        prefix    - Optional : prefix string (Str)
        suffix    - Optional : suffix string (Str)
        decimals  - Optional : positive number of decimals in percent complete (Int)
        length    - Optional : character length of bar (Int)
        fill      - Optional : bar fill character (Str)
        print_end - Optional : end character (e.g. "\r", "\r\n") (Str)
    """
    # Nested format spec avoids the original string concatenation dance.
    percent = '{0:.{1}f}'.format(100 * (iteration / float(total)), decimals)
    # snake_case (was camelCase 'filledLength') to match the PEP-8 style the
    # surrounding answer promises.
    filled_length = int(length * iteration // total)
    bar = fill * filled_length + '-' * (length - filled_length)
    # '\r' rewinds to the line start so successive calls redraw in place.
    print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end=print_end, flush=True)
    if iteration == total:  # Print newline on completion.
        print(flush=True)
这里有一个稍微不同的方法,它使用 concurrent.futures.ThreadPoolExecutor
而不是 multiprocessing.Pool
,这使它比我另一个回答中的方案更简单、更高效。
注意 它使用与我的其他答案中相同的 print_progress_bar.py
模块。
import concurrent.futures
import os
import subprocess
import sys
import threading

from print_progress_bar import print_progress_bar

progress_bar_kwargs = dict(prefix='Progress:', suffix='Complete', length=40)

# To simplify testing just using one script multiple times.
processes = ('./mp_scripts/script1.py', './mp_scripts/script1.py',
             './mp_scripts/script1.py', './mp_scripts/script1.py')
process_count = 0
# Guards process_count and the bar redraw: `count += 1` is not atomic and
# run_process() executes concurrently on multiple worker threads, so
# unsynchronized updates could lose counts or interleave bar output.
_count_lock = threading.Lock()


def run_process(process):
    """Run the script in a subprocess, then update the progress bar."""
    global process_count
    subprocess.run([sys.executable, process])
    # Update process count and progress bar when it's done.
    with _count_lock:
        process_count += 1
        print_progress_bar(process_count, len(processes), **progress_bar_kwargs)


print_progress_bar(0, len(processes), **progress_bar_kwargs)  # Print 0% progress.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    future_to_process = {executor.submit(run_process, process): process
                         for process in processes}
    for future in concurrent.futures.as_completed(future_to_process):
        process = future_to_process[future]
        try:
            _ = future.result()
        except Exception as exc:
            print()
            print(f'{process} generated an exception: {exc}')
print('Done')
我有几个脚本,我 运行 使用多处理池 我正在尝试根据已完成的脚本制作一个进度条。
我查了
但我不知道如何把已完成脚本的计数(counter)结合进去
import os
import sys
from multiprocessing import Pool


def run_process(process):
    """Launch the given script in its own Python interpreter process."""
    # sys.executable guarantees the same interpreter as this script;
    # a bare 'python' relies on PATH and may resolve differently.
    os.system('{} {}'.format(sys.executable, process))


processes = ('script1.py', 'script2.py', 'script3.py', 'script4.py')

if __name__ == "__main__":
    pool = Pool(processes=2)
    pool.map(run_process, processes)
您可以使用 pool.apply_async()
来完成,因为它支持回调函数,可用于了解目标函数何时返回。
我使用@Greenstick 的 answer 来显示进度条,但我主要修改它以符合 PEP-8 编码指南,并将它放在一个名为 print_progress_bar
的单独模块中 — 见下文。
性能说明: 虽然 可以 使用 multiprocessing.Pool
来做到这一点——我强烈怀疑你问题中的代码是文章 How to run parallel processes 中内容的逐字副本——这样做效率极低,因为每个进程初始化自己的 Python 解释器的次数是真正需要的两倍:首先执行 run_process()
函数本身,然后再次执行 运行 脚本过程。
生成进程涉及相当多的开销。这种开销可以通过将 run_process()
作为当前进程中的单独线程来运行以减轻——线程要轻量得多。
切换到 ThreadPool
非常简单,只需更改行:
from multiprocessing import Pool
至
from multiprocessing.pool import ThreadPool as Pool
或者您可以使用 concurrent.futures.ThreadPoolExecutor
,如我的另一个回答所示。
import os
from multiprocessing import Pool
import subprocess
import sys

from print_progress_bar import print_progress_bar

progress_bar_kwargs = dict(prefix='Progress:', suffix='Complete', length=40)


def run_process(process):
    """Run the given script with the current Python interpreter."""
    os.system('{} {}'.format(sys.executable, process))


def callback(_):
    """Update process count and progress bar."""
    global process_count
    process_count += 1
    print_progress_bar(process_count, len(processes), **progress_bar_kwargs)


# To simplify testing just using one script multiple times.
processes = ('./mp_scripts/script1.py', './mp_scripts/script1.py',
             './mp_scripts/script1.py', './mp_scripts/script1.py')
process_count = 0

if __name__ == '__main__':
    print_progress_bar(0, len(processes), **progress_bar_kwargs)  # Print 0% progress.
    with Pool(processes=2) as pool:
        results = []
        for process in processes:
            r = pool.apply_async(run_process, (process,), {}, callback)
            results.append(r)
        # Wait for completion with a blocking AsyncResult.wait() per task.
        # The original `while results:` list-rebuilding loop was a busy-wait
        # that consumed a full CPU core while the scripts ran.
        for r in results:
            r.wait()
    print('Done')
print_progress_bar.py
:
# Based on Greenstick's terminal progress bar, reformatted to follow PEP 8.
def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100,
                       fill='█', print_end="\r"):
    """ Print iterations progress.
    Call in a loop to create terminal progress bar
    @params:
        iteration - Required : current iteration (Int)
        total     - Required : total iterations (Int)
        prefix    - Optional : prefix string (Str)
        suffix    - Optional : suffix string (Str)
        decimals  - Optional : positive number of decimals in percent complete (Int)
        length    - Optional : character length of bar (Int)
        fill      - Optional : bar fill character (Str)
        print_end - Optional : end character (e.g. "\r", "\r\n") (Str)
    """
    # Nested format spec replaces the original string-concatenated spec.
    percent = '{0:.{1}f}'.format(100 * (iteration / float(total)), decimals)
    # snake_case rename of the camelCase 'filledLength' local, matching the
    # PEP-8 style this answer advertises.
    filled_length = int(length * iteration // total)
    bar = fill * filled_length + '-' * (length - filled_length)
    # Leading '\r' rewinds the cursor so each call redraws the same line.
    print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end=print_end, flush=True)
    if iteration == total:  # Print newline on completion.
        print(flush=True)
这里有一个稍微不同的方法,它使用 concurrent.futures.ThreadPoolExecutor
而不是 multiprocessing.Pool
,这使它比我另一个回答中的方案更简单、更高效。
注意 它使用与我的其他答案中相同的 print_progress_bar.py
模块。
import concurrent.futures
import os
import subprocess
import sys
import threading

from print_progress_bar import print_progress_bar

progress_bar_kwargs = dict(prefix='Progress:', suffix='Complete', length=40)

# To simplify testing just using one script multiple times.
processes = ('./mp_scripts/script1.py', './mp_scripts/script1.py',
             './mp_scripts/script1.py', './mp_scripts/script1.py')
process_count = 0
# Protects process_count and the bar redraw.  `count += 1` is not atomic in
# Python and run_process() runs concurrently on two worker threads, so an
# unguarded increment can drop updates and interleave bar output.
_count_lock = threading.Lock()


def run_process(process):
    """Run the script in a subprocess, then update the progress bar."""
    global process_count
    subprocess.run([sys.executable, process])
    # Update process count and progress bar when it's done.
    with _count_lock:
        process_count += 1
        print_progress_bar(process_count, len(processes), **progress_bar_kwargs)


print_progress_bar(0, len(processes), **progress_bar_kwargs)  # Print 0% progress.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    future_to_process = {executor.submit(run_process, process): process
                         for process in processes}
    for future in concurrent.futures.as_completed(future_to_process):
        process = future_to_process[future]
        try:
            _ = future.result()
        except Exception as exc:
            print()
            print(f'{process} generated an exception: {exc}')
print('Done')