How to parallelise python script for processing 10,000 files?

I have more than 10,000 C files, and I need to pass each of them to some application, foo.exe, so that it processes them and generates a disassembly file for each C file, i.e. at the end of this process I will have 10,000 lst/output files! Let's assume that this process is not IO-bound (even though foo.exe writes a new lst file to disk for each C file. Is that a correct assumption?).

My task

Implement a parallel Python program that gets the job done in the shortest possible time, by utilizing all the CPU cores for this task.

My approach

I have already implemented this program and it works for me; the pseudocode is listed below:

  1. Iterate over all the C files and push the abs path of each file into a global list, files_list.
  2. Compute the number of logical CPU cores (using the psutil Python module); this will be the maximum number of threads to dispatch later. Let's assume it is 8 threads.
  3. Generate a new list, workers_list (a list of lists), which contains the intervals or indices (L_index, R_index) produced by dividing files_list by 8. E.g. if I have 800 C files, then workers_list will look like this: workers_list = [[0-99],[100,199],...,[700,799]].
  4. Dispatch 8 threads, workers, where each thread processes a single entry of workers_list. Each thread opens a process (subprocess.call(...)) and calls foo.exe on the current C file.

The relevant code is posted below:

Relevant code

import multiprocessing
import subprocess
import psutil
import threading
import os

class LstGenerator(object):
  def __init__(self):
    self.elfdumpExePath = r"C:\.....\elfdump.exe" #abs path to the executable 
    self.output_dir = r"C:\.....\out"             #abs path to where i want the lst files to be generated
    self.files = [] # assuming that i have all the files in this list (abs path for each .C file)
  
  def slice(self, files):
    files_len = len(files)
    j = psutil.cpu_count()
    slice_step = files_len / j
    workers_list = []
    lhs = 0
    rhs = slice_step
    while j:
      workers_list.append(files[lhs:rhs])
      lhs += slice_step
      rhs += slice_step
      j -= 1
      if j == 1:  # last iteration
        workers_list.append(files[lhs:files_len])
        break
    for each in workers_list:  #for debug only
      print len(each)
    return workers_list
  
  def disassemble(self, objectfiles):
    for each_object in objectfiles:
      cmd = "{elfdump} -T {object} -o {lst}".format(
        elfdump=self.elfdumpExePath,
        object=each_object,
        lst=os.path.join(self.output_dir, os.path.basename(each_object).rstrip('o') + 'lst'))
      p = subprocess.call(cmd, shell=True)
  
  def execute(self):
    class FuncThread(threading.Thread):
      def __init__(self, target, *args):
        # pass target and args through to Thread.__init__ so run() actually invokes them
        threading.Thread.__init__(self, target=target, args=args)
    
    workers = []
    for portion in self.slice(self.files):
      workers.append(FuncThread(self.disassemble, portion))

    # dispatch the workers
    for worker in workers:
      worker.start()
    
    # wait or join the previous dispatched workers
    for worker in workers:
      worker.join()
  
  

if __name__ == '__main__':
  lst_gen = LstGenerator()
  lst_gen.execute()

My questions

  1. Can I do this in a more efficient way?
  2. Does Python have a standard library or module that can do the job and reduce my code/logic complexity? Maybe multiprocessing.Pool?

Running on Windows, with Python 2.7!

Thanks

Yes, multiprocessing.Pool can help with this. It also does the work of splitting the input list up per CPU. Here is Python code (untested) that should get you going.

import multiprocessing
import os
import subprocess

def convert(objectfile):
    elfdumpExePath = r"C:\.....\elfdump.exe"
    output_dir = r"C:\.....\out"

    cmd = "{elfdump} -T {obj} -o {lst}".format(
        elfdump=elfdumpExePath,
        obj=objectfile,
        lst=os.path.join(output_dir, os.path.basename(objectfile).rstrip('o') + 'lst'))
    # run the command and return its exit code
    return subprocess.call(cmd, shell=True)

files = ["foo.c", "foo1.c", "foo2.c"]

p = multiprocessing.Pool()
outputs = p.map(convert, files)

Keep in mind that your worker function (convert above) must accept a single argument. So if you need to pass in both an input path and an output path, that has to be done as a single argument, and your list of file names has to be transformed into a list of pairs, where each pair is (input, output).
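For example, a minimal (untested) sketch of that idea, reusing the hypothetical paths from above and packing each input path together with its output path into one tuple argument, could look like this:

import multiprocessing
import os
import subprocess

def convert(pair):
    # unpack the single argument into the input file and the output lst path
    objectfile, lstfile = pair
    cmd = "{elfdump} -T {obj} -o {lst}".format(
        elfdump=r"C:\.....\elfdump.exe",
        obj=objectfile,
        lst=lstfile)
    return subprocess.call(cmd, shell=True)

if __name__ == '__main__':
    output_dir = r"C:\.....\out"
    files = ["foo.c", "foo1.c", "foo2.c"]
    # build (input, output) pairs so the worker still receives a single argument
    pairs = [(f, os.path.join(output_dir, os.path.basename(f).rstrip('o') + 'lst'))
             for f in files]
    p = multiprocessing.Pool()
    outputs = p.map(convert, pairs)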

The answer above is for Python 2.7, but keep in mind that Python 2 has reached end of life. In Python 3, you can use multiprocessing.Pool in a with statement so that it cleans up after itself.
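A minimal Python 3 sketch of that, assuming the same convert function as in the snippet above, might look like:

import multiprocessing

if __name__ == '__main__':
    files = ["foo.c", "foo1.c", "foo2.c"]
    # map() blocks until all files are processed; the with block then shuts the pool down
    with multiprocessing.Pool() as p:
        outputs = p.map(convert, files)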

Posting the answer to my own question after struggling with it for a while, and noticing that I could import concurrent.futures in python2.x (it is available for Python 2 as a backport package)! This approach reduces the code complexity to a minimum and even improves the execution time. Unlike my initial thinking, these processes turned out to be more IO-bound than CPU-bound! Nevertheless, the time efficiency I got was convenient enough for running the program with multiple processes.


concurrent.futures

The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.

class concurrent.futures.Executor
An abstract class that provides methods to execute calls asynchronously. It should not be used directly, but through its concrete subclasses.

submit(fn, *args, **kwargs)

Schedules the callable, fn, to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the callable.

For further reading, follow this: parallel tasks with concurrent.futures


import subprocess
import os
import concurrent.futures

class LstGenerator(object):
  def __init__(self):
    self.elfdumpExePath = r"C:\.....\elfdump.exe" #abs path to the executable 
    self.output_dir = r"C:\.....\out"             #abs path to where i want the lst files to be generated
    self.files = [] # assuming that i have all the files in this list (abs path for each .C file)
  
  def disassemble(self, objectfile):
    cmd = "{elfdump} -T {object} -o {lst}".format(
      elfdump=self.elfdumpExePath,
      object=objectfile,
      lst=os.path.join(self.output_dir, os.path.basename(objectfile).rstrip('o') + 'lst'))
    return subprocess.call(cmd, shell=True, stdout=subprocess.PIPE)
  
  def execute(self):
    with concurrent.futures.ProcessPoolExecutor() as executor:
      # pass the callable and its argument separately so the call runs inside the worker process
      results = [executor.submit(self.disassemble, file) for file in self.files]
  
  

if __name__ == '__main__':
  lst_gen = LstGenerator()
  lst_gen.execute()
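
If you also want to check the exit codes that disassemble returns, one possible variant of execute (a sketch, not part of the original solution) collects them from the Future objects with concurrent.futures.as_completed:

  def execute(self):
    with concurrent.futures.ProcessPoolExecutor() as executor:
      futures = [executor.submit(self.disassemble, file) for file in self.files]
      # each Future.result() is the return code of the corresponding subprocess.call
      for future in concurrent.futures.as_completed(futures):
        rc = future.result()
        if rc != 0:
          print('a disassembly command failed, return code: {}'.format(rc))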