运行 在 main 之外并行的自定义函数列表
Running a list of custom funcs in parallel, outside of main
我在脚本的特定区域内调用了以下行:
data = [extract_func(domain, response) for domain, response, extract_func in responses]
基本上我在变量 responses
中使用 aiohttp 异步收集了一堆网页响应,这很好而且很快,所以我们已经得到了。问题是这些响应的解析(使用 Beautiful Soup)不是异步的,所以我必须以其他方式并行化。
(从技术上讲,每个 extract_func 都是与响应数据预先打包在一起的许多不同的提取函数之一,因此我为每个页面调用正确的 Beautiful Soup 解析代码。域也被传入用于其他包装目的。)
无论如何,我不知道如何同时 运行 所有这些提取函数,然后收集结果。我尝试研究多处理,但它似乎不适用于此处/要求您直接从 main 启动它,而我的这个收集过程是从另一个函数中进行的。
我试过这个例子(每个 extract_function 最后都会将返回的结果添加到某个全局列表中 - 然后我在这里尝试):
global extract_shared
extract_shared = []
proc = []
for domain, response, extract_func in responses:
p = Process(target=extract_func, args=(domain, response))
p.start()
proc.append(p)
for p in proc:
p.join()
data = extract_shared
然而,这似乎仍然进展得非常缓慢,而且我最终还是没有数据,所以我的代码仍然是错误的。
有没有更好的方法来解决这个问题?
这是正确的吗?
pool = multiprocessing.Pool(multiprocessing.cpu_count())
result_objects = [pool.apply_async(extract_func, args=(domain, response)) for domain, response, extract_func in responses]
data = [r.get() for r in result_objects]
pool.close()
pool.join()
return data
问题是您定义的 extract_shared
列表作为单独的实例存在于每个进程的地址 space 中。您需要有一个 extract_shared
的共享内存实现,以便每个进程都附加到同一个列表。如果我知道追加的是什么类型的数据,我可能会推荐哪种类型的 multiprocessing.Array
to use. Alternatively, although it carries a bit more overhead to use, a managed list
that is created by a multiprocessing.SyncManager
和函数就像一个常规列表,可能更易于使用。
使用多处理池是可行的方法。如果您的辅助函数没有 return 有意义的结果,则无需保存 ApplyAsync
调用 return 的 AsyncResult
个实例。只需调用 pool.close()
然后调用 pool.join()
就足以等待所有未完成的提交任务完成。
import multiprocessing
def init_pool(the_list):
global extract_shared
extract_shared = the_list
# required for Windows:
if __name__ == '__main__':
# compute pool size required but no greater than number of CPU cores we have:
n_processes = min(len(responses), multiprocessing.cpu_count())
# create a managed list:
extract_shared = multiprocessing.Manager().list()
# Initialize each process in the pool's global variable extract_shared with our extract_shared
# (a managed list can also be passed as another argument to the worker function instead)
pool = multiprocessing.Pool(n_processes, initializer=init_pool, initargs=(extract_shared,))
for domain, response, extract_func in responses:
pool.apply_async(extract_func, args=(domain, response))
# wait for tasks to complete
pool.close()
pool.join()
# results in extract_shared
print(extract_shared)
更新
如果让工作人员函数 return 结果和主进程进行附加操作会更容易。你基本上有正确的代码,除了我将池大小限制为小于你拥有的 CPU 核心数量,如果你提交的任务数量小于该数量。
我在脚本的特定区域内调用了以下行:
data = [extract_func(domain, response) for domain, response, extract_func in responses]
基本上我在变量 responses
中使用 aiohttp 异步收集了一堆网页响应,这很好而且很快,所以我们已经得到了。问题是这些响应的解析(使用 Beautiful Soup)不是异步的,所以我必须以其他方式并行化。
(从技术上讲,每个 extract_func 都是与响应数据预先打包在一起的许多不同的提取函数之一,因此我为每个页面调用正确的 Beautiful Soup 解析代码。域也被传入用于其他包装目的。)
无论如何,我不知道如何同时 运行 所有这些提取函数,然后收集结果。我尝试研究多处理,但它似乎不适用于此处/要求您直接从 main 启动它,而我的这个收集过程是从另一个函数中进行的。
我试过这个例子(每个 extract_function 最后都会将返回的结果添加到某个全局列表中 - 然后我在这里尝试):
global extract_shared
extract_shared = []
proc = []
for domain, response, extract_func in responses:
p = Process(target=extract_func, args=(domain, response))
p.start()
proc.append(p)
for p in proc:
p.join()
data = extract_shared
然而,这似乎仍然进展得非常缓慢,而且我最终还是没有数据,所以我的代码仍然是错误的。
有没有更好的方法来解决这个问题?
这是正确的吗?
pool = multiprocessing.Pool(multiprocessing.cpu_count())
result_objects = [pool.apply_async(extract_func, args=(domain, response)) for domain, response, extract_func in responses]
data = [r.get() for r in result_objects]
pool.close()
pool.join()
return data
问题是您定义的 extract_shared
列表作为单独的实例存在于每个进程的地址 space 中。您需要有一个 extract_shared
的共享内存实现,以便每个进程都附加到同一个列表。如果我知道追加的是什么类型的数据,我可能会推荐哪种类型的 multiprocessing.Array
to use. Alternatively, although it carries a bit more overhead to use, a managed list
that is created by a multiprocessing.SyncManager
和函数就像一个常规列表,可能更易于使用。
使用多处理池是可行的方法。如果您的辅助函数没有 return 有意义的结果,则无需保存 ApplyAsync
调用 return 的 AsyncResult
个实例。只需调用 pool.close()
然后调用 pool.join()
就足以等待所有未完成的提交任务完成。
import multiprocessing
def init_pool(the_list):
global extract_shared
extract_shared = the_list
# required for Windows:
if __name__ == '__main__':
# compute pool size required but no greater than number of CPU cores we have:
n_processes = min(len(responses), multiprocessing.cpu_count())
# create a managed list:
extract_shared = multiprocessing.Manager().list()
# Initialize each process in the pool's global variable extract_shared with our extract_shared
# (a managed list can also be passed as another argument to the worker function instead)
pool = multiprocessing.Pool(n_processes, initializer=init_pool, initargs=(extract_shared,))
for domain, response, extract_func in responses:
pool.apply_async(extract_func, args=(domain, response))
# wait for tasks to complete
pool.close()
pool.join()
# results in extract_shared
print(extract_shared)
更新
如果让工作人员函数 return 结果和主进程进行附加操作会更容易。你基本上有正确的代码,除了我将池大小限制为小于你拥有的 CPU 核心数量,如果你提交的任务数量小于该数量。