运行 在 main 之外并行的自定义函数列表

Running a list of custom funcs in parallel, outside of main

我在脚本的特定区域内调用了以下行:

data = [extract_func(domain, response) for domain, response, extract_func in responses]

基本上我在变量 responses 中使用 aiohttp 异步收集了一堆网页响应,这很好而且很快,所以我们已经得到了。问题是这些响应的解析(使用 Beautiful Soup)不是异步的,所以我必须以其他方式并行化。

(从技术上讲,每个 extract_func 都是与响应数据预先打包在一起的许多不同的提取函数之一,因此我为每个页面调用正确的 Beautiful Soup 解析代码。域也被传入用于其他包装目的。)

无论如何,我不知道如何同时 运行 所有这些提取函数,然后收集结果。我尝试研究多处理,但它似乎不适用于此处/要求您直接从 main 启动它,而我的这个收集过程是从另一个函数中进行的。

我试过这个例子(每个 extract_function 最后都会将返回的结果添加到某个全局列表中 - 然后我在这里尝试):

global extract_shared
extract_shared = []
proc = []

for domain, response, extract_func in responses:
    p = Process(target=extract_func, args=(domain, response))
    p.start()
    proc.append(p)

for p in proc:
    p.join()

data = extract_shared

然而,这似乎仍然进展得非常缓慢,而且我最终还是没有数据,所以我的代码仍然是错误的。

有没有更好的方法来解决这个问题?

这是正确的吗?

pool = multiprocessing.Pool(multiprocessing.cpu_count())
result_objects = [pool.apply_async(extract_func, args=(domain, response)) for domain, response, extract_func in responses]
data = [r.get() for r in result_objects]
pool.close()
pool.join()
return data

问题是您定义的 extract_shared 列表作为单独的实例存在于每个进程的地址 space 中。您需要有一个 extract_shared 的共享内存实现,以便每个进程都附加到同一个列表。如果我知道追加的是什么类型的数据,我可能会推荐哪种类型的 multiprocessing.Array to use. Alternatively, although it carries a bit more overhead to use, a managed list that is created by a multiprocessing.SyncManager 和函数就像一个常规列表,可能更易于使用。

使用多处理池是可行的方法。如果您的辅助函数没有 return 有意义的结果,则无需保存 ApplyAsync 调用 return 的 AsyncResult 个实例。只需调用 pool.close() 然后调用 pool.join() 就足以等待所有未完成的提交任务完成。

import multiprocessing


def init_pool(the_list):
    global extract_shared
    extract_shared = the_list


# required for Windows:
if __name__ == '__main__':
    # compute pool size required but no greater than number of CPU cores we have:
    n_processes = min(len(responses), multiprocessing.cpu_count())
    # create a managed list:
    extract_shared = multiprocessing.Manager().list()
    # Initialize each process in the pool's global variable extract_shared with our extract_shared
    # (a managed list can also be passed as another argument to the worker function instead)
    pool = multiprocessing.Pool(n_processes, initializer=init_pool, initargs=(extract_shared,))
    for domain, response, extract_func in responses:
        pool.apply_async(extract_func, args=(domain, response))
    # wait for tasks to complete
    pool.close()
    pool.join()
    # results in extract_shared
    print(extract_shared)

更新

如果让工作人员函数 return 结果和主进程进行附加操作会更容易。你基本上有正确的代码,除了我将池大小限制为小于你拥有的 CPU 核心数量,如果你提交的任务数量小于该数量。