多处理:了解 `chunksize` 背后的逻辑
multiprocessing: Understanding logic behind `chunksize`
哪些因素决定了 chunksize
方法的最佳参数,例如 multiprocessing.Pool.map()
? .map()
方法似乎对其默认块大小使用了任意启发式方法(如下所述);是什么激发了这种选择,是否有基于某些特定 situation/setup 的更周到的方法?
示例 - 假设我是:
- 将一个
iterable
传递给具有约 1500 万个元素的 .map()
;
- 在具有 24 个内核的机器上工作并在
multiprocessing.Pool()
内使用默认值 processes = os.cpu_count()
。
我天真的想法是给 24 名工人中的每人一个同样大小的块,即 15_000_000 / 24
或 625,000。大块应该减少 turnover/overhead,同时充分利用所有工人。但这似乎忽略了向每个工人提供大批量的一些潜在缺点。这是一张不完整的图片吗,我错过了什么?
我的部分问题源于 if chunksize=None
的默认逻辑:.map()
和 .starmap()
都调用 .map_async()
,如下所示:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
# ... (materialize `iterable` to list if it's an iterator)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4) # ????
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
divmod(len(iterable), len(self._pool) * 4)
背后的逻辑是什么?这意味着块大小将更接近 15_000_000 / (24 * 4) == 156_250
。将 len(self._pool)
乘以 4 的意图是什么?
这使得生成的块大小比我上面的 "naive logic" 小 4 倍,这包括将可迭代的长度除以工人的数量pool._pool
.
最后,.imap()
上 Python 文档中的 snippet 进一步激发了我的好奇心:
The chunksize
argument is the same as the one used by the map()
method. For very long iterables using a large value for chunksize
can
make the job complete much faster than using the default value of 1.
有帮助但有点太高级的相关答案:.
我认为您遗漏的部分原因是您天真的估计假设每个工作单元花费相同的时间,在这种情况下您的策略将是最好的。但是,如果某些作业比其他作业完成得早,则某些内核可能会闲置,等待缓慢的作业完成。
因此,通过将块分成 4 倍多的块,然后如果一个块提前完成,该核心可以启动下一个块(而其他核心继续处理较慢的块)。
我不知道他们为什么准确地选择了因子 4,但这是在最小化地图代码的开销(这需要尽可能大的块)和平衡花费不同时间的块(这想要尽可能小的块)。
简答
Pool 的 chunksize-algorithm 是一种启发式方法。它为您尝试填充到 Pool 方法中的所有可想象的问题场景提供了一个简单的解决方案。因此,它无法针对任何 特定 场景进行优化。
该算法将可迭代对象任意划分为比朴素方法多大约四倍的块。更多块意味着更多开销,但增加了调度灵活性。这个答案将如何显示,这导致平均 worker-utilization 更高,但是 没有 保证每个案例的总计算时间更短。
"That's nice to know" 您可能会想,"but how does knowing this help me with my concrete multiprocessing problems?" 好吧,事实并非如此。更诚实的简短回答是 "there is no short answer"、"multiprocessing is complex" 和 "it depends"。观察到的症状可能有不同的根源,即使对于类似的情况也是如此。
本回答试图为您提供基本概念,帮助您更清楚地了解 Pool 的调度黑匣子。它还尝试为您提供一些手头的基本工具,用于识别和避免与块大小相关的潜在悬崖。
Table 的目录
第一部分
- 定义
- 并行化目标
- 并行化场景
- Chunksize > 1 的风险
- 池的Chunksize-Algorithm
量化算法效率
6.1 型号
6.2 并行计划
6.3 效率
6.3.1 绝对分配效率 (ADE)
6.3.2 相对分配效率 (RDE)
- Naive 与 Pool 的 Chunksize-Algorithm
- 现实检查
- 结论
有必要先澄清一些重要的术语。
1。定义
块
此处的块是 pool-method 调用中指定的 iterable
参数的一部分。如何计算 chunksize 以及这会产生什么影响,是这个答案的主题。
任务
任务在worker-process中数据的物理表示如下图所示。
图中显示了对 pool.map()
的示例调用,沿代码行显示,取自 multiprocessing.pool.worker
函数,其中从 inqueue
读取的任务被解包。 worker
是池 MainThread
中的底层 main-function-worker-process。 pool-method 中指定的 func
参数将只匹配 worker
函数内的 func
变量,用于 single-call 方法,如 apply_async
和对于 imap
和 chunksize=1
。对于带有 chunksize
参数的 pool-method 的其余部分,processing-function func
将是 mapper-function(mapstar
或 starmapstar
).此函数将 user-specified func
参数映射到可迭代传输块的每个元素上 (--> "map-tasks")。这花费的时间定义了一个 任务 也作为一个 工作单元 。
任务
虽然单词 "task" 用于一个块的 整个 处理与 multiprocessing.pool
中的代码匹配,但没有迹象表明如何单次调用到user-specifiedfunc
,用一个
块的元素作为参数,应该被引用。为了避免命名冲突引起的混乱(想想 Pool 的 __init__
-方法的 maxtasksperchild
-参数),这个答案将参考
任务中的单个工作单元,如 taskel.
A taskel (from task + element) is the smallest unit of work within a task.
It is the single execution of the function specified with the func
-parameter of a Pool
-method, called with arguments obtained from a single element of the transmitted chunk.
A task consists of chunksize
taskels.
并行化开销 (PO)
PO由Python-internal开销和inter-process通信开销(IPC)组成。 Python 中的 per-task 开销伴随着打包和解包任务及其结果所需的代码。 IPC-overhead 附带了必要的线程同步和不同地址 spaces 之间的数据复制(需要两个复制步骤:parent -> queue -> child). IPC-overhead 的数量取决于 OS、硬件和 data-size,这使得对影响的概括变得困难。
2。并行化目标
使用多处理时,我们的总体目标(显然)是最小化所有任务的总处理时间。为了达到这个总体目标,我们的技术目标需要优化硬件资源的利用。
实现技术目标的一些重要 sub-goals 是:
首先,任务需要足够的计算量(密集),才能赚回 我们必须为并行化支付的 PO。 PO的相关性随着绝对计算量的增加而降低每个任务的时间。或者,换句话说,你的问题的绝对计算时间 per taskel 越大,减少 PO 的需求就越不相关。如果您的计算每个任务需要数小时,那么 IPC 开销相比之下可以忽略不计。这里主要关注的是防止在分配完所有任务后空闲工作进程。保持所有内核加载意味着我们正在尽可能多地并行化。
3。并行化场景
What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()
问题的主要因素是计算时间在我们的单个任务中可能 变化 多少。简而言之,最佳块大小的选择取决于每个任务的计算时间的变异系数 (CV)。
从这种变化的程度来看,规模上的两种极端情况是:
- 所有任务集需要完全相同的计算时间。
- 一个任务包可能需要几秒或几天才能完成。
为了更好的记忆,我将这些场景称为:
- 密集场景
- 宽场景
密集场景
在密集场景中,最好一次分发所有任务,以将必要的 IPC 和上下文切换保持在最低限度。这意味着我们只想创建与工作进程一样多的块。上面已经说过,PO 的权重随着每个任务的计算时间的缩短而增加。
为了获得最大吞吐量,我们还希望所有工作进程都处于忙碌状态,直到处理完所有任务(没有空闲的工作进程)。为此,分布式块的大小应相等或接近。
宽场景
Wide Scenario 的主要示例是优化问题,其中结果要么快速收敛,要么计算可能需要数小时,甚至数天。通常无法预测在这种情况下任务将包含 "light taskels" 和 "heavy taskels" 的混合,因此不建议一次在 task-batch 中分配太多任务。一次分配尽可能少的任务包,意味着增加调度灵活性。这是实现我们所有内核的高利用率 sub-goal 所必需的。
如果默认情况下 Pool
方法针对密集场景进行了完全优化,它们将越来越多地为靠近宽场景的每个问题创建次优计时。
4。 Chunksize > 1 的风险
考虑这个简化的 pseudo-code 示例 Wide Scenario-iterable,我们希望将其传递给 pool-method:
good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]
我们假装以秒为单位看到所需的计算时间,而不是实际值,为了简单起见,只有 1 分钟或 1 天。
我们假设池有四个工作进程(在四个核心上)并且 chunksize
设置为 2
。因为订单将被保留,发送给工人的块将是这些:
[(60, 60), (86400, 60), (86400, 60), (60, 84600)]
由于我们有足够多的worker并且计算时间足够高,我们可以说,每个worker进程首先都会得到一个chunk来处理。 (这不一定是快速完成任务的情况)。进一步我们可以说,整个处理大约需要 86400+60 秒,因为这是这个人工场景中一个块的最高总计算时间,我们只分发一次块。
现在考虑这个可迭代对象,与前一个可迭代对象相比,它只有一个元素改变了它的位置:
bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]
...以及相应的块:
[(60, 60), (86400, 86400), (60, 60), (60, 84600)]
不幸的是,我们的可迭代对象的排序使我们的总处理时间几乎翻了一番 (86400+86400)!获得恶性 (86400, 86400)-chunk 的工人正在阻止其任务中的第二个重任务分配给已经完成其 (60, 60)-chunk 的空闲工人之一。如果我们设置 chunksize=1
.
,我们显然不会冒这样不愉快的结果的风险
这是更大块大小的风险。对于更高的块大小,我们以调度灵活性换取更少的开销,在上述情况下,这是一笔糟糕的交易。
我们将在第 6 章中看到。量化算法效率,更大的块大小也可能导致密集场景.
的次优结果
5。游泳池的 Chunksize-Algorithm
您将在下面找到源代码中算法的略微修改版本。如您所见,我将下半部分切掉并将其包装成一个用于在外部计算 chunksize
参数的函数。我还将 4
替换为 factor
参数并将 len()
调用外包。
# mp_utils.py
def calc_chunksize(n_workers, len_iterable, factor=4):
"""Calculate chunksize argument for Pool-methods.
Resembles source-code within `multiprocessing.pool.Pool._map_async`.
"""
chunksize, extra = divmod(len_iterable, n_workers * factor)
if extra:
chunksize += 1
return chunksize
为了确保我们都在同一页面上,下面是 divmod
所做的:
divmod(x, y)
是 returns (x//y, x%y)
的内置函数。x // y
是底除法,返回 x / y
的向下舍入商,而
x % y
是从 x / y
返回余数的模运算。
因此,例如divmod(10, 3)
returns (3, 1)
.
现在当您查看 chunksize, extra = divmod(len_iterable, n_workers * 4)
时,您会注意到 n_workers
这里是 x / y
中的除数 y
并乘以 4
,无需进一步稍后通过 if extra: chunksize +=1
进行调整,导致初始块大小 至少 比其他情况小四倍(对于 len_iterable >= n_workers * 4
)。
要查看乘以 4
对中间块大小结果的影响,请考虑此函数:
def compare_chunksizes(len_iterable, n_workers=4):
"""Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
for Pool's complete algorithm. Return chunksizes and the real factors by
which naive chunksizes are bigger.
"""
cs_naive = len_iterable // n_workers or 1 # naive approach
cs_pool1 = len_iterable // (n_workers * 4) or 1 # incomplete pool algo.
cs_pool2 = calc_chunksize(n_workers, len_iterable)
real_factor_pool1 = cs_naive / cs_pool1
real_factor_pool2 = cs_naive / cs_pool2
return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2
上面的函数计算了 Pool 的 chunksize-algorithm (cs_pool1
) 的 naive chunksize (cs_naive
) 和 first-step chunksize,以及完整的 chunksize Pool-algorithm (cs_pool2
)。此外,它还计算了 实际因素 rf_pool1 = cs_naive / cs_pool1
和 rf_pool2 = cs_naive / cs_pool2
,这告诉我们天真计算的块大小更大了多少倍比池的内部版本。
下面您可以看到使用此函数的输出创建的两个图形。左图仅显示 n_workers=4
的块大小,直到 500
的可迭代长度。右图显示了 rf_pool1
的值。对于可迭代长度 16
,实数因子变为 >=4
(对于 len_iterable >= n_workers * 4
),对于可迭代长度 28-31
,其最大值为 7
。这与算法收敛到更长的迭代的原始因素 4
有很大的偏差。 'Longer'这里是相对的,取决于指定worker的数量
记住 chunksize cs_pool1
仍然缺少 extra
的调整,divmod
的余数包含在完整算法的 cs_pool2
中。
算法继续:
if extra:
chunksize += 1
现在如果有 是 余数(divmod-operation 中的 extra
),将 chunksize 增加 1 显然不能解决每个问题任务。毕竟要是这样的话,本来就没有余数了。
如何在下图中看到“extra-treatment”的效果,即 实数因为 rf_pool2
现在从 下方 4
收敛到 4
并且偏差稍微平滑一些。 n_workers=4
和 len_iterable=500
的标准偏差从 rf_pool1
的 0.5233
下降到 rf_pool2
的 0.4115
。
最终,将 chunksize
增加 1 的效果是,最后传输的任务大小仅为 len_iterable % chunksize or chunksize
。
extra-treatment 的效果越有趣,我们稍后会看到的效果越重要,但是可以观察到生成的 数量块 (n_chunks
)。
对于足够长的iterables,Pool的完成chunksize-algorithm(下图中的n_pool2
)会把chunk的数量稳定在n_chunks == n_workers * 4
。
相比之下,朴素算法(在初始打嗝之后)随着可迭代长度的增长在 n_chunks == n_workers
和 n_chunks == n_workers + 1
之间交替。
下面你会发现两个增强的 info-functions 用于 Pool's 和天真的 chunksize-algorithm。下一章会用到这些函数的输出。
# mp_utils.py
from collections import namedtuple
Chunkinfo = namedtuple(
'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
'chunksize', 'last_chunk']
)
def calc_chunksize_info(n_workers, len_iterable, factor=4):
"""Calculate chunksize numbers."""
chunksize, extra = divmod(len_iterable, n_workers * factor)
if extra:
chunksize += 1
# `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
# exploit `0 == False`
last_chunk = len_iterable % chunksize or chunksize
return Chunkinfo(
n_workers, len_iterable, n_chunks, chunksize, last_chunk
)
不要被 calc_naive_chunksize_info
可能出乎意料的外观所迷惑。 divmod
中的 extra
不用于计算块大小。
def calc_naive_chunksize_info(n_workers, len_iterable):
"""Calculate naive chunksize numbers."""
chunksize, extra = divmod(len_iterable, n_workers)
if chunksize == 0:
chunksize = 1
n_chunks = extra
last_chunk = chunksize
else:
n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
last_chunk = len_iterable % chunksize or chunksize
return Chunkinfo(
n_workers, len_iterable, n_chunks, chunksize, last_chunk
)
6.量化算法效率
现在,在我们看到 Pool
的 chunksize-algorithm 的输出与朴素算法的输出相比有何不同之后...
- 如何判断 Pool 的方法是否真的改进 某些东西?
- 这个东西到底是什么?
如前一章所示,对于更长的iterables(更多的taskel),Pool的chunksize-algorithm 大约将iterable分为四次比原始方法多 个块。更小的块意味着更多的任务,更多的任务意味着更多的 并行化开销 (PO),必须权衡增加 scheduling-flexibility 带来的好处的成本(回忆 "Risks of Chunksize>1").
由于相当明显的原因,Pool 的基本 chunksize-algorithm 不能为我们权衡 scheduling-flexibility 与 PO。 IPC-overhead 依赖于 OS、硬件和 data-size。该算法无法知道我们 运行 我们的代码在什么硬件上运行,也不知道任务集需要多长时间才能完成。它是一种启发式算法,可为 所有 可能的场景提供基本功能。这意味着它不能针对任何特定场景进行优化。如前所述,PO 随着每个任务的计算时间增加(负相关)也变得越来越不重要。
当您回忆第 2 章中的 并行化目标 时,一个 bullet-point 是:
- 所有 cpu-cores
的高利用率
前面提到的something,Pool的chunksize-algorithmca 尝试改进 怠速最小化 worker-processes,分别是 利用 cpu-cores。
关于 multiprocessing.Pool
的关于 SO 的重复问题是由那些想知道未使用的核心/闲置 worker-processes 在您期望所有 worker-processes 忙碌的情况下提出的。虽然这可能有很多原因,但在计算结束时空闲 worker-processes 是我们经常可以观察到的,即使在 Dense Scenarios(每个任务的计算时间相等)工作人员数量不是块数 ( 除数 (n_chunks % n_workers > 0
) 的情况。
现在的问题是:
How can we practically translate our understanding of chunksizes into something which enables us to explain observed worker-utilization, or even compare the efficiency of different algorithms in that regard?
6.1 型号
为了在这里获得更深入的见解,我们需要一种并行计算的抽象形式,它将过于复杂的现实简化为可管理的复杂程度,同时在定义的边界内保持重要性。这种抽象被称为模型。如果要收集数据,这种“并行化模型”(PM) 的实现会像实际计算一样生成 worker-mapped meta-data(时间戳)。 model-generated meta-data 允许在某些约束下预测并行计算的指标。
此处定义的 PM 中的两个 sub-models 之一是 分发模型 (DM)。 DM 解释了原子工作单元(任务)是如何分布在 并行工作人员和时间 上的,除了各自的 chunksize-algorithm,工人的数量,input-iterable(taskel 的数量)和他们的计算持续时间被考虑在内。这意味着任何形式的开销都不包括在内。
为了获得完整的 PM,DM 扩展了 Overhead 模型 (OM),表示各种形式的 并行化开销 (PO)。这样的模型需要针对每个节点单独进行校准(硬件,OS-dependencies)。 OM 表示多少种形式的开销是悬而未决的,因此可以存在具有不同复杂程度的多个 OM。实现的OM 需要哪个精度级别取决于特定计算的PO 的总体权重。较短的任务组导致 PO 的权重更高,如果我们试图 预测 [,这反过来需要更精确的 OM =558=] 并行化效率 (PE).
6.2 并行计划 (PS)
Parallel Schedule是two-dimensional表示并行计算,其中x-axis表示时间,y-axis表示池平行工人。工人数量和总计算时间标记了矩形的延伸,其中绘制了较小的矩形。这些较小的矩形表示原子工作单元(任务)。
您可以在下面找到 PS 的可视化效果,它是使用 Pool chunksize-algorithm 的 DM 中的数据绘制的密集场景.
- x-axis被分成相等的时间单位,其中每个单位代表任务组所需的计算时间。
- y-axis 分为池使用的 worker-processes 数。
- 此处的任务组显示为最小的cyan-colored矩形,放入匿名worker-process.
的时间线(时间表)中
- 一个任务是 worker-timeline 中的一个或多个任务,用相同的色调连续突出显示。
- 空闲时间单位通过红色方块表示。
- 并行计划被分成几个部分。最后一节是 tail-section.
组成部分的名称如下图所示。
在包含 OM 的完整 PM 中,Idling Share 不限于尾部,但也包括 space 任务之间甚至任务集之间。
6.3 效率
上面介绍的模型允许量化 worker-utilization 的速率。我们可以区分:
- 分布效率 (DE) - 在 DM 的帮助下计算(或 Dense 的简化方法场景).
- 并行化效率 (PE) - 要么借助校准的 PM(预测)计算,要么根据 meta-data 计算实际计算。
需要注意的是,计算出的效率 不会 自动与 更快 总体相关给定并行化问题的计算。 Worker-utilization 在此上下文中仅区分具有已开始但未完成任务的工作人员和工作人员没有这样的 "open" 任务组。这意味着,可能在期间任务组的时间跨度未注册。
以上提到的所有效率基本上都是通过计算除法Busy Share / Parallel Schedule的商得到的。 DE 和 PE 的区别在于 Busy Share
在 overhead-extended PM.
的总体并行计划中占据较小部分
这个答案只会进一步讨论一个简单的方法来计算Dense Scenario的DE。这足以比较不同的 chunksize-algorithms,因为...
- ... DM 是 PM 的一部分,随着不同 chunksize-algorithm 的使用而变化。
- ...每个任务组具有相同计算持续时间的密集场景描述了一个"stable state",对于这些时间跨度,这些时间跨度不在等式中。任何其他情况只会导致随机结果,因为任务组的顺序很重要。
6.3.1 绝对分配效率 (ADE)
这个基本效率一般可以通过将Busy Share除以Parallel Schedule的全部潜力来计算:
Absolute Distribution Efficiency (ADE) = Busy Share / Parallel Schedule
对于密集场景,简化的calculation-code看起来像这样:
# mp_utils.py
def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
"""Calculate Absolute Distribution Efficiency (ADE).
`len_iterable` is not used, but contained to keep a consistent signature
with `calc_rde`.
"""
if n_workers == 1:
return 1
potential = (
((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
+ (n_chunks % n_workers == 1) * last_chunk
) * n_workers
n_full_chunks = n_chunks - (chunksize > last_chunk)
taskels_in_regular_chunks = n_full_chunks * chunksize
real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
ade = real / potential
return ade
如果没有Idling Share,Busy Share将等于到并行计划,因此我们得到 100% 的 ADE。在我们的简化模型中,这是一个场景,其中所有可用进程在处理所有任务所需的整个时间内都将处于忙碌状态。换句话说,整个作业得到了 100% 的有效并行化。
但为什么我在这里一直将 PE 称为 absolute PE?
为了理解这一点,我们必须考虑确保最大调度灵活性的块大小 (cs) 的可能情况(还有可能存在的 Highlanders 数量。巧合吗?):
__________________________________~ ONE ~__________________________________
例如,如果我们有四个 worker-processes 和 37 个 taskel,即使 chunksize=1
也会有闲置的 worker,因为 n_workers=4
不是 37 的约数。 37 / 4 的余数为 1。剩下的这个 taskel 必须由 sole worker 处理,而剩下的 3 个闲置。
同样,仍然会有一个idle worker,有39个taskel,如下图所示。
当您将 chunksize=1
的 Parallel Schedule 与 chunksize=3
的以下版本进行比较时,您会注意到 Parallel Schedule更小,时间线上x-axis更短。现在应该变得很明显了,即使对于 Dense Scenarios.
,更大的块大小也会 导致整体计算时间增加
But why not just use the length of the x-axis for efficiency calculations?
因为开销不包含在这个模型中。两个块大小都不同,因此 x-axis 不能直接比较。开销仍然会导致更长的总计算时间,如下图 案例 2 所示。
6.3.2 相对分配效率 (RDE)
ADE 值不包含信息,如果更好 taskels 的分布是可能的,chunksize 设置为 1。Better 这里仍然意味着更小的 Idling Share.
要获得针对最大可能 DE 调整的 DE 值,我们必须将考虑的 ADE 通过 ADE 我们得到 chunksize=1
.
Relative Distribution Efficiency (RDE) = ADE_cs_x / ADE_cs_1
代码如下所示:
# mp_utils.py
def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
"""Calculate Relative Distribution Efficiency (RDE)."""
ade_cs1 = calc_ade(
n_workers, len_iterable, n_chunks=len_iterable,
chunksize=1, last_chunk=1
)
ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
rde = ade / ade_cs1
return rde
RDE,这里怎么定义,本质上是一个Parallel Schedule尾巴的故事。 RDE 受尾部包含的最大有效块大小的影响。 (这条尾巴可以是 x-axis 长度 chunksize
或 last_chunk
。)
结果是,RDE 对于各种 "tail-looks" 自然收敛到 100%(偶数),如下图所示。
低 RDE ...
- 强烈提示优化潜力。
- 对于较长的迭代器自然变得不太可能,因为总体 并行计划 的相对 tail-portion 缩小。
请查找此答案的第二部分 here。
About this answer
This answer is Part II of the accepted answer above.
7。 Naive vs. Pool 的 Chunksize-Algorithm
在进入细节之前,请考虑下面的两个 gif。对于不同的 iterable
长度范围,它们显示了两个比较算法如何对传递的 iterable
进行分块(到那时它将是一个序列)以及如何分配结果任务。 worker 的顺序是随机的,现实中每个 worker 的分布式任务数量可能与 light taskels 和/或 wide Scenario 中的 taskels 的图像不同。如前所述,这里也不包括开销。但是,对于传输可忽略的密集场景中足够重的任务单元 data-sizes,实际计算得出了非常相似的画面。
如章节“5.Pool的Chunksize-Algorithm”,随着Pool的chunksize-algorithmchunk的数量将稳定在n_chunks == n_workers * 4
足够大的迭代器,同时它使用天真的方法在 n_chunks == n_workers
和 n_chunks == n_workers + 1
之间切换。对于适用的朴素算法:因为 n_chunks % n_workers == 1
对于 n_chunks == n_workers + 1
是 True
,因此将创建一个新部分,其中将只雇用一个工人。
Naive Chunksize-Algorithm:
You might think you created tasks in the same number of workers, but this will only be true for cases where there is no remainder for len_iterable / n_workers
. If there is a remainder, there will be a new section with only one task for a single worker. At that point your computation will not be parallel anymore.
下图与第 5 章中的相似,但显示的是节数而不是块数。对于 Pool 的完整 chunksize-algorithm (n_pool2
),n_sections
将稳定在臭名昭著的硬编码因子 4
。对于朴素算法,n_sections
将在一和二之间交替。
对于 Pool 的 chunksize-algorithm,通过前面提到的 extra-treatment 在 n_chunks = n_workers * 4
处的稳定性阻止了在此处创建新部分并保留Idling Share 仅限于一名工人进行足够长的迭代。不仅如此,该算法还会不断缩小 Idling Share 的相对大小,从而导致 RDE 值趋向于 100%。
例如,"Long enough" 对于 n_workers=4
是 len_iterable=210
。对于等于或大于该值的迭代,Idling Share 将仅限于一名工人,该特征最初由于 chunksize-algorithm 中的 4
乘法而丢失第一名。
朴素的 chunksize-algorithm 也收敛到 100%,但速度较慢。收敛效果完全取决于这样一个事实,即在将有两个部分的情况下,尾部的相对部分会缩小。这条只有一个雇佣工人的尾巴被限制在 x-axis 长度 n_workers - 1
,len_iterable / n_workers
.
的可能最大余数
How do actual RDE values differ for the naive and Pool's chunksize-algorithm?
下面是两个热图,显示了所有可迭代长度高达 5000 的 RDE 值,适用于从 2 到 100 的所有工人数。
color-scale 从 0.5 变为 1 (50%-100%)。对于左侧热图中的朴素算法,您会注意到更多的暗区(较低的 RDE 值)。相比之下,右侧的 Pool chunksize-algorithm 画出了更加阳光的画面。
lower-left 暗角与 upper-right 亮角的对角线渐变,再次显示了所谓的 "long iterable".[=58 对工人数量的依赖性=]
How bad can it get with each algorithm?
Pool 的 chunksize-algorithm 的 RDE 值为 81.25% 是上面指定的工人和可迭代长度范围的最低值:
天真chunksize-algorithm,事情会变得更糟。此处计算出的最低 RDE 为 50.72 %。在这种情况下,几乎一半的计算时间只有一个工人 运行ning!因此,Knights Landing 的骄傲拥有者,请注意。 ;)
8。现实检查
在前面的章节中,我们考虑了纯数学分布问题的简化模型,从 nitty-gritty 细节中剥离,这些细节首先使多处理成为一个棘手的话题。为了更好地理解分布模型 (DM) 单独 可以在多大程度上有助于解释现实中观察到的工人利用率,我们现在将看一下由 real[ 绘制的并行计划=201=] 计算。
设置
下面的图都处理了一个简单的并行执行,cpu-bound dummy-function,它被调用了各种参数,因此我们可以观察绘制的并行计划如何根据输入值而变化.此函数中的 "work" 仅包含范围 object 上的迭代。这已经足以让核心保持忙碌,因为我们传入了大量数据。可选地,该函数需要一些 taskel-unique 额外的 data
,它只是返回不变。由于每个任务组都包含完全相同的工作量,因此我们仍然在这里处理密集场景。
该函数用包装器装饰,包装器采用 ns-resolution 的时间戳(Python 3.7+)。时间戳用于计算任务组的时间跨度,因此可以绘制经验并行计划。
@stamp_taskel
def busy_foo(i, it, data=None):
"""Dummy function for CPU-bound work."""
for _ in range(int(it)):
pass
return i, data
def stamp_taskel(func):
"""Decorator for taking timestamps on start and end of decorated
function execution.
"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time_ns()
result = func(*args, **kwargs)
end_time = time_ns()
return (current_process().name, (start_time, end_time)), result
return wrapper
Pool 的 starmap 方法也被装饰成只有 starmap-call 本身是定时的。此 c 的 "Start" 和 "end"我将确定生成的并行计划的 x-axis 的最小值和最大值。
我们将在具有以下规格的机器上的四个工作进程上观察 40 个任务任务的计算:
Python 3.7.1,Ubuntu 18.04.2,英特尔® 酷睿™ i7-2600K CPU @ 3.40GHz × 8
将变化的输入值是 for-loop 中的迭代次数
(30k,30M,600M)和额外发送的数据大小(每个任务,numpy-ndarray:0 MiB,50 MiB)。
...
N_WORKERS = 4
LEN_ITERABLE = 40
ITERATIONS = 30e3 # 30e6, 600e6
DATA_MiB = 0 # 50
iterable = [
# extra created data per taskel
(i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8))) # taskel args
for i in range(LEN_ITERABLE)
]
with Pool(N_WORKERS) as pool:
results = pool.starmap(busy_foo, iterable)
下面显示的 运行s 是精心挑选的,具有相同的块顺序,因此与分布模型中的并行计划相比,您可以更好地发现差异,但不要忘记工人得到他们的任务是 non-deterministic.
DM预测
重申一下,分配模型 "predicts" 一个并行计划,就像我们之前在第 6.2 章中看到的那样:
第一个 运行:30k 次迭代和每个任务 0 MiB 数据
我们这里的第一个运行很短,taskel很"light"。整个 pool.starmap()
-调用总共只用了 14.5 毫秒。
您会注意到,与 DM 不同,空转不仅限于 tail-section,而且还发生在任务之间甚至任务组之间。那是因为我们这里的实际时间表自然包括各种开销。在这里闲置意味着任务组 之外 的一切。可能真实空闲在期间任务组未被捕获,如前所述。
您还可以看到,并非所有工作人员都同时完成任务。这是因为所有工作人员都通过共享的 inqueue
获取数据,并且一次只有一名工作人员可以从中读取数据。这同样适用于 outqueue
。一旦您传输 non-marginal 大小的数据,我们稍后会看到,这可能会导致更大的混乱。
此外,您还可以看到,尽管每个任务组都包含相同数量的工作,但任务组的实际测量时间跨度差异很大。分配给 worker-3 和 worker-4 的任务比前两个 worker 处理的任务需要更多的时间。对于这个 运行,我怀疑这是由于 turbo boost 在那一刻在核心上不再可用 worker-3/4,所以他们用较低的 clock-rate 处理他们的任务。
整个计算是如此之轻,以至于硬件或 OS-introduced chaos-factors 可以大大扭曲 PS。计算是 "leaf on the wind" 并且 DM 预测意义不大,即使对于理论上合适的场景也是如此。
第 2 运行:每个任务 30M 次迭代和 0 MiB 数据
将 for-loop 中的迭代次数从 30,000 次增加到 3000 万次,导致真正的并行计划与 提供的数据预测的接近完美匹配DM,欢呼!每个任务单元的计算现在已经足够繁重,可以边缘化开始和中间的空闲部分,只让 DM 预测的大空闲部分可见。
第 3 运行:每个任务 30M 次迭代和 50 MiB 数据
保持 30M 次迭代,但另外为每个任务组来回发送 50 MiB 再次扭曲了图片。这里 queueing-effect 很明显。与 Worker-1 相比,Worker-4 需要等待更长的时间才能完成第二个任务。现在想象这个有 70 名工人的时间表!
如果任务集在计算上非常轻,但可以提供大量数据作为有效负载,则单个共享 queue 的瓶颈可以阻止向池中添加更多工作人员的任何额外好处,即使它们由物理核心支持。在这种情况下,Worker-1 可以完成其第一个任务并等待新任务,甚至在 Worker-40 完成其第一个任务之前。
现在应该很明显了,为什么 Pool
中的计算时间并不总是随着工作人员的数量线性减少。沿着 发送相对大量的数据可能会 导致大部分时间花在等待数据被复制到一个工人的地址 space 而只有一个工人可以一次喂饱
第 4 运行:每个任务 6 亿次迭代和 50 MiB 数据
这里我们再次发送 50 MiB,但将迭代次数从 30M 增加到 600M,这使总计算时间从 10 秒增加到 152 秒。绘制的Parallel Schedule再次,与预测的接近完美匹配,通过数据复制的开销被边缘化。
9.结论
所讨论的乘以 4
增加了调度灵活性,但也利用了 taskel-distributions 中的不均匀性。如果没有这种乘法,空闲共享将被限制为单个工作人员,即使对于短迭代(对于具有密集场景的 DMo)。池的 chunksize-algorithm 需要 input-iterables 达到一定的大小才能重新获得该特征。
正如这个答案所希望显示的那样,与原始方法相比,Pool 的 chunksize-algorithm 平均而言可以带来更好的核心利用率,至少对于一般情况而言是这样,并且不考虑长开销。这里的朴素算法的分布效率 (DE) 可以低至 ~51%,而 Pool 的块大小算法则低至 ~81%。 DE 但是不像 IPC 那样包含并行化开销 (PO)。第 8 章已经表明,DE 对于边缘化开销的密集场景仍然具有强大的预测能力。
尽管 Pool 的 chunksize-algorithm 与朴素方法相比实现了更高的 DE,它并没有为每个输入提供最佳任务分配constellation. 虽然一个简单的静态 chunking-algorithm 不能优化 (overhead-including) 并行化效率 (PE),但没有内在原因为什么它不能 always 提供 100% 的相对分配效率 (RDE),这意味着与 chunksize=1
相同的 DE。一个简单的 chunksize-algorithm 仅包含基础数学,并且可以以任何方式自由 "slice the cake"。
与 Pool 实现的 "equal-size-chunking" 算法不同,"even-size-chunking" 算法将为每个 len_iterable
/ [ 提供 100% 的 RDE =39=]组合。 even-size-chunking 算法在 Pool 的源代码中实现起来会稍微复杂一些,但可以通过在外部打包任务来在现有算法之上进行调制(我将从这里 link 以防我丢掉一个Q/A 如何做到这一点)。
哪些因素决定了 chunksize
方法的最佳参数,例如 multiprocessing.Pool.map()
? .map()
方法似乎对其默认块大小使用了任意启发式方法(如下所述);是什么激发了这种选择,是否有基于某些特定 situation/setup 的更周到的方法?
示例 - 假设我是:
- 将一个
iterable
传递给具有约 1500 万个元素的.map()
; - 在具有 24 个内核的机器上工作并在
multiprocessing.Pool()
内使用默认值processes = os.cpu_count()
。
我天真的想法是给 24 名工人中的每人一个同样大小的块,即 15_000_000 / 24
或 625,000。大块应该减少 turnover/overhead,同时充分利用所有工人。但这似乎忽略了向每个工人提供大批量的一些潜在缺点。这是一张不完整的图片吗,我错过了什么?
我的部分问题源于 if chunksize=None
的默认逻辑:.map()
和 .starmap()
都调用 .map_async()
,如下所示:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
# ... (materialize `iterable` to list if it's an iterator)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4) # ????
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
divmod(len(iterable), len(self._pool) * 4)
背后的逻辑是什么?这意味着块大小将更接近 15_000_000 / (24 * 4) == 156_250
。将 len(self._pool)
乘以 4 的意图是什么?
这使得生成的块大小比我上面的 "naive logic" 小 4 倍,这包括将可迭代的长度除以工人的数量pool._pool
.
最后,.imap()
上 Python 文档中的 snippet 进一步激发了我的好奇心:
The
chunksize
argument is the same as the one used by themap()
method. For very long iterables using a large value forchunksize
can make the job complete much faster than using the default value of 1.
有帮助但有点太高级的相关答案:
我认为您遗漏的部分原因是您天真的估计假设每个工作单元花费相同的时间,在这种情况下您的策略将是最好的。但是,如果某些作业比其他作业完成得早,则某些内核可能会闲置,等待缓慢的作业完成。
因此,通过将块分成 4 倍多的块,然后如果一个块提前完成,该核心可以启动下一个块(而其他核心继续处理较慢的块)。
我不知道他们为什么准确地选择了因子 4,但这是在最小化地图代码的开销(这需要尽可能大的块)和平衡花费不同时间的块(这想要尽可能小的块)。
简答
Pool 的 chunksize-algorithm 是一种启发式方法。它为您尝试填充到 Pool 方法中的所有可想象的问题场景提供了一个简单的解决方案。因此,它无法针对任何 特定 场景进行优化。
该算法将可迭代对象任意划分为比朴素方法多大约四倍的块。更多块意味着更多开销,但增加了调度灵活性。这个答案将如何显示,这导致平均 worker-utilization 更高,但是 没有 保证每个案例的总计算时间更短。
"That's nice to know" 您可能会想,"but how does knowing this help me with my concrete multiprocessing problems?" 好吧,事实并非如此。更诚实的简短回答是 "there is no short answer"、"multiprocessing is complex" 和 "it depends"。观察到的症状可能有不同的根源,即使对于类似的情况也是如此。
本回答试图为您提供基本概念,帮助您更清楚地了解 Pool 的调度黑匣子。它还尝试为您提供一些手头的基本工具,用于识别和避免与块大小相关的潜在悬崖。
Table 的目录
第一部分
- 定义
- 并行化目标
- 并行化场景
- Chunksize > 1 的风险
- 池的Chunksize-Algorithm
量化算法效率
6.1 型号
6.2 并行计划
6.3 效率
6.3.1 绝对分配效率 (ADE)
6.3.2 相对分配效率 (RDE)
- Naive 与 Pool 的 Chunksize-Algorithm
- 现实检查
- 结论
有必要先澄清一些重要的术语。
1。定义
块
此处的块是 pool-method 调用中指定的 iterable
参数的一部分。如何计算 chunksize 以及这会产生什么影响,是这个答案的主题。
任务
任务在worker-process中数据的物理表示如下图所示。
图中显示了对 pool.map()
的示例调用,沿代码行显示,取自 multiprocessing.pool.worker
函数,其中从 inqueue
读取的任务被解包。 worker
是池 MainThread
中的底层 main-function-worker-process。 pool-method 中指定的 func
参数将只匹配 worker
函数内的 func
变量,用于 single-call 方法,如 apply_async
和对于 imap
和 chunksize=1
。对于带有 chunksize
参数的 pool-method 的其余部分,processing-function func
将是 mapper-function(mapstar
或 starmapstar
).此函数将 user-specified func
参数映射到可迭代传输块的每个元素上 (--> "map-tasks")。这花费的时间定义了一个 任务 也作为一个 工作单元 。
任务
虽然单词 "task" 用于一个块的 整个 处理与 multiprocessing.pool
中的代码匹配,但没有迹象表明如何单次调用到user-specifiedfunc
,用一个
块的元素作为参数,应该被引用。为了避免命名冲突引起的混乱(想想 Pool 的 __init__
-方法的 maxtasksperchild
-参数),这个答案将参考
任务中的单个工作单元,如 taskel.
A taskel (from task + element) is the smallest unit of work within a task. It is the single execution of the function specified with the
func
-parameter of aPool
-method, called with arguments obtained from a single element of the transmitted chunk. A task consists ofchunksize
taskels.
并行化开销 (PO)
PO由Python-internal开销和inter-process通信开销(IPC)组成。 Python 中的 per-task 开销伴随着打包和解包任务及其结果所需的代码。 IPC-overhead 附带了必要的线程同步和不同地址 spaces 之间的数据复制(需要两个复制步骤:parent -> queue -> child). IPC-overhead 的数量取决于 OS、硬件和 data-size,这使得对影响的概括变得困难。
2。并行化目标
使用多处理时,我们的总体目标(显然)是最小化所有任务的总处理时间。为了达到这个总体目标,我们的技术目标需要优化硬件资源的利用。
实现技术目标的一些重要 sub-goals 是:
首先,任务需要足够的计算量(密集),才能赚回 我们必须为并行化支付的 PO。 PO的相关性随着绝对计算量的增加而降低每个任务的时间。或者,换句话说,你的问题的绝对计算时间 per taskel 越大,减少 PO 的需求就越不相关。如果您的计算每个任务需要数小时,那么 IPC 开销相比之下可以忽略不计。这里主要关注的是防止在分配完所有任务后空闲工作进程。保持所有内核加载意味着我们正在尽可能多地并行化。
3。并行化场景
What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()
问题的主要因素是计算时间在我们的单个任务中可能 变化 多少。简而言之,最佳块大小的选择取决于每个任务的计算时间的变异系数 (CV)。
从这种变化的程度来看,规模上的两种极端情况是:
- 所有任务集需要完全相同的计算时间。
- 一个任务包可能需要几秒或几天才能完成。
为了更好的记忆,我将这些场景称为:
- 密集场景
- 宽场景
密集场景
在密集场景中,最好一次分发所有任务,以将必要的 IPC 和上下文切换保持在最低限度。这意味着我们只想创建与工作进程一样多的块。上面已经说过,PO 的权重随着每个任务的计算时间的缩短而增加。
为了获得最大吞吐量,我们还希望所有工作进程都处于忙碌状态,直到处理完所有任务(没有空闲的工作进程)。为此,分布式块的大小应相等或接近。
宽场景
Wide Scenario 的主要示例是优化问题,其中结果要么快速收敛,要么计算可能需要数小时,甚至数天。通常无法预测在这种情况下任务将包含 "light taskels" 和 "heavy taskels" 的混合,因此不建议一次在 task-batch 中分配太多任务。一次分配尽可能少的任务包,意味着增加调度灵活性。这是实现我们所有内核的高利用率 sub-goal 所必需的。
如果默认情况下 Pool
方法针对密集场景进行了完全优化,它们将越来越多地为靠近宽场景的每个问题创建次优计时。
4。 Chunksize > 1 的风险
考虑这个简化的 pseudo-code 示例 Wide Scenario-iterable,我们希望将其传递给 pool-method:
good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]
我们假装以秒为单位看到所需的计算时间,而不是实际值,为了简单起见,只有 1 分钟或 1 天。
我们假设池有四个工作进程(在四个核心上)并且 chunksize
设置为 2
。因为订单将被保留,发送给工人的块将是这些:
[(60, 60), (86400, 60), (86400, 60), (60, 84600)]
由于我们有足够多的worker并且计算时间足够高,我们可以说,每个worker进程首先都会得到一个chunk来处理。 (这不一定是快速完成任务的情况)。进一步我们可以说,整个处理大约需要 86400+60 秒,因为这是这个人工场景中一个块的最高总计算时间,我们只分发一次块。
现在考虑这个可迭代对象,与前一个可迭代对象相比,它只有一个元素改变了它的位置:
bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]
...以及相应的块:
[(60, 60), (86400, 86400), (60, 60), (60, 84600)]
不幸的是,我们的可迭代对象的排序使我们的总处理时间几乎翻了一番 (86400+86400)!获得恶性 (86400, 86400)-chunk 的工人正在阻止其任务中的第二个重任务分配给已经完成其 (60, 60)-chunk 的空闲工人之一。如果我们设置 chunksize=1
.
这是更大块大小的风险。对于更高的块大小,我们以调度灵活性换取更少的开销,在上述情况下,这是一笔糟糕的交易。
我们将在第 6 章中看到。量化算法效率,更大的块大小也可能导致密集场景.
的次优结果5。游泳池的 Chunksize-Algorithm
您将在下面找到源代码中算法的略微修改版本。如您所见,我将下半部分切掉并将其包装成一个用于在外部计算 chunksize
参数的函数。我还将 4
替换为 factor
参数并将 len()
调用外包。
# mp_utils.py
def calc_chunksize(n_workers, len_iterable, factor=4):
"""Calculate chunksize argument for Pool-methods.
Resembles source-code within `multiprocessing.pool.Pool._map_async`.
"""
chunksize, extra = divmod(len_iterable, n_workers * factor)
if extra:
chunksize += 1
return chunksize
为了确保我们都在同一页面上,下面是 divmod
所做的:
divmod(x, y)
是 returns (x//y, x%y)
的内置函数。x // y
是底除法,返回 x / y
的向下舍入商,而
x % y
是从 x / y
返回余数的模运算。
因此,例如divmod(10, 3)
returns (3, 1)
.
现在当您查看 chunksize, extra = divmod(len_iterable, n_workers * 4)
时,您会注意到 n_workers
这里是 x / y
中的除数 y
并乘以 4
,无需进一步稍后通过 if extra: chunksize +=1
进行调整,导致初始块大小 至少 比其他情况小四倍(对于 len_iterable >= n_workers * 4
)。
要查看乘以 4
对中间块大小结果的影响,请考虑此函数:
def compare_chunksizes(len_iterable, n_workers=4):
"""Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
for Pool's complete algorithm. Return chunksizes and the real factors by
which naive chunksizes are bigger.
"""
cs_naive = len_iterable // n_workers or 1 # naive approach
cs_pool1 = len_iterable // (n_workers * 4) or 1 # incomplete pool algo.
cs_pool2 = calc_chunksize(n_workers, len_iterable)
real_factor_pool1 = cs_naive / cs_pool1
real_factor_pool2 = cs_naive / cs_pool2
return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2
上面的函数计算了 Pool 的 chunksize-algorithm (cs_pool1
) 的 naive chunksize (cs_naive
) 和 first-step chunksize,以及完整的 chunksize Pool-algorithm (cs_pool2
)。此外,它还计算了 实际因素 rf_pool1 = cs_naive / cs_pool1
和 rf_pool2 = cs_naive / cs_pool2
,这告诉我们天真计算的块大小更大了多少倍比池的内部版本。
下面您可以看到使用此函数的输出创建的两个图形。左图仅显示 n_workers=4
的块大小,直到 500
的可迭代长度。右图显示了 rf_pool1
的值。对于可迭代长度 16
,实数因子变为 >=4
(对于 len_iterable >= n_workers * 4
),对于可迭代长度 28-31
,其最大值为 7
。这与算法收敛到更长的迭代的原始因素 4
有很大的偏差。 'Longer'这里是相对的,取决于指定worker的数量
记住 chunksize cs_pool1
仍然缺少 extra
的调整,divmod
的余数包含在完整算法的 cs_pool2
中。
算法继续:
if extra:
chunksize += 1
现在如果有 是 余数(divmod-operation 中的 extra
),将 chunksize 增加 1 显然不能解决每个问题任务。毕竟要是这样的话,本来就没有余数了。
如何在下图中看到“extra-treatment”的效果,即 实数因为 rf_pool2
现在从 下方 4
收敛到 4
并且偏差稍微平滑一些。 n_workers=4
和 len_iterable=500
的标准偏差从 rf_pool1
的 0.5233
下降到 rf_pool2
的 0.4115
。
最终,将 chunksize
增加 1 的效果是,最后传输的任务大小仅为 len_iterable % chunksize or chunksize
。
extra-treatment 的效果越有趣,我们稍后会看到的效果越重要,但是可以观察到生成的 数量块 (n_chunks
)。
对于足够长的iterables,Pool的完成chunksize-algorithm(下图中的n_pool2
)会把chunk的数量稳定在n_chunks == n_workers * 4
。
相比之下,朴素算法(在初始打嗝之后)随着可迭代长度的增长在 n_chunks == n_workers
和 n_chunks == n_workers + 1
之间交替。
下面你会发现两个增强的 info-functions 用于 Pool's 和天真的 chunksize-algorithm。下一章会用到这些函数的输出。
# mp_utils.py
from collections import namedtuple
Chunkinfo = namedtuple(
'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
'chunksize', 'last_chunk']
)
def calc_chunksize_info(n_workers, len_iterable, factor=4):
"""Calculate chunksize numbers."""
chunksize, extra = divmod(len_iterable, n_workers * factor)
if extra:
chunksize += 1
# `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
# exploit `0 == False`
last_chunk = len_iterable % chunksize or chunksize
return Chunkinfo(
n_workers, len_iterable, n_chunks, chunksize, last_chunk
)
不要被 calc_naive_chunksize_info
可能出乎意料的外观所迷惑。 divmod
中的 extra
不用于计算块大小。
def calc_naive_chunksize_info(n_workers, len_iterable):
"""Calculate naive chunksize numbers."""
chunksize, extra = divmod(len_iterable, n_workers)
if chunksize == 0:
chunksize = 1
n_chunks = extra
last_chunk = chunksize
else:
n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
last_chunk = len_iterable % chunksize or chunksize
return Chunkinfo(
n_workers, len_iterable, n_chunks, chunksize, last_chunk
)
6.量化算法效率
现在,在我们看到 Pool
的 chunksize-algorithm 的输出与朴素算法的输出相比有何不同之后...
- 如何判断 Pool 的方法是否真的改进 某些东西?
- 这个东西到底是什么?
如前一章所示,对于更长的iterables(更多的taskel),Pool的chunksize-algorithm 大约将iterable分为四次比原始方法多 个块。更小的块意味着更多的任务,更多的任务意味着更多的 并行化开销 (PO),必须权衡增加 scheduling-flexibility 带来的好处的成本(回忆 "Risks of Chunksize>1").
由于相当明显的原因,Pool 的基本 chunksize-algorithm 不能为我们权衡 scheduling-flexibility 与 PO。 IPC-overhead 依赖于 OS、硬件和 data-size。该算法无法知道我们 运行 我们的代码在什么硬件上运行,也不知道任务集需要多长时间才能完成。它是一种启发式算法,可为 所有 可能的场景提供基本功能。这意味着它不能针对任何特定场景进行优化。如前所述,PO 随着每个任务的计算时间增加(负相关)也变得越来越不重要。
当您回忆第 2 章中的 并行化目标 时,一个 bullet-point 是:
- 所有 cpu-cores 的高利用率
前面提到的something,Pool的chunksize-algorithmca 尝试改进 怠速最小化 worker-processes,分别是 利用 cpu-cores。
关于 multiprocessing.Pool
的关于 SO 的重复问题是由那些想知道未使用的核心/闲置 worker-processes 在您期望所有 worker-processes 忙碌的情况下提出的。虽然这可能有很多原因,但在计算结束时空闲 worker-processes 是我们经常可以观察到的,即使在 Dense Scenarios(每个任务的计算时间相等)工作人员数量不是块数 ( 除数 (n_chunks % n_workers > 0
) 的情况。
现在的问题是:
How can we practically translate our understanding of chunksizes into something which enables us to explain observed worker-utilization, or even compare the efficiency of different algorithms in that regard?
6.1 型号
为了在这里获得更深入的见解,我们需要一种并行计算的抽象形式,它将过于复杂的现实简化为可管理的复杂程度,同时在定义的边界内保持重要性。这种抽象被称为模型。如果要收集数据,这种“并行化模型”(PM) 的实现会像实际计算一样生成 worker-mapped meta-data(时间戳)。 model-generated meta-data 允许在某些约束下预测并行计算的指标。
此处定义的 PM 中的两个 sub-models 之一是 分发模型 (DM)。 DM 解释了原子工作单元(任务)是如何分布在 并行工作人员和时间 上的,除了各自的 chunksize-algorithm,工人的数量,input-iterable(taskel 的数量)和他们的计算持续时间被考虑在内。这意味着任何形式的开销都不包括在内。
为了获得完整的 PM,DM 扩展了 Overhead 模型 (OM),表示各种形式的 并行化开销 (PO)。这样的模型需要针对每个节点单独进行校准(硬件,OS-dependencies)。 OM 表示多少种形式的开销是悬而未决的,因此可以存在具有不同复杂程度的多个 OM。实现的OM 需要哪个精度级别取决于特定计算的PO 的总体权重。较短的任务组导致 PO 的权重更高,如果我们试图 预测 [,这反过来需要更精确的 OM =558=] 并行化效率 (PE).
6.2 并行计划 (PS)
Parallel Schedule是two-dimensional表示并行计算,其中x-axis表示时间,y-axis表示池平行工人。工人数量和总计算时间标记了矩形的延伸,其中绘制了较小的矩形。这些较小的矩形表示原子工作单元(任务)。
您可以在下面找到 PS 的可视化效果,它是使用 Pool chunksize-algorithm 的 DM 中的数据绘制的密集场景.
- x-axis被分成相等的时间单位,其中每个单位代表任务组所需的计算时间。
- y-axis 分为池使用的 worker-processes 数。
- 此处的任务组显示为最小的cyan-colored矩形,放入匿名worker-process. 的时间线(时间表)中
- 一个任务是 worker-timeline 中的一个或多个任务,用相同的色调连续突出显示。
- 空闲时间单位通过红色方块表示。
- 并行计划被分成几个部分。最后一节是 tail-section.
组成部分的名称如下图所示。
在包含 OM 的完整 PM 中,Idling Share 不限于尾部,但也包括 space 任务之间甚至任务集之间。
6.3 效率
上面介绍的模型允许量化 worker-utilization 的速率。我们可以区分:
- 分布效率 (DE) - 在 DM 的帮助下计算(或 Dense 的简化方法场景).
- 并行化效率 (PE) - 要么借助校准的 PM(预测)计算,要么根据 meta-data 计算实际计算。
需要注意的是,计算出的效率 不会 自动与 更快 总体相关给定并行化问题的计算。 Worker-utilization 在此上下文中仅区分具有已开始但未完成任务的工作人员和工作人员没有这样的 "open" 任务组。这意味着,可能在期间任务组的时间跨度未注册。
以上提到的所有效率基本上都是通过计算除法Busy Share / Parallel Schedule的商得到的。 DE 和 PE 的区别在于 Busy Share 在 overhead-extended PM.
的总体并行计划中占据较小部分这个答案只会进一步讨论一个简单的方法来计算Dense Scenario的DE。这足以比较不同的 chunksize-algorithms,因为...
- ... DM 是 PM 的一部分,随着不同 chunksize-algorithm 的使用而变化。
- ...每个任务组具有相同计算持续时间的密集场景描述了一个"stable state",对于这些时间跨度,这些时间跨度不在等式中。任何其他情况只会导致随机结果,因为任务组的顺序很重要。
6.3.1 绝对分配效率 (ADE)
这个基本效率一般可以通过将Busy Share除以Parallel Schedule的全部潜力来计算:
Absolute Distribution Efficiency (ADE) = Busy Share / Parallel Schedule
对于密集场景,简化的calculation-code看起来像这样:
# mp_utils.py
def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
"""Calculate Absolute Distribution Efficiency (ADE).
`len_iterable` is not used, but contained to keep a consistent signature
with `calc_rde`.
"""
if n_workers == 1:
return 1
potential = (
((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
+ (n_chunks % n_workers == 1) * last_chunk
) * n_workers
n_full_chunks = n_chunks - (chunksize > last_chunk)
taskels_in_regular_chunks = n_full_chunks * chunksize
real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
ade = real / potential
return ade
如果没有Idling Share,Busy Share将等于到并行计划,因此我们得到 100% 的 ADE。在我们的简化模型中,这是一个场景,其中所有可用进程在处理所有任务所需的整个时间内都将处于忙碌状态。换句话说,整个作业得到了 100% 的有效并行化。
但为什么我在这里一直将 PE 称为 absolute PE?
为了理解这一点,我们必须考虑确保最大调度灵活性的块大小 (cs) 的可能情况(还有可能存在的 Highlanders 数量。巧合吗?):
__________________________________~ ONE ~__________________________________
例如,如果我们有四个 worker-processes 和 37 个 taskel,即使 chunksize=1
也会有闲置的 worker,因为 n_workers=4
不是 37 的约数。 37 / 4 的余数为 1。剩下的这个 taskel 必须由 sole worker 处理,而剩下的 3 个闲置。
同样,仍然会有一个idle worker,有39个taskel,如下图所示。
当您将 chunksize=1
的 Parallel Schedule 与 chunksize=3
的以下版本进行比较时,您会注意到 Parallel Schedule更小,时间线上x-axis更短。现在应该变得很明显了,即使对于 Dense Scenarios.
But why not just use the length of the x-axis for efficiency calculations?
因为开销不包含在这个模型中。两个块大小都不同,因此 x-axis 不能直接比较。开销仍然会导致更长的总计算时间,如下图 案例 2 所示。
6.3.2 相对分配效率 (RDE)
ADE 值不包含信息,如果更好 taskels 的分布是可能的,chunksize 设置为 1。Better 这里仍然意味着更小的 Idling Share.
要获得针对最大可能 DE 调整的 DE 值,我们必须将考虑的 ADE 通过 ADE 我们得到 chunksize=1
.
Relative Distribution Efficiency (RDE) = ADE_cs_x / ADE_cs_1
代码如下所示:
# mp_utils.py
def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
"""Calculate Relative Distribution Efficiency (RDE)."""
ade_cs1 = calc_ade(
n_workers, len_iterable, n_chunks=len_iterable,
chunksize=1, last_chunk=1
)
ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
rde = ade / ade_cs1
return rde
RDE,这里怎么定义,本质上是一个Parallel Schedule尾巴的故事。 RDE 受尾部包含的最大有效块大小的影响。 (这条尾巴可以是 x-axis 长度 chunksize
或 last_chunk
。)
结果是,RDE 对于各种 "tail-looks" 自然收敛到 100%(偶数),如下图所示。
低 RDE ...
- 强烈提示优化潜力。
- 对于较长的迭代器自然变得不太可能,因为总体 并行计划 的相对 tail-portion 缩小。
请查找此答案的第二部分 here。
About this answer
This answer is Part II of the accepted answer above.
7。 Naive vs. Pool 的 Chunksize-Algorithm
在进入细节之前,请考虑下面的两个 gif。对于不同的 iterable
长度范围,它们显示了两个比较算法如何对传递的 iterable
进行分块(到那时它将是一个序列)以及如何分配结果任务。 worker 的顺序是随机的,现实中每个 worker 的分布式任务数量可能与 light taskels 和/或 wide Scenario 中的 taskels 的图像不同。如前所述,这里也不包括开销。但是,对于传输可忽略的密集场景中足够重的任务单元 data-sizes,实际计算得出了非常相似的画面。
如章节“5.Pool的Chunksize-Algorithm”,随着Pool的chunksize-algorithmchunk的数量将稳定在n_chunks == n_workers * 4
足够大的迭代器,同时它使用天真的方法在 n_chunks == n_workers
和 n_chunks == n_workers + 1
之间切换。对于适用的朴素算法:因为 n_chunks % n_workers == 1
对于 n_chunks == n_workers + 1
是 True
,因此将创建一个新部分,其中将只雇用一个工人。
Naive Chunksize-Algorithm:
You might think you created tasks in the same number of workers, but this will only be true for cases where there is no remainder for
len_iterable / n_workers
. If there is a remainder, there will be a new section with only one task for a single worker. At that point your computation will not be parallel anymore.
下图与第 5 章中的相似,但显示的是节数而不是块数。对于 Pool 的完整 chunksize-algorithm (n_pool2
),n_sections
将稳定在臭名昭著的硬编码因子 4
。对于朴素算法,n_sections
将在一和二之间交替。
对于 Pool 的 chunksize-algorithm,通过前面提到的 extra-treatment 在 n_chunks = n_workers * 4
处的稳定性阻止了在此处创建新部分并保留Idling Share 仅限于一名工人进行足够长的迭代。不仅如此,该算法还会不断缩小 Idling Share 的相对大小,从而导致 RDE 值趋向于 100%。
"Long enough" 对于 n_workers=4
是 len_iterable=210
。对于等于或大于该值的迭代,Idling Share 将仅限于一名工人,该特征最初由于 chunksize-algorithm 中的 4
乘法而丢失第一名。
朴素的 chunksize-algorithm 也收敛到 100%,但速度较慢。收敛效果完全取决于这样一个事实,即在将有两个部分的情况下,尾部的相对部分会缩小。这条只有一个雇佣工人的尾巴被限制在 x-axis 长度 n_workers - 1
,len_iterable / n_workers
.
How do actual RDE values differ for the naive and Pool's chunksize-algorithm?
下面是两个热图,显示了所有可迭代长度高达 5000 的 RDE 值,适用于从 2 到 100 的所有工人数。 color-scale 从 0.5 变为 1 (50%-100%)。对于左侧热图中的朴素算法,您会注意到更多的暗区(较低的 RDE 值)。相比之下,右侧的 Pool chunksize-algorithm 画出了更加阳光的画面。
lower-left 暗角与 upper-right 亮角的对角线渐变,再次显示了所谓的 "long iterable".[=58 对工人数量的依赖性=]
How bad can it get with each algorithm?
Pool 的 chunksize-algorithm 的 RDE 值为 81.25% 是上面指定的工人和可迭代长度范围的最低值:
天真chunksize-algorithm,事情会变得更糟。此处计算出的最低 RDE 为 50.72 %。在这种情况下,几乎一半的计算时间只有一个工人 运行ning!因此,Knights Landing 的骄傲拥有者,请注意。 ;)
8。现实检查
在前面的章节中,我们考虑了纯数学分布问题的简化模型,从 nitty-gritty 细节中剥离,这些细节首先使多处理成为一个棘手的话题。为了更好地理解分布模型 (DM) 单独 可以在多大程度上有助于解释现实中观察到的工人利用率,我们现在将看一下由 real[ 绘制的并行计划=201=] 计算。
设置
下面的图都处理了一个简单的并行执行,cpu-bound dummy-function,它被调用了各种参数,因此我们可以观察绘制的并行计划如何根据输入值而变化.此函数中的 "work" 仅包含范围 object 上的迭代。这已经足以让核心保持忙碌,因为我们传入了大量数据。可选地,该函数需要一些 taskel-unique 额外的 data
,它只是返回不变。由于每个任务组都包含完全相同的工作量,因此我们仍然在这里处理密集场景。
该函数用包装器装饰,包装器采用 ns-resolution 的时间戳(Python 3.7+)。时间戳用于计算任务组的时间跨度,因此可以绘制经验并行计划。
@stamp_taskel
def busy_foo(i, it, data=None):
"""Dummy function for CPU-bound work."""
for _ in range(int(it)):
pass
return i, data
def stamp_taskel(func):
"""Decorator for taking timestamps on start and end of decorated
function execution.
"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time_ns()
result = func(*args, **kwargs)
end_time = time_ns()
return (current_process().name, (start_time, end_time)), result
return wrapper
Pool 的 starmap 方法也被装饰成只有 starmap-call 本身是定时的。此 c 的 "Start" 和 "end"我将确定生成的并行计划的 x-axis 的最小值和最大值。
我们将在具有以下规格的机器上的四个工作进程上观察 40 个任务任务的计算: Python 3.7.1,Ubuntu 18.04.2,英特尔® 酷睿™ i7-2600K CPU @ 3.40GHz × 8
将变化的输入值是 for-loop 中的迭代次数 (30k,30M,600M)和额外发送的数据大小(每个任务,numpy-ndarray:0 MiB,50 MiB)。
...
N_WORKERS = 4
LEN_ITERABLE = 40
ITERATIONS = 30e3 # 30e6, 600e6
DATA_MiB = 0 # 50
iterable = [
# extra created data per taskel
(i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8))) # taskel args
for i in range(LEN_ITERABLE)
]
with Pool(N_WORKERS) as pool:
results = pool.starmap(busy_foo, iterable)
下面显示的 运行s 是精心挑选的,具有相同的块顺序,因此与分布模型中的并行计划相比,您可以更好地发现差异,但不要忘记工人得到他们的任务是 non-deterministic.
DM预测
重申一下,分配模型 "predicts" 一个并行计划,就像我们之前在第 6.2 章中看到的那样:
第一个 运行:30k 次迭代和每个任务 0 MiB 数据
我们这里的第一个运行很短,taskel很"light"。整个 pool.starmap()
-调用总共只用了 14.5 毫秒。
您会注意到,与 DM 不同,空转不仅限于 tail-section,而且还发生在任务之间甚至任务组之间。那是因为我们这里的实际时间表自然包括各种开销。在这里闲置意味着任务组 之外 的一切。可能真实空闲在期间任务组未被捕获,如前所述。
您还可以看到,并非所有工作人员都同时完成任务。这是因为所有工作人员都通过共享的 inqueue
获取数据,并且一次只有一名工作人员可以从中读取数据。这同样适用于 outqueue
。一旦您传输 non-marginal 大小的数据,我们稍后会看到,这可能会导致更大的混乱。
此外,您还可以看到,尽管每个任务组都包含相同数量的工作,但任务组的实际测量时间跨度差异很大。分配给 worker-3 和 worker-4 的任务比前两个 worker 处理的任务需要更多的时间。对于这个 运行,我怀疑这是由于 turbo boost 在那一刻在核心上不再可用 worker-3/4,所以他们用较低的 clock-rate 处理他们的任务。
整个计算是如此之轻,以至于硬件或 OS-introduced chaos-factors 可以大大扭曲 PS。计算是 "leaf on the wind" 并且 DM 预测意义不大,即使对于理论上合适的场景也是如此。
第 2 运行:每个任务 30M 次迭代和 0 MiB 数据
将 for-loop 中的迭代次数从 30,000 次增加到 3000 万次,导致真正的并行计划与 提供的数据预测的接近完美匹配DM,欢呼!每个任务单元的计算现在已经足够繁重,可以边缘化开始和中间的空闲部分,只让 DM 预测的大空闲部分可见。
第 3 运行:每个任务 30M 次迭代和 50 MiB 数据
保持 30M 次迭代,但另外为每个任务组来回发送 50 MiB 再次扭曲了图片。这里 queueing-effect 很明显。与 Worker-1 相比,Worker-4 需要等待更长的时间才能完成第二个任务。现在想象这个有 70 名工人的时间表!
如果任务集在计算上非常轻,但可以提供大量数据作为有效负载,则单个共享 queue 的瓶颈可以阻止向池中添加更多工作人员的任何额外好处,即使它们由物理核心支持。在这种情况下,Worker-1 可以完成其第一个任务并等待新任务,甚至在 Worker-40 完成其第一个任务之前。
现在应该很明显了,为什么 Pool
中的计算时间并不总是随着工作人员的数量线性减少。沿着 发送相对大量的数据可能会 导致大部分时间花在等待数据被复制到一个工人的地址 space 而只有一个工人可以一次喂饱
第 4 运行:每个任务 6 亿次迭代和 50 MiB 数据
这里我们再次发送 50 MiB,但将迭代次数从 30M 增加到 600M,这使总计算时间从 10 秒增加到 152 秒。绘制的Parallel Schedule再次,与预测的接近完美匹配,通过数据复制的开销被边缘化。
9.结论
所讨论的乘以 4
增加了调度灵活性,但也利用了 taskel-distributions 中的不均匀性。如果没有这种乘法,空闲共享将被限制为单个工作人员,即使对于短迭代(对于具有密集场景的 DMo)。池的 chunksize-algorithm 需要 input-iterables 达到一定的大小才能重新获得该特征。
正如这个答案所希望显示的那样,与原始方法相比,Pool 的 chunksize-algorithm 平均而言可以带来更好的核心利用率,至少对于一般情况而言是这样,并且不考虑长开销。这里的朴素算法的分布效率 (DE) 可以低至 ~51%,而 Pool 的块大小算法则低至 ~81%。 DE 但是不像 IPC 那样包含并行化开销 (PO)。第 8 章已经表明,DE 对于边缘化开销的密集场景仍然具有强大的预测能力。
尽管 Pool 的 chunksize-algorithm 与朴素方法相比实现了更高的 DE,它并没有为每个输入提供最佳任务分配constellation. 虽然一个简单的静态 chunking-algorithm 不能优化 (overhead-including) 并行化效率 (PE),但没有内在原因为什么它不能 always 提供 100% 的相对分配效率 (RDE),这意味着与 chunksize=1
相同的 DE。一个简单的 chunksize-algorithm 仅包含基础数学,并且可以以任何方式自由 "slice the cake"。
与 Pool 实现的 "equal-size-chunking" 算法不同,"even-size-chunking" 算法将为每个 len_iterable
/ [ 提供 100% 的 RDE =39=]组合。 even-size-chunking 算法在 Pool 的源代码中实现起来会稍微复杂一些,但可以通过在外部打包任务来在现有算法之上进行调制(我将从这里 link 以防我丢掉一个Q/A 如何做到这一点)。