如何跟踪多处理和 pool.map 的状态?
How to keep track of status with multiprocessing and pool.map?
我是第一次设置多处理模块,基本上,我打算按照
的方式做一些事情
from multiprocessing import pool
pool = Pool(processes=102)
results = pool.map(whateverFunction, myIterable)
print 1
据我了解,1
将在 所有 过程返回且结果完成后立即打印。我想对这些进行一些状态更新。实现它的最佳方法是什么?
我有点犹豫要不要打印 whateverFunction()
。特别是如果有大约 200 个值,我将打印 200 次类似 'process done' 的内容,这不是很有用。
我希望输出像
10% of myIterable done
20% of myIterable done
pool.map
阻塞,直到所有并发函数调用完成。
pool.apply_async
不阻塞。此外,您可以使用其 callback
参数
报告进展情况。回调函数 log_result
在每次 foo
完成时调用一次。它由 foo
传递值 return。
from __future__ import division
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x
def log_result(retval):
results.append(retval)
if len(results) % (len(data)//10) == 0:
print('{:.0%} done'.format(len(results)/len(data)))
if __name__ == '__main__':
pool = mp.Pool()
results = []
data = range(200)
for item in data:
pool.apply_async(foo, args=[item], callback=log_result)
pool.close()
pool.join()
print(results)
产量
10% done
20% done
30% done
40% done
50% done
60% done
70% done
80% done
90% done
100% done
[0, 1, 2, 3, ..., 197, 198, 199]
上面的log_result
函数修改了全局变量results
和
访问全局变量 data
。您不能将这些变量传递给
log_result
因为pool.apply_async
中指定的回调函数是
总是只用一个参数调用,即 foo
.
的 return 值
不过,你可以做一个闭包,这样至少可以明确哪些变量
log_result
取决于:
from __future__ import division
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x
def make_log_result(results, len_data):
def log_result(retval):
results.append(retval)
if len(results) % (len_data//10) == 0:
print('{:.0%} done'.format(len(results)/len_data))
return log_result
if __name__ == '__main__':
pool = mp.Pool()
results = []
data = range(200)
for item in data:
pool.apply_async(foo, args=[item], callback=make_log_result(results, len(data)))
pool.close()
pool.join()
print(results)
我是第一次设置多处理模块,基本上,我打算按照
的方式做一些事情from multiprocessing import pool
pool = Pool(processes=102)
results = pool.map(whateverFunction, myIterable)
print 1
据我了解,1
将在 所有 过程返回且结果完成后立即打印。我想对这些进行一些状态更新。实现它的最佳方法是什么?
我有点犹豫要不要打印 whateverFunction()
。特别是如果有大约 200 个值,我将打印 200 次类似 'process done' 的内容,这不是很有用。
我希望输出像
10% of myIterable done
20% of myIterable done
pool.map
阻塞,直到所有并发函数调用完成。
pool.apply_async
不阻塞。此外,您可以使用其 callback
参数
报告进展情况。回调函数 log_result
在每次 foo
完成时调用一次。它由 foo
传递值 return。
from __future__ import division
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x
def log_result(retval):
results.append(retval)
if len(results) % (len(data)//10) == 0:
print('{:.0%} done'.format(len(results)/len(data)))
if __name__ == '__main__':
pool = mp.Pool()
results = []
data = range(200)
for item in data:
pool.apply_async(foo, args=[item], callback=log_result)
pool.close()
pool.join()
print(results)
产量
10% done
20% done
30% done
40% done
50% done
60% done
70% done
80% done
90% done
100% done
[0, 1, 2, 3, ..., 197, 198, 199]
上面的log_result
函数修改了全局变量results
和
访问全局变量 data
。您不能将这些变量传递给
log_result
因为pool.apply_async
中指定的回调函数是
总是只用一个参数调用,即 foo
.
不过,你可以做一个闭包,这样至少可以明确哪些变量
log_result
取决于:
from __future__ import division
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x
def make_log_result(results, len_data):
def log_result(retval):
results.append(retval)
if len(results) % (len_data//10) == 0:
print('{:.0%} done'.format(len(results)/len_data))
return log_result
if __name__ == '__main__':
pool = mp.Pool()
results = []
data = range(200)
for item in data:
pool.apply_async(foo, args=[item], callback=make_log_result(results, len(data)))
pool.close()
pool.join()
print(results)