如何在 python 中并行化这个嵌套循环

How to parallelize this nested loop in python

我正在努力提高我的代码的性能,但不知道如何在其中实现多处理模块。

我正在使用 linux (CentOS 7.2) 和 python 2.7

我需要在并行环境中运行的代码:

def start_fetching(directory):
    with open("test.txt", "a") as myfile:
        try:
            for dirpath, dirnames, filenames in os.walk(directory):
                for current_file in filenames:
                    current_file = dirpath + "/" + current_file
                    myfile.write(current_file)
            return 0
        except:
            return sys.exc_info()[0]

if __name__ == "__main__":
    cwd = "/home/"
    final_status = start_fetching(cwd)
    exit(final_status)

我需要将所有文件的元数据(这里只显示文件名)保存在数据库中。这里我只是将文件名存储在一个文本文件中。

我猜你想要并行化大型任务。无论您提供什么,都只是文件中的文件名。 我已经为每个线程输出创建了一个单独的文件,稍后您也可以合并所有这些文件。还有其他方法可以实现这一点。

如果主要问题是并行化,下面可能是一个解决方案。

Python支持多线程和多处理。多线程不是真正的并行处理,在 IO 块的情况下我们可以并行执行。如果你想要并行代码,使用多处理[https://docs.python.org/2/library/multiprocessing.html]。您的代码可能如下所示。

from multiprocessing import Process

def task(filename):
    with open(filename+"test.txt", "a") as myfile:
         myfile.write(filename)

def start_fetching(directory):
    try:
        processes = []
        for dirpath, dirnames, filenames in os.walk(directory):
            for current_file in filenames:
                current_file = dirpath + "/" + current_file
                # Create Seperate process and do what you want, becausee Multi-threading wont help in parallezing
                p = Process(target=f, args=(current_file,))
                p.start()
                processes.append(p)

        # Let all the child processes finish and do some post processing if needed.
        for process in processes:
            process.join()

        return 0
    except:
        return sys.exc_info()[0] 

if __name__ == "__main__":
    cwd = "/home/"
    final_status = start_fetching(cwd)
    exit(final_status)

感谢大家帮助我将此脚本的处理时间减少了将近一半。 (我添加这个作为答案,因为我不能在评论中添加这么多内容)

我找到了两种方法来实现我的愿望:

  1. 使用@KeerthanaPrabhakaran 提到的 link,它与多线程有关。

    def worker(filename):
        subprocess_out = subprocess.Popen(["stat", "-c",
                                   "INSERT INTO file VALUES (NULL, \"%n\", '%F', %s, %u, %g, datetime(%X, 'unixepoch', 'localtime'), datetime(%Y, 'unixepoch', 'localtime'), datetime(%Z, 'unixepoch', 'localtime'));", filename], stdout=subprocess.PIPE)
        return subprocess_out.communicate()[0]
    
    def start_fetching(directory, threads):
        filename = fetch_filename() + ".txt"
        with contextlib.closing(multiprocessing.Pool(threads)) as pool:   # pool of threads processes
            with open(filename, "a") as myfile:
                walk = os.walk(directory)
                fn_gen = itertools.chain.from_iterable((os.path.join(root, file) for file in files) for root, dirs, files in walk)
    
                results_of_work = pool.map(worker, fn_gen)  # this does the parallel processing
                print "Concatenating the result into the text file"
                for result in results_of_work:
                    myfile.write(str(result))
        return filename
    

    这是在0m15.154s中遍历了15203个文件

  2. @ArunKumar 提到的第二个与多处理有关:

    def task(filename, process_no, return_dict):
        subprocess_out = subprocess.Popen(["stat", "-c",
                                   "INSERT INTO file VALUES (NULL, \"%n\", '%F', %s, %u, %g, datetime(%X, 'unixepoch', 'localtime'), datetime(%Y, 'unixepoch', 'localtime'), datetime(%Z, 'unixepoch', 'localtime'));",
                                   filename], stdout=subprocess.PIPE)
        return_dict[process_no] = subprocess_out.communicate()[0]
    
    
    def start_fetching_1(directory):
        try:
            processes = []
            i = 0
            manager = multiprocessing.Manager()
            return_dict = manager.dict()
    
            for dirpath, dirnames, filenames in os.walk(directory):
                for current_file in filenames:
                    current_file = dirpath + "/" + current_file
                    # Create Seperate process and do what you want, becausee Multi-threading wont help in parallezing
                    p = multiprocessing.Process(target=task, args=(current_file, i, return_dict))
                    i += 1
                    p.start()
                    processes.append(p)
    
            # Let all the child processes finish and do some post processing if needed.
            for process in processes:
                process.join()
    
            with open("test.txt", "a") as myfile:
                myfile.write(return_dict.values())
    
            return 0
        except:
            return sys.exc_info()[0]
    

    这是在1m12.197s中遍历15203个文件

我不明白为什么多处理要花那么多时间(我的初始代码只用了 0m27.884 秒),但利用率几乎是 100% CPU。

以上代码是我 运行 的确切代码,(我将这些信息存储在一个文件中,然后使用这些 test.txt 文件创建数据库条目)

我正在尝试进一步优化上面的代码,但想不出更好的方法,正如@CongMa所说,它可能终于到了I/O瓶颈。