Python 3:Pool是否保持传递给map的数据的原始顺序?
Python 3: does Pool keep the original order of data passed to map?
我写了一个小脚本来在 4 个线程之间分配工作负载并测试结果是否保持有序(相对于输入的顺序):
from multiprocessing import Pool
import numpy as np
import time
import random
rows = 16
columns = 1000000
vals = np.arange(rows * columns, dtype=np.int32).reshape(rows, columns)
def worker(arr):
time.sleep(random.random()) # let the process sleep a random
for idx in np.ndindex(arr.shape): # amount of time to ensure that
arr[idx] += 1 # the processes finish at different
# time steps
return arr
# create the threadpool
with Pool(4) as p:
# schedule one map/worker for each row in the original data
q = p.map(worker, [row for row in vals])
for idx, row in enumerate(q):
print("[{:0>2}]: {: >8} - {: >8}".format(idx, row[0], row[-1]))
对我来说,这总是导致:
[00]: 1 - 1000000
[01]: 1000001 - 2000000
[02]: 2000001 - 3000000
[03]: 3000001 - 4000000
[04]: 4000001 - 5000000
[05]: 5000001 - 6000000
[06]: 6000001 - 7000000
[07]: 7000001 - 8000000
[08]: 8000001 - 9000000
[09]: 9000001 - 10000000
[10]: 10000001 - 11000000
[11]: 11000001 - 12000000
[12]: 12000001 - 13000000
[13]: 13000001 - 14000000
[14]: 14000001 - 15000000
[15]: 15000001 - 16000000
问题:那么,Pool
在q
中存储每个map
函数的结果时,是否真的保持了原始输入的顺序?
旁注:我问这个,因为我需要一种简单的方法来并行处理多个工作人员的工作。在某些情况下,顺序无关紧要。但是,在某些情况下,结果(如 q
)必须按原始顺序返回,因为我使用了一个依赖于有序数据的附加 reduce 函数。
性能:在我的机器上,这个操作比单个进程上的正常执行快大约 4 倍(正如预期的那样,因为我有 4 个内核)。此外,所有 4 个核心在运行时都处于 100% 使用率。
Pool.map
结果已排序。如果您需要订单,那很好;如果你不这样做,Pool.imap_unordered
可能是一个有用的优化。
请注意,虽然您从 Pool.map
接收结果的顺序是固定的,但它们的计算顺序是任意的。
文档将其记为 "parallel equivalent of the map()
built-in function"。由于 map
保证保持顺序,因此 multiprocessing.Pool.map
也保证了这一点。
请注意,虽然 结果 是有序的,但 执行 不一定是有序的。
map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
根据我的经验,它经常将列表成对地分块,以便项目 #1 和 #2 进入第一个 process/thread,#3 和 #4 进入第二个,依此类推。在此示例中,顺序为 [#1、#3、#2、#4]——但这可能会根据每个 process/thread 的数量和持续时间而有所不同(例如,如果 #1 是一个非常过程很长,#2 可能会延迟到 运行) 的最后一个过程。
显然,如果执行顺序对您很重要(就像对我们一样——下面会详细介绍),那么这是非常不可取的。
幸运的是,有一个相当简单的解决方案:只需将 chunksize
设置为 1!
pool.map(func, my_list, 1)
文档指出此参数指定了一个 近似 块大小,但根据我的经验,将其设置为 1 是可行的:它提供项目一个接一个地放到池子里,而不是一块一块地。
编辑:我们的用例可能不是很标准,让我提供一些细节:
- 我们必须并行处理大量快速和慢速作业(并行度取决于节点数和每个节点的核心数)。
- 我们使用多处理线程池来启动这些作业(在单独的进程中),并等待它们完成(使用
ThreadPool.map
),然后再做其他事情。
- 这些作业可能需要几分钟或几小时才能完成(这不仅仅是一些基本计算)。
- 这些工作流一直在发生(通常每天或每小时)。
- 执行顺序主要在计算时间效率方面很重要(在云中等于金钱)。我们希望最慢的作业首先完成 运行,而较快的作业在最后完成剩余的并行性。这就像装满一个手提箱——如果你从所有的小东西开始,你会过得很糟糕。
举个例子:假设我们有 20 个工作要在 4 threads/processes 完成 运行 -- 前两个工作每个需要大约 2 小时才能完成 运行,其他的需要几分钟。以下是两种替代方案:
使用分块(默认行为):
#1 & #2 将被分块到相同的 thread/process(因此 运行 顺序),而其他的将以类似的分块顺序执行。当#2 完成时,所有其他 threads/processes 将处于空闲状态。 总运行时间:~4 小时。
没有分块(设置chunksize = 1
):
#1 & #2 将 not 分块到相同的 thread/process,因此 运行在平行下。其他的将在 threads/processes 可用时按顺序执行。 总运行时间:~2 小时。
当您为云中的计算付费时,这会产生巨大的差异——尤其是每小时和每天 运行 加起来就是每月和每年的账单。
我写了一个小脚本来在 4 个线程之间分配工作负载并测试结果是否保持有序(相对于输入的顺序):
from multiprocessing import Pool
import numpy as np
import time
import random
rows = 16
columns = 1000000
vals = np.arange(rows * columns, dtype=np.int32).reshape(rows, columns)
def worker(arr):
time.sleep(random.random()) # let the process sleep a random
for idx in np.ndindex(arr.shape): # amount of time to ensure that
arr[idx] += 1 # the processes finish at different
# time steps
return arr
# create the threadpool
with Pool(4) as p:
# schedule one map/worker for each row in the original data
q = p.map(worker, [row for row in vals])
for idx, row in enumerate(q):
print("[{:0>2}]: {: >8} - {: >8}".format(idx, row[0], row[-1]))
对我来说,这总是导致:
[00]: 1 - 1000000
[01]: 1000001 - 2000000
[02]: 2000001 - 3000000
[03]: 3000001 - 4000000
[04]: 4000001 - 5000000
[05]: 5000001 - 6000000
[06]: 6000001 - 7000000
[07]: 7000001 - 8000000
[08]: 8000001 - 9000000
[09]: 9000001 - 10000000
[10]: 10000001 - 11000000
[11]: 11000001 - 12000000
[12]: 12000001 - 13000000
[13]: 13000001 - 14000000
[14]: 14000001 - 15000000
[15]: 15000001 - 16000000
问题:那么,Pool
在q
中存储每个map
函数的结果时,是否真的保持了原始输入的顺序?
旁注:我问这个,因为我需要一种简单的方法来并行处理多个工作人员的工作。在某些情况下,顺序无关紧要。但是,在某些情况下,结果(如 q
)必须按原始顺序返回,因为我使用了一个依赖于有序数据的附加 reduce 函数。
性能:在我的机器上,这个操作比单个进程上的正常执行快大约 4 倍(正如预期的那样,因为我有 4 个内核)。此外,所有 4 个核心在运行时都处于 100% 使用率。
Pool.map
结果已排序。如果您需要订单,那很好;如果你不这样做,Pool.imap_unordered
可能是一个有用的优化。
请注意,虽然您从 Pool.map
接收结果的顺序是固定的,但它们的计算顺序是任意的。
文档将其记为 "parallel equivalent of the map()
built-in function"。由于 map
保证保持顺序,因此 multiprocessing.Pool.map
也保证了这一点。
请注意,虽然 结果 是有序的,但 执行 不一定是有序的。
map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
根据我的经验,它经常将列表成对地分块,以便项目 #1 和 #2 进入第一个 process/thread,#3 和 #4 进入第二个,依此类推。在此示例中,顺序为 [#1、#3、#2、#4]——但这可能会根据每个 process/thread 的数量和持续时间而有所不同(例如,如果 #1 是一个非常过程很长,#2 可能会延迟到 运行) 的最后一个过程。
显然,如果执行顺序对您很重要(就像对我们一样——下面会详细介绍),那么这是非常不可取的。
幸运的是,有一个相当简单的解决方案:只需将 chunksize
设置为 1!
pool.map(func, my_list, 1)
文档指出此参数指定了一个 近似 块大小,但根据我的经验,将其设置为 1 是可行的:它提供项目一个接一个地放到池子里,而不是一块一块地。
编辑:我们的用例可能不是很标准,让我提供一些细节:
- 我们必须并行处理大量快速和慢速作业(并行度取决于节点数和每个节点的核心数)。
- 我们使用多处理线程池来启动这些作业(在单独的进程中),并等待它们完成(使用
ThreadPool.map
),然后再做其他事情。 - 这些作业可能需要几分钟或几小时才能完成(这不仅仅是一些基本计算)。
- 这些工作流一直在发生(通常每天或每小时)。
- 执行顺序主要在计算时间效率方面很重要(在云中等于金钱)。我们希望最慢的作业首先完成 运行,而较快的作业在最后完成剩余的并行性。这就像装满一个手提箱——如果你从所有的小东西开始,你会过得很糟糕。
举个例子:假设我们有 20 个工作要在 4 threads/processes 完成 运行 -- 前两个工作每个需要大约 2 小时才能完成 运行,其他的需要几分钟。以下是两种替代方案:
使用分块(默认行为):
#1 & #2 将被分块到相同的 thread/process(因此 运行 顺序),而其他的将以类似的分块顺序执行。当#2 完成时,所有其他 threads/processes 将处于空闲状态。 总运行时间:~4 小时。
没有分块(设置chunksize = 1
):
#1 & #2 将 not 分块到相同的 thread/process,因此 运行在平行下。其他的将在 threads/processes 可用时按顺序执行。 总运行时间:~2 小时。
当您为云中的计算付费时,这会产生巨大的差异——尤其是每小时和每天 运行 加起来就是每月和每年的账单。