使用外部命令多重处理数千个文件
Multiprocessing thousands of files with external command
我想从 Python 为大约 8000 个文件启动外部命令。每个文件都独立于其他文件进行处理。唯一的限制是在处理完所有文件后继续执行。我有 4 个物理内核,每个物理内核有 2 个逻辑内核 (multiprocessing.cpu_count()
returns 8)。我的想法是使用一个由四个并行独立进程组成的池,这些进程将 运行 在 8 个核心中的 4 个上。这样我的机器就可以同时使用了。
这是我一直在做的事情:
import multiprocessing
import subprocess
import os
from multiprocessing.pool import ThreadPool
def process_files(input_dir, output_dir, option):
pool = ThreadPool(multiprocessing.cpu_count()/2)
for filename in os.listdir(input_dir): # about 8000 files
f_in = os.path.join(input_dir, filename)
f_out = os.path.join(output_dir, filename)
cmd = ['molconvert', option, f_in, '-o', f_out]
pool.apply_async(subprocess.Popen, (cmd,))
pool.close()
pool.join()
def main():
process_files('dir1', 'dir2', 'mol:H')
do_some_stuff('dir2')
process_files('dir2', 'dir3', 'mol:a')
do_more_stuff('dir3')
一批 100 个文件的顺序处理需要 120 秒。上面概述的多处理版本(函数 process_files
)只需要 20 秒的批处理时间。但是,当我 运行 process_files
处理整套 8000 个文件时,我的 PC 挂起并且在一小时后没有解冻。
我的问题是:
1) 我认为 ThreadPool
应该初始化一个进程池(确切地说,这里是 multiprocessing.cpu_count()/2
个进程)。但是,我的计算机挂在 8000 个文件上而不是 100 个文件上,这表明可能没有考虑池的大小。要么,要么我做错了什么。你能解释一下吗?
2) 这是在 Python 下启动独立进程的正确方法吗?当每个进程都必须启动外部命令时,并且所有资源都不会被处理占用?
如果你用的是Python3,我会考虑用concurrent.futures.ThreadPoolExecutor
的map
方法。
或者,您可以自己管理子流程列表。
下面的例子定义了一个函数来启动ffmpeg
将一个视频文件转换成Theora/Vorbis格式。它 returns 每个启动子进程的 Popen 对象。
def startencoder(iname, oname, offs=None):
args = ['ffmpeg']
if offs is not None and offs > 0:
args += ['-ss', str(offs)]
args += ['-i', iname, '-c:v', 'libtheora', '-q:v', '6', '-c:a',
'libvorbis', '-q:a', '3', '-sn', oname]
with open(os.devnull, 'w') as bb:
p = subprocess.Popen(args, stdout=bb, stderr=bb)
return p
在主程序中,代表运行个子进程的Popen
个对象的列表是这样维护的。
outbase = tempname()
ogvlist = []
procs = []
maxprocs = cpu_count()
for n, ifile in enumerate(argv):
# Wait while the list of processes is full.
while len(procs) == maxprocs:
manageprocs(procs)
# Add a new process
ogvname = outbase + '-{:03d}.ogv'.format(n + 1)
procs.append(startencoder(ifile, ogvname, offset))
ogvlist.append(ogvname)
# All jobs have been submitted, wail for them to finish.
while len(procs) > 0:
manageprocs(procs)
因此,只有当 运行 个子进程少于核心时,才会启动一个新进程。多次使用的代码被分离到 manageprocs
函数中。
def manageprocs(proclist):
for pr in proclist:
if pr.poll() is not None:
proclist.remove(pr)
sleep(0.5)
对sleep
的调用是为了防止程序在循环中自旋。
我认为你的基本问题是subprocess.Popen
的使用。该方法 而不是 在返回之前等待命令完成。由于函数 returns 立即(即使命令仍然是 运行),就您的线程池而言,该函数已完成并且它可以生成另一个...这意味着您最终生成 8000左右进程。
如果使用 subprocess.check_call
:
,你的运气可能会更好
Run command with arguments. Wait for command to complete. If
the exit code was zero then return, otherwise raise
CalledProcessError. The CalledProcessError object will have the
return code in the returncode attribute.
所以:
def process_files(input_dir, output_dir, option):
pool = ThreadPool(multiprocessing.cpu_count()/2)
for filename in os.listdir(input_dir): # about 8000 files
f_in = os.path.join(input_dir, filename)
f_out = os.path.join(output_dir, filename)
cmd = ['molconvert', option, f_in, '-o', f_out]
pool.apply_async(subprocess.check_call, (cmd,))
pool.close()
pool.join()
如果您真的不关心退出代码,那么您可能需要 subprocess.call
,它不会在进程的退出代码非零的情况下引发异常。
我想从 Python 为大约 8000 个文件启动外部命令。每个文件都独立于其他文件进行处理。唯一的限制是在处理完所有文件后继续执行。我有 4 个物理内核,每个物理内核有 2 个逻辑内核 (multiprocessing.cpu_count()
returns 8)。我的想法是使用一个由四个并行独立进程组成的池,这些进程将 运行 在 8 个核心中的 4 个上。这样我的机器就可以同时使用了。
这是我一直在做的事情:
import multiprocessing
import subprocess
import os
from multiprocessing.pool import ThreadPool
def process_files(input_dir, output_dir, option):
pool = ThreadPool(multiprocessing.cpu_count()/2)
for filename in os.listdir(input_dir): # about 8000 files
f_in = os.path.join(input_dir, filename)
f_out = os.path.join(output_dir, filename)
cmd = ['molconvert', option, f_in, '-o', f_out]
pool.apply_async(subprocess.Popen, (cmd,))
pool.close()
pool.join()
def main():
process_files('dir1', 'dir2', 'mol:H')
do_some_stuff('dir2')
process_files('dir2', 'dir3', 'mol:a')
do_more_stuff('dir3')
一批 100 个文件的顺序处理需要 120 秒。上面概述的多处理版本(函数 process_files
)只需要 20 秒的批处理时间。但是,当我 运行 process_files
处理整套 8000 个文件时,我的 PC 挂起并且在一小时后没有解冻。
我的问题是:
1) 我认为 ThreadPool
应该初始化一个进程池(确切地说,这里是 multiprocessing.cpu_count()/2
个进程)。但是,我的计算机挂在 8000 个文件上而不是 100 个文件上,这表明可能没有考虑池的大小。要么,要么我做错了什么。你能解释一下吗?
2) 这是在 Python 下启动独立进程的正确方法吗?当每个进程都必须启动外部命令时,并且所有资源都不会被处理占用?
如果你用的是Python3,我会考虑用concurrent.futures.ThreadPoolExecutor
的map
方法。
或者,您可以自己管理子流程列表。
下面的例子定义了一个函数来启动ffmpeg
将一个视频文件转换成Theora/Vorbis格式。它 returns 每个启动子进程的 Popen 对象。
def startencoder(iname, oname, offs=None):
args = ['ffmpeg']
if offs is not None and offs > 0:
args += ['-ss', str(offs)]
args += ['-i', iname, '-c:v', 'libtheora', '-q:v', '6', '-c:a',
'libvorbis', '-q:a', '3', '-sn', oname]
with open(os.devnull, 'w') as bb:
p = subprocess.Popen(args, stdout=bb, stderr=bb)
return p
在主程序中,代表运行个子进程的Popen
个对象的列表是这样维护的。
outbase = tempname()
ogvlist = []
procs = []
maxprocs = cpu_count()
for n, ifile in enumerate(argv):
# Wait while the list of processes is full.
while len(procs) == maxprocs:
manageprocs(procs)
# Add a new process
ogvname = outbase + '-{:03d}.ogv'.format(n + 1)
procs.append(startencoder(ifile, ogvname, offset))
ogvlist.append(ogvname)
# All jobs have been submitted, wail for them to finish.
while len(procs) > 0:
manageprocs(procs)
因此,只有当 运行 个子进程少于核心时,才会启动一个新进程。多次使用的代码被分离到 manageprocs
函数中。
def manageprocs(proclist):
for pr in proclist:
if pr.poll() is not None:
proclist.remove(pr)
sleep(0.5)
对sleep
的调用是为了防止程序在循环中自旋。
我认为你的基本问题是subprocess.Popen
的使用。该方法 而不是 在返回之前等待命令完成。由于函数 returns 立即(即使命令仍然是 运行),就您的线程池而言,该函数已完成并且它可以生成另一个...这意味着您最终生成 8000左右进程。
如果使用 subprocess.check_call
:
Run command with arguments. Wait for command to complete. If
the exit code was zero then return, otherwise raise
CalledProcessError. The CalledProcessError object will have the
return code in the returncode attribute.
所以:
def process_files(input_dir, output_dir, option):
pool = ThreadPool(multiprocessing.cpu_count()/2)
for filename in os.listdir(input_dir): # about 8000 files
f_in = os.path.join(input_dir, filename)
f_out = os.path.join(output_dir, filename)
cmd = ['molconvert', option, f_in, '-o', f_out]
pool.apply_async(subprocess.check_call, (cmd,))
pool.close()
pool.join()
如果您真的不关心退出代码,那么您可能需要 subprocess.call
,它不会在进程的退出代码非零的情况下引发异常。