How to parallelise python script for processing 10,000 files?
I have more than 10,000 C files, and I need to pass each of them to some application foo.exe in order to process and generate a disassembly file for each C file, i.e. at the end of this process I will have 10,000 lst/output files! Assume this process is not IO-Bound (although foo.exe writes a new lst file to disk for each c file. Is this a correct assumption?).
My task
Implement a parallel python program to get the job done in the minimum possible time, by utilizing all CPU cores for this task.
My approach
I have implemented this program and it works for me; the pseudo code is listed below:
- Traverse all the C files and push the absolute path of each one into a global list, files_list.
- Compute the number of logical CPU cores (using the psutil py module); this will be the maximum number of threads to dispatch later. Let's assume it is 8 threads.
- Generate a new list, workers_list (a list of lists), which holds the intervals or indexes (L_index, R_index) that result from dividing files_list by 8. e.g. if I have 800 C files then workers_list will look like this: workers_list = [[0-99],[100,199],...,[700,799]].
- Dispatch 8 threads, workers, each of which handles a single entry of workers_list. Each thread opens a process (subprocess.call(...)) and calls foo.exe on the current C file.
The relevant code is posted below.
Relevant code
import multiprocessing
import subprocess
import psutil
import threading
import os


class LstGenerator(object):
    def __init__(self):
        self.elfdumpExePath = r"C:\.....\elfdump.exe"  # abs path to the executable
        self.output_dir = r"C:\.....\out"  # abs path to where i want the lst files to be generated
        self.files = []  # assuming that i have all the files in this list (abs path for each .C file)

    def slice(self, files):
        # split the file list into one chunk per logical core
        files_len = len(files)
        j = psutil.cpu_count()
        slice_step = files_len / j  # integer division in python 2
        workers_list = []
        lhs = 0
        rhs = slice_step
        while j:
            workers_list.append(files[lhs:rhs])
            lhs += slice_step
            rhs += slice_step
            j -= 1
            if j == 1:  # last iteration: the final chunk takes any remainder
                workers_list.append(files[lhs:files_len])
                break
        for each in workers_list:  # for debug only
            print len(each)
        return workers_list

    def disassemble(self, objectfiles):
        for each_object in objectfiles:
            cmd = "{elfdump} -T {object} -o {lst}".format(
                elfdump=self.elfdumpExePath,
                object=each_object,
                lst=os.path.join(self.output_dir, os.path.basename(each_object).rstrip('o') + 'lst'))
            subprocess.call(cmd, shell=True)

    def execute(self):
        # delegate target and args to threading.Thread so run() actually invokes them
        class FuncThread(threading.Thread):
            def __init__(self, target, *args):
                threading.Thread.__init__(self, target=target, args=args)

        workers = []
        for portion in self.slice(self.files):
            workers.append(FuncThread(self.disassemble, portion))
        # dispatch the workers
        for worker in workers:
            worker.start()
        # wait for (join) the previously dispatched workers
        for worker in workers:
            worker.join()


if __name__ == '__main__':
    lst_gen = LstGenerator()
    lst_gen.execute()
My questions
- Can I do this in a more efficient way?
- Does python have a standard library or module that can get the job done and reduce the complexity of my code/logic? Maybe multiprocessing.Pool?

Running on windows, with python 2.7!
Thanks
Yes, multiprocessing.Pool can help with this. It also does the work of slicing up the input list for each CPU. Here is python code (untested) which should get you started.
import multiprocessing
import os
import subprocess


def convert(objectfile):
    elfdumpExePath = r"C:\.....\elfdump.exe"
    output_dir = r"C:\.....\out"
    cmd = "{elfdump} -T {obj} -o {lst}".format(
        elfdump=elfdumpExePath,
        obj=objectfile,
        lst=os.path.join(output_dir, os.path.basename(objectfile).rstrip('o') + 'lst'))
    return subprocess.call(cmd, shell=True)  # run the command; returns its exit code


files = ["foo.c", "foo1.c", "foo2.c"]
p = multiprocessing.Pool()
outputs = p.map(convert, files)
Keep in mind that your worker function (convert above) must accept one argument. So if you need to pass in an input path and an output path, that must be done as a single argument, and your list of filenames has to be transformed into a list of pairs, where each pair is input and output.
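For example, a minimal sketch of that pairing approach (the pairs list and the unpacking are illustrative, not part of the original answer):

import multiprocessing
import os
import subprocess


def convert(pair):
    # the single Pool.map argument is an (input, output) pair
    objectfile, lstfile = pair
    cmd = "{elfdump} -T {obj} -o {lst}".format(
        elfdump=r"C:\.....\elfdump.exe",
        obj=objectfile,
        lst=lstfile)
    return subprocess.call(cmd, shell=True)


output_dir = r"C:\.....\out"
files = ["foo.c", "foo1.c", "foo2.c"]
pairs = [(f, os.path.join(output_dir, os.path.basename(f).rstrip('o') + 'lst'))
         for f in files]
p = multiprocessing.Pool()
outputs = p.map(convert, pairs)  # each worker receives one pair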
The answer above is for python 2.7, but keep in mind that python2 has reached its end of life. In python3, you can use multiprocessing.Pool in a with statement so that it cleans up after itself.
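A sketch of what that looks like in python3 (assuming the same convert function and files list as above):

import multiprocessing

if __name__ == '__main__':
    # the with statement terminates the pool automatically on exit
    with multiprocessing.Pool() as p:
        outputs = p.map(convert, files)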
Posting the answer to my own question after struggling with it for a while, and after noticing that concurrent.futures can be imported in python2.x! This approach reduces code complexity to a minimum and even shortens execution time. Unlike my initial thinking, these processes turned out to be more IO-bound than CPU-bound! Yet, the time efficiency I got was good enough for running the program with multiple processes.
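Since the jobs turned out to be IO-bound, a ThreadPoolExecutor is worth benchmarking as well: subprocess.call blocks in the OS while foo.exe runs and releases the GIL, so plain threads can keep all cores busy. A minimal sketch (the commands list is an assumed, pre-built list of command strings):

import subprocess
import concurrent.futures


def run(cmd):
    # blocks in the OS while the external tool runs, releasing the GIL
    return subprocess.call(cmd, shell=True)


commands = []  # assumed: one prepared command string per .c file
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(run, commands))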
concurrent.futures
The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.
class concurrent.futures.Executor
An abstract class that provides methods to execute calls asynchronously. It should not be used directly, but through its concrete subclasses.
submit(fn, *args, **kwargs)
Schedules the callable, fn, to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the callable.
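A tiny usage sketch of submit and the Future it returns (illustrative only):

from concurrent.futures import ProcessPoolExecutor


def square(x):
    return x * x


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        future = executor.submit(square, 3)  # schedules square(3)
        print(future.result())               # blocks until done, prints 9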
For further reading, see: parallel tasks with concurrent.futures
import subprocess
import os
import concurrent.futures


class LstGenerator(object):
    def __init__(self):
        self.elfdumpExePath = r"C:\.....\elfdump.exe"  # abs path to the executable
        self.output_dir = r"C:\.....\out"  # abs path to where i want the lst files to be generated
        self.files = []  # assuming that i have all the files in this list (abs path for each .C file)

    def disassemble(self, objectfile):
        cmd = "{elfdump} -T {object} -o {lst}".format(
            elfdump=self.elfdumpExePath,
            object=objectfile,
            lst=os.path.join(self.output_dir, os.path.basename(objectfile).rstrip('o') + 'lst'))
        # do not pass stdout=subprocess.PIPE to subprocess.call: nothing
        # reads the pipe, so a chatty tool could fill it and deadlock
        return subprocess.call(cmd, shell=True)

    def execute(self):
        with concurrent.futures.ProcessPoolExecutor() as executor:
            # pass the callable and its argument separately; writing
            # executor.submit(self.disassemble(f)) would run each call
            # serially in the parent process instead
            results = [executor.submit(self.disassemble, f) for f in self.files]


if __name__ == '__main__':
    lst_gen = LstGenerator()
    lst_gen.execute()
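If you also want to verify that every invocation succeeded, the futures can be drained with concurrent.futures.as_completed; a hedged sketch of an alternative execute method for the class above:

def execute(self):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # map each future back to the file it was submitted for
        futures = {executor.submit(self.disassemble, f): f
                   for f in self.files}
        for future in concurrent.futures.as_completed(futures):
            if future.result() != 0:  # non-zero exit code from the tool
                print('failed: {}'.format(futures[future]))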