如何使用所有 CPU 对大量文件进行子处理?
How to subprocess a big list of files using all CPUs?
我需要在命令行中使用 LaTeXML 库将 86,000 个 TEX 文件转换为 XML。我尝试编写一个 Python 脚本来使用 subprocess
模块自动执行此操作,利用所有 4 个内核。
def get_outpath(tex_path):
path_parts = pathlib.Path(tex_path).parts
arxiv_id = path_parts[2]
outpath = 'xml/' + arxiv_id + '.xml'
return outpath
def convert_to_xml(inpath):
outpath = get_outpath(inpath)
if os.path.isfile(outpath):
message = '{}: Already converted.'.format(inpath)
print(message)
return
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
message = '{}: Converted!'.format(inpath)
print(message)
def start():
start_time = time.time()
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
maxtasksperchild=1)
print('Initialized {} threads'.format(multiprocessing.cpu_count()))
print('Beginning conversion...')
for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5):
pass
pool.close()
pool.join()
print("TIME: {}".format(total_time))
start()
该脚本导致 Too many open files
并降低了我的计算机速度。通过查看 Activity 监视器,该脚本似乎试图一次创建 86,000 个转换子进程,并且每个进程都试图打开一个文件。也许这是 pool.imap_unordered(convert_to_xml, preprints)
的结果——也许我不需要将 map 与 subprocess.Popen
结合使用,因为我有太多命令要调用?有什么替代方案?
我花了一整天的时间试图找出处理批量子处理的正确方法。我是 Python 这部分的新手,因此非常感谢任何有关正确方向的提示。谢谢!
在 convert_to_xml
中,process = subprocess.Popen(...)
语句生成了一个 latexml
子进程。
如果没有 process.communicate()
等阻塞调用,即使 latexml
在后台继续 运行,convert_to_xml
也会结束。
由于convert_to_xml
结束,Pool将关联的工作进程发送另一个任务给运行,因此再次调用convert_to_xml
。
另一个 latexml
进程再次在后台生成。
很快,您将在 latexml
个进程中达到目标,并且已达到打开文件数的资源限制。
修复很简单:添加 process.communicate()
告诉 convert_to_xml
等到 latexml
进程完成。
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
process.communicate()
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
else: # use else so that this won't run if there is an Exception
message = '{}: Converted!'.format(inpath)
print(message)
关于if __name__ == '__main__'
:
因为, there is a warning in the multiprocessing docs那个
不应在模块的顶层调用产生新进程的代码。
相反,代码应包含在 if __name__ == '__main__'
语句中。
在 Linux 中,如果您无视此警告,则不会发生任何可怕的事情。
但是在Windows中,代码"fork-bombs"。或者更准确地说,代码
导致生成一个完整的子进程链,因为在 Windows 上 fork
是通过生成一个新的 Python 进程来模拟的,然后导入调用脚本。每次导入都会产生一个新的 Python 进程。每个 Python 进程都会尝试导入调用脚本。直到所有资源都被消耗,循环才会被打破。
所以为了善待我们的 Windows-fork-bereft 弟兄们,使用
if __name__ == '__main__:
start()
有时进程需要大量内存。 The only reliable way释放内存就是终止进程。 maxtasksperchild=1
告诉 pool
在完成 1 个任务后终止每个工作进程。然后它会生成一个新的工作进程来处理另一个任务(如果有的话)。这释放了原始工作人员可能分配的(内存)资源,否则无法释放这些资源。
在您的情况下,工作进程似乎不需要太多内存,因此您可能不需要 maxtasksperchild=1
。
在 convert_to_xml
中,process = subprocess.Popen(...)
语句生成了一个 latexml
子进程。
如果没有 process.communicate()
等阻塞调用,即使 latexml
在后台继续 运行,convert_to_xml
也会结束。
由于convert_to_xml
结束,Pool将关联的工作进程发送另一个任务给运行,因此再次调用convert_to_xml
。
另一个 latexml
进程再次在后台生成。
很快,您将在 latexml
个进程中达到目标,并且已达到打开文件数的资源限制。
修复很简单:添加 process.communicate()
告诉 convert_to_xml
等到 latexml
进程完成。
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
process.communicate()
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
else: # use else so that this won't run if there is an Exception
message = '{}: Converted!'.format(inpath)
print(message)
chunksize
影响工作人员在将结果发送回主进程之前执行的任务数。
Sometimes 这会影响性能,尤其是当进程间通信占总体 运行 时间的重要部分时。
在你的情况下,convert_to_xml
需要相对较长的时间(假设我们等到 latexml
完成)并且它只是 returns None
。因此,进程间通信可能不是整个 运行 时间的重要部分。因此,我不希望您在这种情况下发现性能有显着变化(尽管试验永远不会有坏处!)。
在普通的 Python 中,map
不应仅用于多次调用一个函数。
出于类似的风格原因,我会在我关心 return 值的情况下保留使用 pool.*map*
方法。
所以不用
for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5):
pass
您可以考虑使用
for preprint in preprints:
pool.apply_async(convert_to_xml, args=(preprint, ))
相反。
传递给任何 pool.*map*
函数的迭代被 消耗掉
立即。可迭代对象是否是迭代器并不重要。没有
在这里使用迭代器有特殊的内存好处。 imap_unordered
returns一个
迭代器,但它不以任何特别对迭代器友好的方式处理它的输入
方法。
无论传递什么类型的可迭代对象,在调用 pool.*map*
函数时,可迭代对象都是
消费并转化为放入任务队列的任务。
这是证实这一说法的代码:
version1.py:
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x * x
def gen():
for x in range(1000):
if x % 100 == 0:
print('Got here')
yield x
def start():
pool = mp.Pool()
for item in pool.imap_unordered(foo, gen()):
pass
pool.close()
pool.join()
if __name__ == '__main__':
start()
version2.py:
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x * x
def gen():
for x in range(1000):
if x % 100 == 0:
print('Got here')
yield x
def start():
pool = mp.Pool()
for item in gen():
result = pool.apply_async(foo, args=(item, ))
pool.close()
pool.join()
if __name__ == '__main__':
start()
运行 version1.py
和 version2.py
都产生相同的结果。
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
至关重要的是,您会注意到 Got here
很快打印了 10 次
运行 的开头,然后是长时间的停顿(计算时
完成)在程序结束之前。
如果发电机 gen()
以某种方式被 pool.imap_unordered
缓慢消耗,
我们应该期望 Got here
的打印速度也很慢。因为 Got here
是
快速打印 10 次,我们可以看到可迭代对象 gen()
正在
在任务完成之前就完全消耗完了。
运行 这些程序有望让您相信
pool.imap_unordered
和 pool.apply_async
正在将任务放入队列
基本上以相同的方式:调用后立即。
我需要在命令行中使用 LaTeXML 库将 86,000 个 TEX 文件转换为 XML。我尝试编写一个 Python 脚本来使用 subprocess
模块自动执行此操作,利用所有 4 个内核。
def get_outpath(tex_path):
path_parts = pathlib.Path(tex_path).parts
arxiv_id = path_parts[2]
outpath = 'xml/' + arxiv_id + '.xml'
return outpath
def convert_to_xml(inpath):
outpath = get_outpath(inpath)
if os.path.isfile(outpath):
message = '{}: Already converted.'.format(inpath)
print(message)
return
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
message = '{}: Converted!'.format(inpath)
print(message)
def start():
start_time = time.time()
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
maxtasksperchild=1)
print('Initialized {} threads'.format(multiprocessing.cpu_count()))
print('Beginning conversion...')
for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5):
pass
pool.close()
pool.join()
print("TIME: {}".format(total_time))
start()
该脚本导致 Too many open files
并降低了我的计算机速度。通过查看 Activity 监视器,该脚本似乎试图一次创建 86,000 个转换子进程,并且每个进程都试图打开一个文件。也许这是 pool.imap_unordered(convert_to_xml, preprints)
的结果——也许我不需要将 map 与 subprocess.Popen
结合使用,因为我有太多命令要调用?有什么替代方案?
我花了一整天的时间试图找出处理批量子处理的正确方法。我是 Python 这部分的新手,因此非常感谢任何有关正确方向的提示。谢谢!
在 convert_to_xml
中,process = subprocess.Popen(...)
语句生成了一个 latexml
子进程。
如果没有 process.communicate()
等阻塞调用,即使 latexml
在后台继续 运行,convert_to_xml
也会结束。
由于convert_to_xml
结束,Pool将关联的工作进程发送另一个任务给运行,因此再次调用convert_to_xml
。
另一个 latexml
进程再次在后台生成。
很快,您将在 latexml
个进程中达到目标,并且已达到打开文件数的资源限制。
修复很简单:添加 process.communicate()
告诉 convert_to_xml
等到 latexml
进程完成。
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
process.communicate()
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
else: # use else so that this won't run if there is an Exception
message = '{}: Converted!'.format(inpath)
print(message)
关于if __name__ == '__main__'
:
因为if __name__ == '__main__'
语句中。
在 Linux 中,如果您无视此警告,则不会发生任何可怕的事情。
但是在Windows中,代码"fork-bombs"。或者更准确地说,代码
导致生成一个完整的子进程链,因为在 Windows 上 fork
是通过生成一个新的 Python 进程来模拟的,然后导入调用脚本。每次导入都会产生一个新的 Python 进程。每个 Python 进程都会尝试导入调用脚本。直到所有资源都被消耗,循环才会被打破。
所以为了善待我们的 Windows-fork-bereft 弟兄们,使用
if __name__ == '__main__:
start()
有时进程需要大量内存。 The only reliable way释放内存就是终止进程。 maxtasksperchild=1
告诉 pool
在完成 1 个任务后终止每个工作进程。然后它会生成一个新的工作进程来处理另一个任务(如果有的话)。这释放了原始工作人员可能分配的(内存)资源,否则无法释放这些资源。
在您的情况下,工作进程似乎不需要太多内存,因此您可能不需要 maxtasksperchild=1
。
在 convert_to_xml
中,process = subprocess.Popen(...)
语句生成了一个 latexml
子进程。
如果没有 process.communicate()
等阻塞调用,即使 latexml
在后台继续 运行,convert_to_xml
也会结束。
由于convert_to_xml
结束,Pool将关联的工作进程发送另一个任务给运行,因此再次调用convert_to_xml
。
另一个 latexml
进程再次在后台生成。
很快,您将在 latexml
个进程中达到目标,并且已达到打开文件数的资源限制。
修复很简单:添加 process.communicate()
告诉 convert_to_xml
等到 latexml
进程完成。
try:
process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
process.communicate()
except Exception as error:
process.kill()
message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
print(message)
else: # use else so that this won't run if there is an Exception
message = '{}: Converted!'.format(inpath)
print(message)
chunksize
影响工作人员在将结果发送回主进程之前执行的任务数。
Sometimes 这会影响性能,尤其是当进程间通信占总体 运行 时间的重要部分时。
在你的情况下,convert_to_xml
需要相对较长的时间(假设我们等到 latexml
完成)并且它只是 returns None
。因此,进程间通信可能不是整个 运行 时间的重要部分。因此,我不希望您在这种情况下发现性能有显着变化(尽管试验永远不会有坏处!)。
在普通的 Python 中,map
不应仅用于多次调用一个函数。
出于类似的风格原因,我会在我关心 return 值的情况下保留使用 pool.*map*
方法。
所以不用
for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5):
pass
您可以考虑使用
for preprint in preprints:
pool.apply_async(convert_to_xml, args=(preprint, ))
相反。
传递给任何 pool.*map*
函数的迭代被 消耗掉
立即。可迭代对象是否是迭代器并不重要。没有
在这里使用迭代器有特殊的内存好处。 imap_unordered
returns一个
迭代器,但它不以任何特别对迭代器友好的方式处理它的输入
方法。
无论传递什么类型的可迭代对象,在调用 pool.*map*
函数时,可迭代对象都是
消费并转化为放入任务队列的任务。
这是证实这一说法的代码:
version1.py:
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x * x
def gen():
for x in range(1000):
if x % 100 == 0:
print('Got here')
yield x
def start():
pool = mp.Pool()
for item in pool.imap_unordered(foo, gen()):
pass
pool.close()
pool.join()
if __name__ == '__main__':
start()
version2.py:
import multiprocessing as mp
import time
def foo(x):
time.sleep(0.1)
return x * x
def gen():
for x in range(1000):
if x % 100 == 0:
print('Got here')
yield x
def start():
pool = mp.Pool()
for item in gen():
result = pool.apply_async(foo, args=(item, ))
pool.close()
pool.join()
if __name__ == '__main__':
start()
运行 version1.py
和 version2.py
都产生相同的结果。
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
至关重要的是,您会注意到 Got here
很快打印了 10 次
运行 的开头,然后是长时间的停顿(计算时
完成)在程序结束之前。
如果发电机 gen()
以某种方式被 pool.imap_unordered
缓慢消耗,
我们应该期望 Got here
的打印速度也很慢。因为 Got here
是
快速打印 10 次,我们可以看到可迭代对象 gen()
正在
在任务完成之前就完全消耗完了。
运行 这些程序有望让您相信
pool.imap_unordered
和 pool.apply_async
正在将任务放入队列
基本上以相同的方式:调用后立即。