调度并行前缀算法
Scheduling the parallel prefix algorithm
我想实现并行前缀算法(使用 POSIX 线程或 OpenMP)。我有并行算法的伪代码,但不知道如何实现。
parallelPrefix(array)
if array.length = 1
result[1] = array[1]
return result[]
for 1 <= i <= floor(n / 2) pardo
temp[i] = array[2 * i - 1] + array[2 * i]
prefixSums[] = parallelPrefix(temp[])
for 1 <= i <= n pardo
if i is even result[i] = prefixSums[i / 2]
if i = 1 result[i] = array[1]
if i is odd result[i] = prefixSums[(i - 1) / 2] + temp[i]
return result[]
(这个算法基本上是在讲义中给我们的,我不是自己想出来的,实际上我想使用一个类似的算法,它的符号稍微不那么优雅,但可以就地工作)
我的问题是我不确定如何安排它。 'pardo' 符号意味着循环是并行完成的,但我不确定如何将其映射到有限数量的处理器。当然,我可以为循环的每个实例创建一个线程,但这似乎有点矫枉过正(我想启动一个新线程的额外开销会比并行性带来的收益大得多)。
另一个稍微好一点的选项是在处理器之间分配并行循环中的工作。这样我们可以使用更少的线程,但仍然必须为每个并行 for 循环启动一堆线程。
这两个选项似乎都不太理想,所以我一定遗漏了一些东西。我不知道首选的解决方案是什么,我很想听听一些建议。
编辑:我意识到使用同步原语是另一种选择。但这会变得有点复杂(我想),我仍然需要某种策略来将工作映射到处理器。
算法的并行化潜力
您提出的算法很难并行化:大量新的 for
循环实例在每次迭代中几乎没有什么可做的。我不得不使用一个巨大的数组来实现任何加速,如此之大以至于我几乎 运行 内存不足(进程占用约 13 GB)。但是您提到您想要实施一种不同的就地算法。这将显着减少内存占用,您将能够针对大到足以看到显着加速的问题执行算法。
错误
我认为,你的算法有一些错误。首先, n
从未被定义。我假设,它指的是 array.length
,以使算法有意义。其次,行
if i is odd result[i] = prefixSums[(i - 1) / 2] + temp[i]
应该是
if i is odd result[i] = prefixSums[(i - 1) / 2] + array[i]
OpenMP
线程创建
在 OpenMP 中,您不必自己处理线程的创建和调度,因此您的并行循环
for 1 <= i <= n pardo
可以简单写成
int i;
#pragma omp parallel for shared(i)
for (i = 1; i <= n; i++) {
/* loop body */
}
C 或
#pragma omp parallel for
for (int i = 1; i <= n; i++) {
// loop body
}
在 C++ 中或
!$omp parallel do private(i)
do i=1,n
! loop body
end do
!$omp end parallel do
in Fort运行(您尚未指定要使用的编程语言)。 T运行s形成另一个并行循环以同样的方式工作。
这将创建 OpenMP 认为足够多的线程,通常是您的操作系统看到的处理器数量(包括完整的处理器、多核处理器中的内核和虚拟超线程内核)。但是,可以通过不同的方式覆盖它。
选择一个时间表
迭代将由 OpenMP 分配给线程。默认情况下,这是使用静态计划发生的,该计划将完整循环 运行ge 分成每个线程的块。如果您的迭代在计算工作中非常不平衡,并且您的迭代太少而无法平衡,您可以使用 OpenMP 指令中的 schedule(dynamic)
子句请求动态计划。但我认为你的情况不需要这样做。
开销
所以您不会像您提到的那样在每次迭代中使用一个线程,而是在特定系统上使用尽可能多的线程。然而,即使在那种情况下,创建和销毁线程也可能会有一些开销,并且这些开销可能会耗尽您从并行化中获得的收益,尤其是对于 short/cheap 循环。
在您的示例中,小型 n
就是这种情况。假设 n
你的意思是 array.length
(否则你的算法就没有意义),当你在递归中接近基本情况时可能就是这种情况。
我不会对此太担心,因为在您的算法中,这种情况并不多。在其他类型的数组递归算法中,例如分而治之算法,对原始数组 (see the last line in this diagram) 的每个元素执行基本情况。与那些算法不同,您的递归是线性的,因此您将很少执行这些关键执行。
此外,一些 OpenMP 实现决定让一堆线程保持活动状态,以防万一,并在需要时使用它们。在这种情况下,您完全不必担心开销。
如果您决定开销很重要,而您的 OpenMP 实现本身不能使线程保持活动状态,您有 2 个选择:
停用小线程n
您可以通过使用 num_threads
指令在迭代次数很少的情况下覆盖线程数,从而为那些简单的执行停用并行执行,例如
procs = n > 100000 ? omp_get_num_procs() : 1;
#pragma omp parallel for num_threads(procs)
for (i = 1; i <= n; i++) {
/* loop body */
}
请注意,您需要包含 omp.h
才能访问 omp_get_num_procs()
。完整的 C++ 示例:
std::vector<int> parallelPrefix_withNumThreads(const std::vector<int> &array) {
int n = array.size();
int procs = n > 100 ? omp_get_num_procs() : 1;
std::vector<int> result(n);
if (n == 1) {
result[0] = array[0];
return result;
}
std::vector<int> temp(n / 2);
#pragma omp parallel for shared(temp, array) num_threads(procs)
for (int i = 0; i < n / 2; i++) {
temp[i] = array[2 * i] + array[2 * i + 1];
}
std::vector<int> prefixSums = parallelPrefix_withNumThreads(temp);
#pragma omp parallel for shared(temp, array) num_threads(procs)
for (int i = 0; i < n; i++) {
if (i % 2 == 1) {
result[i] = prefixSums[i / 2];
} else if (i == 0) {
result[i] = array[0];
} else if (i % 2 == 0) {
result[i] = prefixSums[i / 2 - 1] + array[i];
}
}
return result;
}
通过将简单情况下的线程数设置为 1,您可以告诉 OpenMP 在这些情况下不要创建任何额外的线程。所有工作都将在(已经存在的)主线程中执行。这消除了这些情况下的几乎所有开销。
您必须设置哪个限制以及这是否会提高性能,在很大程度上取决于您的机器和 OpenMP 实现。无论如何,我用来测试性能的 GCC 4.9.2 重用了它的 OpenMP 线程。在此设置中,我无法通过此技巧观察到任何改进。但这对于其他 OpenMP 实现可能有所不同。
保持线程活动
如果您的初始 n
非常小并且您仍想尝试从并行化中获得一些加速,您可以强制 OpenMP 让您的线程在并行部分之间保持活动状态。这可以通过一个包含所有 omp for
循环执行的 omp parallel
部分来完成。
但是,这一切是否值得付出努力,在很大程度上取决于您使用的 OpenMP 实现。同样,我用来测试性能的 GCC 4.9.2 重用了它的 OpenMP 线程。在此设置中,拉出 omp parallel
指令不是必需的,因此根本没有帮助,它甚至稍微降低了性能。但这对于其他 OpenMP 实现可能有所不同。
C++ 示例代码:
#include <vector>
std::vector<int> parallelPrefix_outsideParallel(const std::vector<int> &array) {
int n = array.size();
std::vector<int> result(n);
if (n == 1) {
result[0] = array[0];
return result;
}
std::vector<int> temp(n / 2);
std::vector<int> prefixSums;
#pragma omp parallel shared(array, result, prefixSums) firstprivate(n)
{
#pragma omp for
for (int i = 0; i < n / 2; i++) {
temp[i] = array[2 * i] + array[2 * i + 1];
}
#pragma omp single
{
prefixSums = parallelPrefix_outsideParallel(temp);
}
#pragma omp for
for (int i = 0; i < n; i++) {
if (i % 2 == 1) {
result[i] = prefixSums[i / 2];
} else if (i == 0) {
result[i] = array[0];
} else if (i % 2 == 0) {
result[i] = prefixSums[i / 2 - 1] + array[i];
}
}
} // end parallel
return result;
}
外部 omp parallel
指令现在包含 for
循环和递归调用。因此,所有 omp for
指令,无论是在当前函数实例中还是在该函数的所有递归实例中,都将使用由该单个 omp parallel
指令创建的线程。
确保OMP_NESTED
is set to FALSE
,否则每次递归调用都会跨越一组新的线程,使您的系统乱七八糟。
omp single
指令确保池中只有一个线程进行递归调用。
请注意,在 omp for
块和 omp single
块的末尾都有隐式屏障同步。没有它们,实现将有竞争条件。
POSIX
如果您想使用 POSIX 个线程,主要的调度思想仍然适用。但是,在这里管理线程的生命周期是您自己的责任。您必须创建多个线程,然后为它们分配工作。
如果可以选择,请不要去那里,而是使用 OpenMP。特别是为不同的 for
循环实例重用线程,就像这个特定问题所需要的(参见 OpenMP 部分),实现起来可能非常具有挑战性。
静态时间表
使用静态调度,每个线程的循环运行ge的计算可以是这样的(假设numThreads
是你创建的线程数,threadId
是一个从 0 到 numThreads-1
的数字,用于标识每个线程):
int getStartIndex(int threadId, int numThreads, int n) {
return 1 + threadId * n / numThreads + min(threadId, n % numThreads);
}
int startIndex = getStartIndex(threadId, numThreads, n);
int nextThreadStartIndex = getStartIndex(threadId + 1, numThreads, n);
for (int i = startIndex; i < nextThreadStartIndex; i++) {
// loop body
}
这将执行从索引 1 到 n
的整个循环,为每个线程分成块。分配将尽可能均匀,每个线程 n / numThreads
次迭代。如果不可能使其完全均匀,第一个 n % numThreads
个线程将比其余线程多接收 1 次迭代(这就是 min(threadId, n % numThreads)
表达式的用途)。
动态时间表
可以通过维护由中央实例管理的迭代队列(例如,实现为列表)来实现动态调度。每个线程都有一个 while
循环,要求新的迭代(或更好的效率:小块迭代)并执行它们。当中央队列为空时,除主线程外的所有线程都会终止,并且主线程会等待所有其他可能仍在执行迭代的线程(屏障同步),然后再继续执行代码的顺序部分。
中央队列的管理需要一些线程同步代码,以避免竞争条件和可能的队列损坏。
但是,鉴于您的迭代在计算成本方面相当统一,我会完全避免动态计划的工作。
我想实现并行前缀算法(使用 POSIX 线程或 OpenMP)。我有并行算法的伪代码,但不知道如何实现。
parallelPrefix(array)
if array.length = 1
result[1] = array[1]
return result[]
for 1 <= i <= floor(n / 2) pardo
temp[i] = array[2 * i - 1] + array[2 * i]
prefixSums[] = parallelPrefix(temp[])
for 1 <= i <= n pardo
if i is even result[i] = prefixSums[i / 2]
if i = 1 result[i] = array[1]
if i is odd result[i] = prefixSums[(i - 1) / 2] + temp[i]
return result[]
(这个算法基本上是在讲义中给我们的,我不是自己想出来的,实际上我想使用一个类似的算法,它的符号稍微不那么优雅,但可以就地工作)
我的问题是我不确定如何安排它。 'pardo' 符号意味着循环是并行完成的,但我不确定如何将其映射到有限数量的处理器。当然,我可以为循环的每个实例创建一个线程,但这似乎有点矫枉过正(我想启动一个新线程的额外开销会比并行性带来的收益大得多)。
另一个稍微好一点的选项是在处理器之间分配并行循环中的工作。这样我们可以使用更少的线程,但仍然必须为每个并行 for 循环启动一堆线程。
这两个选项似乎都不太理想,所以我一定遗漏了一些东西。我不知道首选的解决方案是什么,我很想听听一些建议。
编辑:我意识到使用同步原语是另一种选择。但这会变得有点复杂(我想),我仍然需要某种策略来将工作映射到处理器。
算法的并行化潜力
您提出的算法很难并行化:大量新的 for
循环实例在每次迭代中几乎没有什么可做的。我不得不使用一个巨大的数组来实现任何加速,如此之大以至于我几乎 运行 内存不足(进程占用约 13 GB)。但是您提到您想要实施一种不同的就地算法。这将显着减少内存占用,您将能够针对大到足以看到显着加速的问题执行算法。
错误
我认为,你的算法有一些错误。首先, n
从未被定义。我假设,它指的是 array.length
,以使算法有意义。其次,行
if i is odd result[i] = prefixSums[(i - 1) / 2] + temp[i]
应该是
if i is odd result[i] = prefixSums[(i - 1) / 2] + array[i]
OpenMP
线程创建
在 OpenMP 中,您不必自己处理线程的创建和调度,因此您的并行循环
for 1 <= i <= n pardo
可以简单写成
int i;
#pragma omp parallel for shared(i)
for (i = 1; i <= n; i++) {
/* loop body */
}
C 或
#pragma omp parallel for
for (int i = 1; i <= n; i++) {
// loop body
}
在 C++ 中或
!$omp parallel do private(i)
do i=1,n
! loop body
end do
!$omp end parallel do
in Fort运行(您尚未指定要使用的编程语言)。 T运行s形成另一个并行循环以同样的方式工作。
这将创建 OpenMP 认为足够多的线程,通常是您的操作系统看到的处理器数量(包括完整的处理器、多核处理器中的内核和虚拟超线程内核)。但是,可以通过不同的方式覆盖它。
选择一个时间表
迭代将由 OpenMP 分配给线程。默认情况下,这是使用静态计划发生的,该计划将完整循环 运行ge 分成每个线程的块。如果您的迭代在计算工作中非常不平衡,并且您的迭代太少而无法平衡,您可以使用 OpenMP 指令中的 schedule(dynamic)
子句请求动态计划。但我认为你的情况不需要这样做。
开销
所以您不会像您提到的那样在每次迭代中使用一个线程,而是在特定系统上使用尽可能多的线程。然而,即使在那种情况下,创建和销毁线程也可能会有一些开销,并且这些开销可能会耗尽您从并行化中获得的收益,尤其是对于 short/cheap 循环。
在您的示例中,小型 n
就是这种情况。假设 n
你的意思是 array.length
(否则你的算法就没有意义),当你在递归中接近基本情况时可能就是这种情况。
我不会对此太担心,因为在您的算法中,这种情况并不多。在其他类型的数组递归算法中,例如分而治之算法,对原始数组 (see the last line in this diagram) 的每个元素执行基本情况。与那些算法不同,您的递归是线性的,因此您将很少执行这些关键执行。
此外,一些 OpenMP 实现决定让一堆线程保持活动状态,以防万一,并在需要时使用它们。在这种情况下,您完全不必担心开销。
如果您决定开销很重要,而您的 OpenMP 实现本身不能使线程保持活动状态,您有 2 个选择:
停用小线程n
您可以通过使用 num_threads
指令在迭代次数很少的情况下覆盖线程数,从而为那些简单的执行停用并行执行,例如
procs = n > 100000 ? omp_get_num_procs() : 1;
#pragma omp parallel for num_threads(procs)
for (i = 1; i <= n; i++) {
/* loop body */
}
请注意,您需要包含 omp.h
才能访问 omp_get_num_procs()
。完整的 C++ 示例:
std::vector<int> parallelPrefix_withNumThreads(const std::vector<int> &array) {
int n = array.size();
int procs = n > 100 ? omp_get_num_procs() : 1;
std::vector<int> result(n);
if (n == 1) {
result[0] = array[0];
return result;
}
std::vector<int> temp(n / 2);
#pragma omp parallel for shared(temp, array) num_threads(procs)
for (int i = 0; i < n / 2; i++) {
temp[i] = array[2 * i] + array[2 * i + 1];
}
std::vector<int> prefixSums = parallelPrefix_withNumThreads(temp);
#pragma omp parallel for shared(temp, array) num_threads(procs)
for (int i = 0; i < n; i++) {
if (i % 2 == 1) {
result[i] = prefixSums[i / 2];
} else if (i == 0) {
result[i] = array[0];
} else if (i % 2 == 0) {
result[i] = prefixSums[i / 2 - 1] + array[i];
}
}
return result;
}
通过将简单情况下的线程数设置为 1,您可以告诉 OpenMP 在这些情况下不要创建任何额外的线程。所有工作都将在(已经存在的)主线程中执行。这消除了这些情况下的几乎所有开销。
您必须设置哪个限制以及这是否会提高性能,在很大程度上取决于您的机器和 OpenMP 实现。无论如何,我用来测试性能的 GCC 4.9.2 重用了它的 OpenMP 线程。在此设置中,我无法通过此技巧观察到任何改进。但这对于其他 OpenMP 实现可能有所不同。
保持线程活动
如果您的初始 n
非常小并且您仍想尝试从并行化中获得一些加速,您可以强制 OpenMP 让您的线程在并行部分之间保持活动状态。这可以通过一个包含所有 omp for
循环执行的 omp parallel
部分来完成。
但是,这一切是否值得付出努力,在很大程度上取决于您使用的 OpenMP 实现。同样,我用来测试性能的 GCC 4.9.2 重用了它的 OpenMP 线程。在此设置中,拉出 omp parallel
指令不是必需的,因此根本没有帮助,它甚至稍微降低了性能。但这对于其他 OpenMP 实现可能有所不同。
C++ 示例代码:
#include <vector>
std::vector<int> parallelPrefix_outsideParallel(const std::vector<int> &array) {
int n = array.size();
std::vector<int> result(n);
if (n == 1) {
result[0] = array[0];
return result;
}
std::vector<int> temp(n / 2);
std::vector<int> prefixSums;
#pragma omp parallel shared(array, result, prefixSums) firstprivate(n)
{
#pragma omp for
for (int i = 0; i < n / 2; i++) {
temp[i] = array[2 * i] + array[2 * i + 1];
}
#pragma omp single
{
prefixSums = parallelPrefix_outsideParallel(temp);
}
#pragma omp for
for (int i = 0; i < n; i++) {
if (i % 2 == 1) {
result[i] = prefixSums[i / 2];
} else if (i == 0) {
result[i] = array[0];
} else if (i % 2 == 0) {
result[i] = prefixSums[i / 2 - 1] + array[i];
}
}
} // end parallel
return result;
}
外部 omp parallel
指令现在包含 for
循环和递归调用。因此,所有 omp for
指令,无论是在当前函数实例中还是在该函数的所有递归实例中,都将使用由该单个 omp parallel
指令创建的线程。
确保OMP_NESTED
is set to FALSE
,否则每次递归调用都会跨越一组新的线程,使您的系统乱七八糟。
omp single
指令确保池中只有一个线程进行递归调用。
请注意,在 omp for
块和 omp single
块的末尾都有隐式屏障同步。没有它们,实现将有竞争条件。
POSIX
如果您想使用 POSIX 个线程,主要的调度思想仍然适用。但是,在这里管理线程的生命周期是您自己的责任。您必须创建多个线程,然后为它们分配工作。
如果可以选择,请不要去那里,而是使用 OpenMP。特别是为不同的 for
循环实例重用线程,就像这个特定问题所需要的(参见 OpenMP 部分),实现起来可能非常具有挑战性。
静态时间表
使用静态调度,每个线程的循环运行ge的计算可以是这样的(假设numThreads
是你创建的线程数,threadId
是一个从 0 到 numThreads-1
的数字,用于标识每个线程):
int getStartIndex(int threadId, int numThreads, int n) {
return 1 + threadId * n / numThreads + min(threadId, n % numThreads);
}
int startIndex = getStartIndex(threadId, numThreads, n);
int nextThreadStartIndex = getStartIndex(threadId + 1, numThreads, n);
for (int i = startIndex; i < nextThreadStartIndex; i++) {
// loop body
}
这将执行从索引 1 到 n
的整个循环,为每个线程分成块。分配将尽可能均匀,每个线程 n / numThreads
次迭代。如果不可能使其完全均匀,第一个 n % numThreads
个线程将比其余线程多接收 1 次迭代(这就是 min(threadId, n % numThreads)
表达式的用途)。
动态时间表
可以通过维护由中央实例管理的迭代队列(例如,实现为列表)来实现动态调度。每个线程都有一个 while
循环,要求新的迭代(或更好的效率:小块迭代)并执行它们。当中央队列为空时,除主线程外的所有线程都会终止,并且主线程会等待所有其他可能仍在执行迭代的线程(屏障同步),然后再继续执行代码的顺序部分。
中央队列的管理需要一些线程同步代码,以避免竞争条件和可能的队列损坏。
但是,鉴于您的迭代在计算成本方面相当统一,我会完全避免动态计划的工作。