多处理时间随着核心数的增加而线性增加
Multiprocessing time increases linearly with more cores
我有一个 arcpy
过程,需要在一堆层上进行联合,运行 一些计算,并编写 HTML 报告。考虑到我需要生成的报告数量 (~2,100),我需要这个过程尽可能快(我的目标是每份报告 2 秒)。当我 运行 遇到一个问题时,我尝试了多种方法来做到这一点,包括多处理,即 运行 多进程部分基本上花费相同的时间,无论有多少我使用的内核。
例如,对于相同数量的报告:
- 2 个核心每轮花费约 30 秒(因此 40 份报告需要 40/2 * 30 秒)
- 4 个核心耗时约 60 秒 (40/4 * 60)
- 10 个核心耗时约 160 秒 (40/10 * 160)
等等。它的总时间是相同的,因为一次搅拌两倍的时间需要两倍的时间。
这是否意味着我的问题是 I/O 受限,而不是 CPU 受限?(如果是这样 - 我该怎么办?)我本以为是后者,因为我计时的最大瓶颈是联合(它占用了大约 50% 的处理时间)。工会在 ArcGIS 中通常很昂贵,所以我假设将其分解并且 运行 2 - 10 一次会快 2 - 10 倍。或者,我可能错误地实现了多进程?
## Worker function just included to give some context
def worker(sub_code):
layer = 'in_memory/lyr_{}'.format(sub_code)
arcpy.Select_analysis(subbasinFC, layer, where_clause="SUB_CD = '{}'".format(sub_code))
arcpy.env.extent = layer
union_name = 'in_memory/union_' + sub_code
arcpy.Union_analysis([fields],
union_name,
"NO_FID", "1 FEET")
#.......Some calculations using cursors
# Templating using Jinjah
context = {}
context['DATE'] = now.strftime("%B %d, %Y")
context['SUB_CD'] = sub_code
context['SUB_ACRES'] = sum([r[0] for r in arcpy.da.SearchCursor(union, ["ACRES"], where_clause="SUB_CD = '{}'".format(sub_code))])
# Etc
# Then write the report out using custom function
write_html('template.html', 'output_folder', context)
if __name__ == '__main__':
subList = sorted({r[0] for r in arcpy.da.SearchCursor(subbasinFC, ["SUB_CD"])})
NUM_CORES = 7
chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)]
for chunk in chunk_list:
jobs = []
for subbasin in chunk:
p = multiprocessing.Process(target=worker, args=(subbasin,))
jobs.append(p)
p.start()
for process in jobs:
process.join()
我不确定您是否正确使用 Process
池来跟踪您的作业。这个:
for subbasin in chunk:
p = multiprocessing.Process(target=worker, args=(subbasin,))
jobs.append(p)
p.start()
for process in jobs:
process.join()
应该改为:
for subbasin in chunk:
p = multiprocessing.Process(target=worker, args=(subbasin,))
p.start()
p.join()
您反对 spec of using the multiprocessing library 有什么具体原因吗?您不会等到线程终止才开始启动另一个进程,这只会创建一大堆未被父调用进程处理的进程。
这里没什么好说的,而且我没有使用 ArcGIS 的经验。所以我只能记下两个更高层次的事情。首先,"the usual" 解决此问题的方法是将 NUM_CORES = 7
下面的所有代码替换为:
pool = multiprocessing.Pool(NUM_CORES)
pool.map(worker, subList)
pool.close()
pool.join()
map()
负责让所有工作进程尽可能忙碌。按照原样,您启动 7 个进程,然后等待 所有 个进程完成。在最慢的进程消失之前完成的所有进程,它们的核心处于空闲状态,等待下一次外循环迭代。 Pool
在作业期间保持 7 个进程处于活动状态,并在完成最后一项工作后立即为每个进程提供一项新工作。
其次,这部分以逻辑错误结束:
chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)]
你想要 NUM_CORES
而不是 NUM_CORES-1
。按原样,第一次提取
subList[0:7]
然后
subList[6:13]
然后
subList[12:19]
等等。 subList[6]
和 subList[12]
(等)各提取两次。子列表重叠。
你没有向我们展示足够多的东西来确定你在做什么。例如,你的 env.workspace
是多少? subbasinFC
的值是多少?似乎您在每个过程开始时都在进行分析,以将数据过滤到 layer
。但是 subbasinFC
是来自磁盘还是来自内存?如果它来自磁盘,我建议您在任何进程尝试过滤之前将所有内容读入内存。如果您有足够的内存来支持它,那应该会加快速度。否则,是的,您将 I/O 绑定到输入数据上。
请原谅我的 arcpy
无知,但为什么要在 context['SUB_ACRES']
的总和中插入一个 where 子句?你不是一开始就在 sub_code
上过滤了吗? (我们不知道 union 是什么,所以也许您正在与未过滤的东西合并...)
我有一个 arcpy
过程,需要在一堆层上进行联合,运行 一些计算,并编写 HTML 报告。考虑到我需要生成的报告数量 (~2,100),我需要这个过程尽可能快(我的目标是每份报告 2 秒)。当我 运行 遇到一个问题时,我尝试了多种方法来做到这一点,包括多处理,即 运行 多进程部分基本上花费相同的时间,无论有多少我使用的内核。
例如,对于相同数量的报告:
- 2 个核心每轮花费约 30 秒(因此 40 份报告需要 40/2 * 30 秒)
- 4 个核心耗时约 60 秒 (40/4 * 60)
- 10 个核心耗时约 160 秒 (40/10 * 160)
等等。它的总时间是相同的,因为一次搅拌两倍的时间需要两倍的时间。
这是否意味着我的问题是 I/O 受限,而不是 CPU 受限?(如果是这样 - 我该怎么办?)我本以为是后者,因为我计时的最大瓶颈是联合(它占用了大约 50% 的处理时间)。工会在 ArcGIS 中通常很昂贵,所以我假设将其分解并且 运行 2 - 10 一次会快 2 - 10 倍。或者,我可能错误地实现了多进程?
## Worker function just included to give some context
def worker(sub_code):
layer = 'in_memory/lyr_{}'.format(sub_code)
arcpy.Select_analysis(subbasinFC, layer, where_clause="SUB_CD = '{}'".format(sub_code))
arcpy.env.extent = layer
union_name = 'in_memory/union_' + sub_code
arcpy.Union_analysis([fields],
union_name,
"NO_FID", "1 FEET")
#.......Some calculations using cursors
# Templating using Jinjah
context = {}
context['DATE'] = now.strftime("%B %d, %Y")
context['SUB_CD'] = sub_code
context['SUB_ACRES'] = sum([r[0] for r in arcpy.da.SearchCursor(union, ["ACRES"], where_clause="SUB_CD = '{}'".format(sub_code))])
# Etc
# Then write the report out using custom function
write_html('template.html', 'output_folder', context)
if __name__ == '__main__':
subList = sorted({r[0] for r in arcpy.da.SearchCursor(subbasinFC, ["SUB_CD"])})
NUM_CORES = 7
chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)]
for chunk in chunk_list:
jobs = []
for subbasin in chunk:
p = multiprocessing.Process(target=worker, args=(subbasin,))
jobs.append(p)
p.start()
for process in jobs:
process.join()
我不确定您是否正确使用 Process
池来跟踪您的作业。这个:
for subbasin in chunk:
p = multiprocessing.Process(target=worker, args=(subbasin,))
jobs.append(p)
p.start()
for process in jobs:
process.join()
应该改为:
for subbasin in chunk:
p = multiprocessing.Process(target=worker, args=(subbasin,))
p.start()
p.join()
您反对 spec of using the multiprocessing library 有什么具体原因吗?您不会等到线程终止才开始启动另一个进程,这只会创建一大堆未被父调用进程处理的进程。
这里没什么好说的,而且我没有使用 ArcGIS 的经验。所以我只能记下两个更高层次的事情。首先,"the usual" 解决此问题的方法是将 NUM_CORES = 7
下面的所有代码替换为:
pool = multiprocessing.Pool(NUM_CORES)
pool.map(worker, subList)
pool.close()
pool.join()
map()
负责让所有工作进程尽可能忙碌。按照原样,您启动 7 个进程,然后等待 所有 个进程完成。在最慢的进程消失之前完成的所有进程,它们的核心处于空闲状态,等待下一次外循环迭代。 Pool
在作业期间保持 7 个进程处于活动状态,并在完成最后一项工作后立即为每个进程提供一项新工作。
其次,这部分以逻辑错误结束:
chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)]
你想要 NUM_CORES
而不是 NUM_CORES-1
。按原样,第一次提取
subList[0:7]
然后
subList[6:13]
然后
subList[12:19]
等等。 subList[6]
和 subList[12]
(等)各提取两次。子列表重叠。
你没有向我们展示足够多的东西来确定你在做什么。例如,你的 env.workspace
是多少? subbasinFC
的值是多少?似乎您在每个过程开始时都在进行分析,以将数据过滤到 layer
。但是 subbasinFC
是来自磁盘还是来自内存?如果它来自磁盘,我建议您在任何进程尝试过滤之前将所有内容读入内存。如果您有足够的内存来支持它,那应该会加快速度。否则,是的,您将 I/O 绑定到输入数据上。
请原谅我的 arcpy
无知,但为什么要在 context['SUB_ACRES']
的总和中插入一个 where 子句?你不是一开始就在 sub_code
上过滤了吗? (我们不知道 union 是什么,所以也许您正在与未过滤的东西合并...)