Python非循环功能的进度条
Python progress bar for non-loop function
给定一个函数 f
,为 f
编写进度条的最佳方法是什么?即,有一个在 f
执行期间更新的实时进度条。请注意,我无法更改 f
(它是来自另一个库的函数),因此无法在 f
中插入 pbar.update
调用(因此这是一个 post关于非循环函数的进度条)。其他 SO post 解决了这个问题,前提是你可以更改 f
中的代码,但我无法 find/think 解决方案,因为我无法访问f
.
的内容
我是否必须使用线程或多处理才能实现这样的目标?
类似于:
@progress_bar
def func_wrapper(*args, **kwargs):
return f(*args, **kwargs)
或:
start_progress_bar()
f()
感谢任何帮助!
更新:我采用了@Acorn 的回答中提供的代码并以装饰器形式重写了它。
import concurrent.futures
import functools
import time
from tqdm import tqdm
def progress_bar(expected_time, increments=10):
def _progress_bar(func):
def timed_progress_bar(future, expected_time, increments=10):
"""
Display progress bar for expected_time seconds.
Complete early if future completes.
Wait for future if it doesn't complete in expected_time.
"""
interval = expected_time / increments
with tqdm(total=increments) as pbar:
for i in range(increments - 1):
if future.done():
# finish the progress bar
# not sure if there's a cleaner way to do this?
pbar.update(increments - i)
return
else:
time.sleep(interval)
pbar.update()
# if the future still hasn't completed, wait for it.
future.result()
pbar.update()
@functools.wraps(func)
def _func(*args, **kwargs):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(func, *args, **kwargs)
timed_progress_bar(future, expected_time, increments)
return future.result()
return _func
return _progress_bar
if __name__ == "__main__":
@progress_bar(expected_time=11)
def test_func():
time.sleep(10)
return "result"
print(test_func()) # prints "result"
如果该函数不允许您在工作单元之后采取行动,即通过公开生成器接口或某种回调,那么唯一的解决方案是使用该函数的修改版本,或者做某种猴子补丁。
解决方案将特定于相关代码。
更新:
因此,如果您不介意进度条不能准确反映进度,并且只使用时间估算,您可以这样做。
import concurrent.futures
import time
from tqdm import tqdm
def timed_future_progress_bar(future, expected_time, increments=10):
"""
Display progress bar for expected_time seconds.
Complete early if future completes.
Wait for future if it doesn't complete in expected_time.
"""
interval = expected_time / increments
with tqdm(total=increments) as pbar:
for i in range(increments - 1):
if future.done():
# finish the progress bar
# not sure if there's a cleaner way to do this?
pbar.update(increments - i)
return
else:
time.sleep(interval)
pbar.update()
# if the future still hasn't completed, wait for it.
future.result()
pbar.update()
def blocking_job():
time.sleep(2)
return 'result'
def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(blocking_job)
timed_future_progress_bar(future, 5)
print(f'Work done: {future.result()}')
main()
无论作业花费的时间比预期多还是少,这都应该表现得明智。如果作业运行时间超过预期,则进度将在 90% 处等待,直到完成。
给定一个函数 f
,为 f
编写进度条的最佳方法是什么?即,有一个在 f
执行期间更新的实时进度条。请注意,我无法更改 f
(它是来自另一个库的函数),因此无法在 f
中插入 pbar.update
调用(因此这是一个 post关于非循环函数的进度条)。其他 SO post 解决了这个问题,前提是你可以更改 f
中的代码,但我无法 find/think 解决方案,因为我无法访问f
.
我是否必须使用线程或多处理才能实现这样的目标?
类似于:
@progress_bar
def func_wrapper(*args, **kwargs):
return f(*args, **kwargs)
或:
start_progress_bar()
f()
感谢任何帮助!
更新:我采用了@Acorn 的回答中提供的代码并以装饰器形式重写了它。
import concurrent.futures
import functools
import time
from tqdm import tqdm
def progress_bar(expected_time, increments=10):
def _progress_bar(func):
def timed_progress_bar(future, expected_time, increments=10):
"""
Display progress bar for expected_time seconds.
Complete early if future completes.
Wait for future if it doesn't complete in expected_time.
"""
interval = expected_time / increments
with tqdm(total=increments) as pbar:
for i in range(increments - 1):
if future.done():
# finish the progress bar
# not sure if there's a cleaner way to do this?
pbar.update(increments - i)
return
else:
time.sleep(interval)
pbar.update()
# if the future still hasn't completed, wait for it.
future.result()
pbar.update()
@functools.wraps(func)
def _func(*args, **kwargs):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(func, *args, **kwargs)
timed_progress_bar(future, expected_time, increments)
return future.result()
return _func
return _progress_bar
if __name__ == "__main__":
@progress_bar(expected_time=11)
def test_func():
time.sleep(10)
return "result"
print(test_func()) # prints "result"
如果该函数不允许您在工作单元之后采取行动,即通过公开生成器接口或某种回调,那么唯一的解决方案是使用该函数的修改版本,或者做某种猴子补丁。
解决方案将特定于相关代码。
更新:
因此,如果您不介意进度条不能准确反映进度,并且只使用时间估算,您可以这样做。
import concurrent.futures
import time
from tqdm import tqdm
def timed_future_progress_bar(future, expected_time, increments=10):
"""
Display progress bar for expected_time seconds.
Complete early if future completes.
Wait for future if it doesn't complete in expected_time.
"""
interval = expected_time / increments
with tqdm(total=increments) as pbar:
for i in range(increments - 1):
if future.done():
# finish the progress bar
# not sure if there's a cleaner way to do this?
pbar.update(increments - i)
return
else:
time.sleep(interval)
pbar.update()
# if the future still hasn't completed, wait for it.
future.result()
pbar.update()
def blocking_job():
time.sleep(2)
return 'result'
def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(blocking_job)
timed_future_progress_bar(future, 5)
print(f'Work done: {future.result()}')
main()
无论作业花费的时间比预期多还是少,这都应该表现得明智。如果作业运行时间超过预期,则进度将在 90% 处等待,直到完成。