对数时间并行减少
Reductions in parallel in logarithmic time
给定 n
个部分和,可以在 log2 并行步骤中对所有部分和求和。例如,假设有八个线程和八个部分和:s0, s1, s2, s3, s4, s5, s6, s7
。这可以通过这样的 log2(8) = 3
个连续步骤来减少;
thread0 thread1 thread2 thread4
s0 += s1 s2 += s3 s4 += s5 s6 +=s7
s0 += s2 s4 += s6
s0 += s4
我想使用 OpenMP 执行此操作,但我不想使用 OpenMP 的 reduction
子句。我想出了一个解决方案,但我认为可以使用 OpenMP 的 task
子句找到更好的解决方案。
这比标量加法更通用。让我选择一个更有用的案例:数组缩减(有关数组缩减的更多信息,请参阅 here, here, and )。
假设我想对数组 a
进行数组缩减。这是一些代码,它为每个线程并行填充私有数组。
int bins = 20;
int a[bins];
int **at; // array of pointers to arrays
for(int i = 0; i<bins; i++) a[i] = 0;
#pragma omp parallel
{
#pragma omp single
at = (int**)malloc(sizeof *at * omp_get_num_threads());
at[omp_get_thread_num()] = (int*)malloc(sizeof **at * bins);
int a_private[bins];
//arbitrary function to fill the arrays for each thread
for(int i = 0; i<bins; i++) at[omp_get_thread_num()][i] = i + omp_get_thread_num();
}
此时我有一个指向每个线程数组的指针数组。现在我想将所有这些数组加在一起并将最终总和写入 a
。这是我想出的解决方案。
#pragma omp parallel
{
int n = omp_get_num_threads();
for(int m=1; n>1; m*=2) {
int c = n%2;
n/=2;
#pragma omp for
for(int i = 0; i<n; i++) {
int *p1 = at[2*i*m], *p2 = at[2*i*m+m];
for(int j = 0; j<bins; j++) p1[j] += p2[j];
}
n+=c;
}
#pragma omp single
memcpy(a, at[0], sizeof *a*bins);
free(at[omp_get_thread_num()]);
#pragma omp single
free(at);
}
让我试着解释这段代码的作用。假设有八个线程。让我们定义 +=
运算符来表示对数组求和。例如s0 += s1
是
for(int i=0; i<bins; i++) s0[i] += s1[i]
那么这段代码就可以了
n thread0 thread1 thread2 thread4
4 s0 += s1 s2 += s3 s4 += s5 s6 +=s7
2 s0 += s2 s4 += s6
1 s0 += s4
但是这段代码并不像我希望的那样理想。
一个问题是有一些隐式障碍需要所有线程同步。这些障碍应该不是必需的。第一个障碍是在填充阵列和进行归约之间。第二个障碍是在#pragma omp for
声明中的缩减。但是我不能将 nowait
子句与此方法一起使用来移除障碍。
还有一个问题是有几个线程不需要使用。例如八个线程。减少的第一步只需要四个线程,第二步两个线程,最后一步只需要一个线程。但是,此方法将涉及所有八个线程的减少。虽然,其他线程无论如何都不会做太多事情,应该直接进入障碍并等待,所以这可能不是什么大问题。
我的直觉是使用 omp task
子句可以找到更好的方法。不幸的是,我对 task
子句没有什么经验,到目前为止我所做的所有努力都比我现在失败的要好。
有人可以提出一个更好的解决方案来减少对数时间吗? OpenMP 的 task
子句?
我找到了解决障碍问题的方法。这会异步减少。唯一剩下的问题是它仍然将不参与缩减的线程置于忙循环中。此方法使用类似堆栈的东西在关键部分(这是 critical sections don't have implicit barriers 的关键之一)将指针推送到堆栈(但从不弹出它们)。堆栈是串行操作的,但减少是并行的。
这是一个工作示例。
#include <stdio.h>
#include <omp.h>
#include <stdlib.h>
#include <string.h>
void foo6() {
int nthreads = 13;
omp_set_num_threads(nthreads);
int bins= 21;
int a[bins];
int **at;
int m = 0;
int nsums = 0;
for(int i = 0; i<bins; i++) a[i] = 0;
#pragma omp parallel
{
int n = omp_get_num_threads();
int ithread = omp_get_thread_num();
#pragma omp single
at = (int**)malloc(sizeof *at * n * 2);
int* a_private = (int*)malloc(sizeof *a_private * bins);
//arbitrary fill function
for(int i = 0; i<bins; i++) a_private[i] = i + omp_get_thread_num();
#pragma omp critical (stack_section)
at[nsums++] = a_private;
while(nsums<2*n-2) {
int *p1, *p2;
char pop = 0;
#pragma omp critical (stack_section)
if((nsums-m)>1) p1 = at[m], p2 = at[m+1], m +=2, pop = 1;
if(pop) {
for(int i = 0; i<bins; i++) p1[i] += p2[i];
#pragma omp critical (stack_section)
at[nsums++] = p1;
}
}
#pragma omp barrier
#pragma omp single
memcpy(a, at[2*n-2], sizeof **at *bins);
free(a_private);
#pragma omp single
free(at);
}
for(int i = 0; i<bins; i++) printf("%d ", a[i]); puts("");
for(int i = 0; i<bins; i++) printf("%d ", (nthreads-1)*nthreads/2 +nthreads*i); puts("");
}
int main(void) {
foo6();
}
我仍然觉得可以找到更好的方法,即使用不会将未使用的线程置于繁忙循环中的任务。
实际上,使用递归 divide-and-conquer 方法通过任务干净地实现这一点非常简单。这几乎是 textbook 代码。
void operation(int* p1, int* p2, size_t bins)
{
for (int i = 0; i < bins; i++)
p1[i] += p2[i];
}
void reduce(int** arrs, size_t bins, int begin, int end)
{
assert(begin < end);
if (end - begin == 1) {
return;
}
int pivot = (begin + end) / 2;
/* Moving the termination condition here will avoid very short tasks,
* but make the code less nice. */
#pragma omp task
reduce(arrs, bins, begin, pivot);
#pragma omp task
reduce(arrs, bins, pivot, end);
#pragma omp taskwait
/* now begin and pivot contain the partial sums. */
operation(arrs[begin], arrs[pivot], bins);
}
/* call this within a parallel region */
#pragma omp single
reduce(at, bins, 0, n);
据我所知,没有不必要的同步,也没有对关键部分进行奇怪的轮询。它也可以自然地处理与您的等级数不同的数据大小。我发现它非常干净且易于理解。所以我确实认为这 比你们的两个解决方案更好。
但让我们看看它在实践中的表现*。为此,我们可以使用 Score-p and Vampir:
*bins=10000
所以减少实际上需要一点时间。在 24 核 Haswell 系统上执行 w/o turbo。 gcc 4.8.4,-O3
。我在实际执行周围添加了一些缓冲区以隐藏 initialization/post-processing
该图揭示了应用程序中任何线程在水平 time-axis 上发生的情况。从上到下的树实现:
omp for
循环
omp critical
一种任务。
omp task
这很好地展示了具体的实现是如何实际执行的。现在看来 for 循环实际上是最快的,尽管有不必要的同步。但是这个性能分析仍然存在一些缺陷。例如,我没有固定线程。在实践中,NUMA(non-uniform 内存访问)很重要:核心是否在自己的缓存/内存中拥有这些数据?这是任务解决方案变为 non-deterministic 的地方。在简单比较中不考虑重复之间非常显着的差异。
如果缩减操作在运行时变得可变,那么任务解决方案将变得比你的同步 for 循环更好。
critical
解决方案有一些有趣的方面,被动线程不会持续等待,因此它们更有可能消耗 CPU 资源。这可能对性能不利,例如在涡轮模式的情况下。
请记住,task
解决方案通过避免立即 return 生成任务而具有更多优化潜力。这些解决方案的执行方式也高度依赖于特定的 OpenMP 运行时。英特尔的运行时似乎对任务做得更差。
我的建议是:
- 使用最佳算法实施最可维护的解决方案
复杂度
- 衡量代码的哪些部分对 run-time
真正重要
- 根据实际测量分析瓶颈是什么。根据我的经验,更多的是关于 NUMA 和调度,而不是一些不必要的障碍。
- 根据您的实际测量micro-optimization执行
线性解
这是 this question 的线性 proccess_data_v1
的时间表。
OpenMP 4 缩减
所以我想到了减少 OpenMP。棘手的部分似乎是在没有副本的情况下从循环内的 at
数组获取数据。我确实用 NULL
初始化了 worker 数组,并在第一次简单地移动了指针:
void meta_op(int** pp1, int* p2, size_t bins)
{
if (*pp1 == NULL) {
*pp1 = p2;
return;
}
operation(*pp1, p2, bins);
}
// ...
// declare before parallel region as global
int* awork = NULL;
#pragma omp declare reduction(merge : int* : meta_op(&omp_out, omp_in, 100000)) initializer (omp_priv=NULL)
#pragma omp for reduction(merge : awork)
for (int t = 0; t < n; t++) {
meta_op(&awork, at[t], bins);
}
令人惊讶的是,这看起来不太好:
顶部是icc 16.0.2
,底部是gcc 5.3.0
,两者都是-O3
。
两者似乎都实现了序列化的减少。我试图调查 gcc
/ libgomp
,但我并不能立即看出发生了什么。从中间代码/反汇编来看,他们似乎将最终合并包装在 GOMP_atomic_start
/end
中——这似乎是一个全局互斥体。同样,icc
将对 operation
的调用包装在 kmpc_critical
中。我想在昂贵的自定义减少操作中没有太多优化。传统的归约可以通过 hardware-supported 原子操作来完成。
注意每个 operation
是如何更快的,因为输入是在本地缓存的,但由于序列化,它总体上更慢。同样,由于差异很大,这不是一个完美的比较,而且之前的屏幕截图使用的是不同的 gcc
版本。不过趋势很明显,缓存效果我也有数据。
给定 n
个部分和,可以在 log2 并行步骤中对所有部分和求和。例如,假设有八个线程和八个部分和:s0, s1, s2, s3, s4, s5, s6, s7
。这可以通过这样的 log2(8) = 3
个连续步骤来减少;
thread0 thread1 thread2 thread4
s0 += s1 s2 += s3 s4 += s5 s6 +=s7
s0 += s2 s4 += s6
s0 += s4
我想使用 OpenMP 执行此操作,但我不想使用 OpenMP 的 reduction
子句。我想出了一个解决方案,但我认为可以使用 OpenMP 的 task
子句找到更好的解决方案。
这比标量加法更通用。让我选择一个更有用的案例:数组缩减(有关数组缩减的更多信息,请参阅 here, here, and
假设我想对数组 a
进行数组缩减。这是一些代码,它为每个线程并行填充私有数组。
int bins = 20;
int a[bins];
int **at; // array of pointers to arrays
for(int i = 0; i<bins; i++) a[i] = 0;
#pragma omp parallel
{
#pragma omp single
at = (int**)malloc(sizeof *at * omp_get_num_threads());
at[omp_get_thread_num()] = (int*)malloc(sizeof **at * bins);
int a_private[bins];
//arbitrary function to fill the arrays for each thread
for(int i = 0; i<bins; i++) at[omp_get_thread_num()][i] = i + omp_get_thread_num();
}
此时我有一个指向每个线程数组的指针数组。现在我想将所有这些数组加在一起并将最终总和写入 a
。这是我想出的解决方案。
#pragma omp parallel
{
int n = omp_get_num_threads();
for(int m=1; n>1; m*=2) {
int c = n%2;
n/=2;
#pragma omp for
for(int i = 0; i<n; i++) {
int *p1 = at[2*i*m], *p2 = at[2*i*m+m];
for(int j = 0; j<bins; j++) p1[j] += p2[j];
}
n+=c;
}
#pragma omp single
memcpy(a, at[0], sizeof *a*bins);
free(at[omp_get_thread_num()]);
#pragma omp single
free(at);
}
让我试着解释这段代码的作用。假设有八个线程。让我们定义 +=
运算符来表示对数组求和。例如s0 += s1
是
for(int i=0; i<bins; i++) s0[i] += s1[i]
那么这段代码就可以了
n thread0 thread1 thread2 thread4
4 s0 += s1 s2 += s3 s4 += s5 s6 +=s7
2 s0 += s2 s4 += s6
1 s0 += s4
但是这段代码并不像我希望的那样理想。
一个问题是有一些隐式障碍需要所有线程同步。这些障碍应该不是必需的。第一个障碍是在填充阵列和进行归约之间。第二个障碍是在#pragma omp for
声明中的缩减。但是我不能将 nowait
子句与此方法一起使用来移除障碍。
还有一个问题是有几个线程不需要使用。例如八个线程。减少的第一步只需要四个线程,第二步两个线程,最后一步只需要一个线程。但是,此方法将涉及所有八个线程的减少。虽然,其他线程无论如何都不会做太多事情,应该直接进入障碍并等待,所以这可能不是什么大问题。
我的直觉是使用 omp task
子句可以找到更好的方法。不幸的是,我对 task
子句没有什么经验,到目前为止我所做的所有努力都比我现在失败的要好。
有人可以提出一个更好的解决方案来减少对数时间吗? OpenMP 的 task
子句?
我找到了解决障碍问题的方法。这会异步减少。唯一剩下的问题是它仍然将不参与缩减的线程置于忙循环中。此方法使用类似堆栈的东西在关键部分(这是 critical sections don't have implicit barriers 的关键之一)将指针推送到堆栈(但从不弹出它们)。堆栈是串行操作的,但减少是并行的。
这是一个工作示例。
#include <stdio.h>
#include <omp.h>
#include <stdlib.h>
#include <string.h>
void foo6() {
int nthreads = 13;
omp_set_num_threads(nthreads);
int bins= 21;
int a[bins];
int **at;
int m = 0;
int nsums = 0;
for(int i = 0; i<bins; i++) a[i] = 0;
#pragma omp parallel
{
int n = omp_get_num_threads();
int ithread = omp_get_thread_num();
#pragma omp single
at = (int**)malloc(sizeof *at * n * 2);
int* a_private = (int*)malloc(sizeof *a_private * bins);
//arbitrary fill function
for(int i = 0; i<bins; i++) a_private[i] = i + omp_get_thread_num();
#pragma omp critical (stack_section)
at[nsums++] = a_private;
while(nsums<2*n-2) {
int *p1, *p2;
char pop = 0;
#pragma omp critical (stack_section)
if((nsums-m)>1) p1 = at[m], p2 = at[m+1], m +=2, pop = 1;
if(pop) {
for(int i = 0; i<bins; i++) p1[i] += p2[i];
#pragma omp critical (stack_section)
at[nsums++] = p1;
}
}
#pragma omp barrier
#pragma omp single
memcpy(a, at[2*n-2], sizeof **at *bins);
free(a_private);
#pragma omp single
free(at);
}
for(int i = 0; i<bins; i++) printf("%d ", a[i]); puts("");
for(int i = 0; i<bins; i++) printf("%d ", (nthreads-1)*nthreads/2 +nthreads*i); puts("");
}
int main(void) {
foo6();
}
我仍然觉得可以找到更好的方法,即使用不会将未使用的线程置于繁忙循环中的任务。
实际上,使用递归 divide-and-conquer 方法通过任务干净地实现这一点非常简单。这几乎是 textbook 代码。
void operation(int* p1, int* p2, size_t bins)
{
for (int i = 0; i < bins; i++)
p1[i] += p2[i];
}
void reduce(int** arrs, size_t bins, int begin, int end)
{
assert(begin < end);
if (end - begin == 1) {
return;
}
int pivot = (begin + end) / 2;
/* Moving the termination condition here will avoid very short tasks,
* but make the code less nice. */
#pragma omp task
reduce(arrs, bins, begin, pivot);
#pragma omp task
reduce(arrs, bins, pivot, end);
#pragma omp taskwait
/* now begin and pivot contain the partial sums. */
operation(arrs[begin], arrs[pivot], bins);
}
/* call this within a parallel region */
#pragma omp single
reduce(at, bins, 0, n);
据我所知,没有不必要的同步,也没有对关键部分进行奇怪的轮询。它也可以自然地处理与您的等级数不同的数据大小。我发现它非常干净且易于理解。所以我确实认为这 比你们的两个解决方案更好。
但让我们看看它在实践中的表现*。为此,我们可以使用 Score-p and Vampir:
*bins=10000
所以减少实际上需要一点时间。在 24 核 Haswell 系统上执行 w/o turbo。 gcc 4.8.4,-O3
。我在实际执行周围添加了一些缓冲区以隐藏 initialization/post-processing
该图揭示了应用程序中任何线程在水平 time-axis 上发生的情况。从上到下的树实现:
omp for
循环omp critical
一种任务。omp task
这很好地展示了具体的实现是如何实际执行的。现在看来 for 循环实际上是最快的,尽管有不必要的同步。但是这个性能分析仍然存在一些缺陷。例如,我没有固定线程。在实践中,NUMA(non-uniform 内存访问)很重要:核心是否在自己的缓存/内存中拥有这些数据?这是任务解决方案变为 non-deterministic 的地方。在简单比较中不考虑重复之间非常显着的差异。
如果缩减操作在运行时变得可变,那么任务解决方案将变得比你的同步 for 循环更好。
critical
解决方案有一些有趣的方面,被动线程不会持续等待,因此它们更有可能消耗 CPU 资源。这可能对性能不利,例如在涡轮模式的情况下。
请记住,task
解决方案通过避免立即 return 生成任务而具有更多优化潜力。这些解决方案的执行方式也高度依赖于特定的 OpenMP 运行时。英特尔的运行时似乎对任务做得更差。
我的建议是:
- 使用最佳算法实施最可维护的解决方案 复杂度
- 衡量代码的哪些部分对 run-time 真正重要
- 根据实际测量分析瓶颈是什么。根据我的经验,更多的是关于 NUMA 和调度,而不是一些不必要的障碍。
- 根据您的实际测量micro-optimization执行
线性解
这是 this question 的线性 proccess_data_v1
的时间表。
OpenMP 4 缩减
所以我想到了减少 OpenMP。棘手的部分似乎是在没有副本的情况下从循环内的 at
数组获取数据。我确实用 NULL
初始化了 worker 数组,并在第一次简单地移动了指针:
void meta_op(int** pp1, int* p2, size_t bins)
{
if (*pp1 == NULL) {
*pp1 = p2;
return;
}
operation(*pp1, p2, bins);
}
// ...
// declare before parallel region as global
int* awork = NULL;
#pragma omp declare reduction(merge : int* : meta_op(&omp_out, omp_in, 100000)) initializer (omp_priv=NULL)
#pragma omp for reduction(merge : awork)
for (int t = 0; t < n; t++) {
meta_op(&awork, at[t], bins);
}
令人惊讶的是,这看起来不太好:
顶部是icc 16.0.2
,底部是gcc 5.3.0
,两者都是-O3
。
两者似乎都实现了序列化的减少。我试图调查 gcc
/ libgomp
,但我并不能立即看出发生了什么。从中间代码/反汇编来看,他们似乎将最终合并包装在 GOMP_atomic_start
/end
中——这似乎是一个全局互斥体。同样,icc
将对 operation
的调用包装在 kmpc_critical
中。我想在昂贵的自定义减少操作中没有太多优化。传统的归约可以通过 hardware-supported 原子操作来完成。
注意每个 operation
是如何更快的,因为输入是在本地缓存的,但由于序列化,它总体上更慢。同样,由于差异很大,这不是一个完美的比较,而且之前的屏幕截图使用的是不同的 gcc
版本。不过趋势很明显,缓存效果我也有数据。