为什么在 CPU 上进行线程浮点计算会使它们花费更长的时间?
Why does threading floating point computations on the CPU make them take significantly longer?
我目前正在从事科学模拟 (Gravitational nbody)。我首先用一个朴素的单线程算法编写它,这对少量粒子的表现是可以接受的。然后我对这个算法进行了多线程处理(它是令人尴尬的并行),程序花费了大约 3 倍的时间。下面是一个最小的、完整的、可验证的简单算法示例,它具有类似的属性并输出到 /tmp 中的文件(它被设计为 Linux 上的 运行,但 C++ 也是标准的)。请注意,如果您决定 运行 此代码,它将生成一个 152.62MB 的文件。输出数据是为了防止编译器优化程序外的计算。
#include <iostream>
#include <functional>
#include <thread>
#include <vector>
#include <atomic>
#include <random>
#include <fstream>
#include <chrono>
constexpr unsigned ITERATION_COUNT = 2000;
constexpr unsigned NUMBER_COUNT = 10000;
void runThreaded(unsigned count, unsigned batchSize, std::function<void(unsigned)> callback){
unsigned threadCount = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
threads.reserve(threadCount);
std::atomic<unsigned> currentIndex(0);
for(unsigned i=0;i<threadCount;++i){
threads.emplace_back([¤tIndex, batchSize, count, callback]{
unsigned startAt = currentIndex.fetch_add(batchSize);
if(startAt >= count){
return;
}else{
for(unsigned i=0;i<count;++i){
unsigned index = startAt+i;
if(index >= count){
return;
}
callback(index);
}
}
});
}
for(std::thread &thread : threads){
thread.join();
}
}
void threadedTest(){
std::mt19937_64 rnd(0);
std::vector<double> numbers;
numbers.reserve(NUMBER_COUNT);
for(unsigned i=0;i<NUMBER_COUNT;++i){
numbers.push_back(rnd());
}
std::vector<double> newNumbers = numbers;
std::ofstream fout("/tmp/test-data.bin");
for(unsigned i=0;i<ITERATION_COUNT;++i) {
std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
runThreaded(NUMBER_COUNT, 100, [&numbers, &newNumbers](unsigned x){
double total = 0;
for(unsigned y=0;y<NUMBER_COUNT;++y){
total += numbers[y]*(y-x)*(y-x);
}
newNumbers[x] = total;
});
fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
std::swap(numbers, newNumbers);
}
}
void unThreadedTest(){
std::mt19937_64 rnd(0);
std::vector<double> numbers;
numbers.reserve(NUMBER_COUNT);
for(unsigned i=0;i<NUMBER_COUNT;++i){
numbers.push_back(rnd());
}
std::vector<double> newNumbers = numbers;
std::ofstream fout("/tmp/test-data.bin");
for(unsigned i=0;i<ITERATION_COUNT;++i){
std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
for(unsigned x=0;x<NUMBER_COUNT;++x){
double total = 0;
for(unsigned y=0;y<NUMBER_COUNT;++y){
total += numbers[y]*(y-x)*(y-x);
}
newNumbers[x] = total;
}
fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
std::swap(numbers, newNumbers);
}
}
int main(int argc, char *argv[]) {
if(argv[1][0] == 't'){
threadedTest();
}else{
unThreadedTest();
}
return 0;
}
当我 运行 这个(在 Linux 上用 clang 7.0.1 编译)时,我从 Linux time
命令得到以下时间。这些之间的区别与我在真实程序中看到的相似。标记为 "real" 的条目与此问题相关,因为这是程序到达 运行.
所需的时钟时间
单线程:
real 6m27.261s
user 6m27.081s
sys 0m0.051s
多线程:
real 14m32.856s
user 216m58.063s
sys 0m4.492s
因此,当我预计它会显着加速时(大约是 8 倍,因为我有一个 8 核 16 线程 CPU),我问是什么导致了这种巨大的减速。我没有在 GPU 上实现它,因为下一步是对算法进行一些更改以将其从 O(n²) 变为 O(nlogn),但这对 GPU 也不友好。与包含的示例相比,更改后的算法与我当前实现的 O(n²) 算法的差异较小。最后,我想观察到 运行 每次迭代的主观时间(根据出现的迭代线之间的时间来判断)在线程化和非线程化 运行 中都有显着变化。
理解这段代码有点困难,但我认为你是在大规模地重复工作,因为每个线程都完成了几乎所有的工作,只是在开始时跳过了一小部分。
我假设 runThreaded
的内部循环应该是:
unsigned startAt = currentIndex.fetch_add(batchSize);
while (startAt < count) {
if (startAt >= count) {
return;
} else {
for(unsigned i=0;i<batchSize;++i){
unsigned index = startAt+i;
if(index >= count){
return;
}
callback(index);
}
}
startAt = currentIndex.fetch_add(batchSize);
}
这里的关键是i < batchSize
。你应该只做批次指示的工作,而不是 count
次,这是整个列表减去初始偏移量。
通过此更新,代码运行 显着 更快。我不确定它是否完成了所有必需的工作,因为很难判断这是否真的发生了,输出非常少。
为了在多个 CPU 上轻松并行化,我建议使用 tbb::parallel_for
. It uses the correct number of CPUs and splits the range for you, completely eliminating the risk of implementing it wrong. Alternatively, there is a parallel for_each
in C++17。也就是说,这个问题有很多好的解法。
矢量化代码是一个难题,clang++-6
和 g++-8
都不会自动矢量化基线代码。因此,下面的 SIMD 版本我使用了优秀的 Vc: portable, zero-overhead C++ types for explicitly data-parallel programming 库。
以下是比较的工作基准:
- 基准版本。
- SIMD 版本。
- SIMD + 多线程版本。
#include <Vc/Vc>
#include <tbb/parallel_for.h>
#include <algorithm>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <random>
#include <vector>
constexpr int ITERATION_COUNT = 20;
constexpr int NUMBER_COUNT = 20000;
double baseline() {
double result = 0;
std::vector<double> newNumbers(NUMBER_COUNT);
std::vector<double> numbers(NUMBER_COUNT);
std::mt19937 rnd(0);
for(auto& n : numbers)
n = rnd();
for(int i = 0; i < ITERATION_COUNT; ++i) {
for(int x = 0; x < NUMBER_COUNT; ++x) {
double total = 0;
for(int y = 0; y < NUMBER_COUNT; ++y) {
auto d = (y - x);
total += numbers[y] * (d * d);
}
newNumbers[x] = total;
}
result += std::accumulate(newNumbers.begin(), newNumbers.end(), 0.);
swap(numbers, newNumbers);
}
return result;
}
double simd() {
double result = 0;
constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
vector_double_v newNumbers(SIMD_NUMBER_COUNT);
vector_double_v numbers(SIMD_NUMBER_COUNT);
std::mt19937 rnd(0);
for(auto& n : numbers) {
alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
for(double& v : t)
v = rnd();
n.load(t, Vc::Aligned);
}
Vc::double_v const incv(Vc::double_v::Size);
for(int i = 0; i < ITERATION_COUNT; ++i) {
Vc::double_v x(Vc::IndexesFromZero);
for(auto& new_n : newNumbers) {
Vc::double_v totals;
int y = 0;
for(auto const& n : numbers) {
for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
auto d = y - x;
totals += n[j] * (d * d);
++y;
}
}
new_n = totals;
x += incv;
}
result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
swap(numbers, newNumbers);
}
return result;
}
double simd_mt() {
double result = 0;
constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
vector_double_v newNumbers(SIMD_NUMBER_COUNT);
vector_double_v numbers(SIMD_NUMBER_COUNT);
std::mt19937 rnd(0);
for(auto& n : numbers) {
alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
for(double& v : t)
v = rnd();
n.load(t, Vc::Aligned);
}
Vc::double_v const v0123(Vc::IndexesFromZero);
for(int i = 0; i < ITERATION_COUNT; ++i) {
constexpr int SIMD_STEP = 4;
tbb::parallel_for(0, SIMD_NUMBER_COUNT, SIMD_STEP, [&](int ix) {
Vc::double_v xs[SIMD_STEP];
for(int is = 0; is < SIMD_STEP; ++is)
xs[is] = v0123 + (ix + is) * Vc::double_v::Size;
Vc::double_v totals[SIMD_STEP];
int y = 0;
for(auto const& n : numbers) {
for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
for(int is = 0; is < SIMD_STEP; ++is) {
auto d = y - xs[is];
totals[is] += n[j] * (d * d);
}
++y;
}
}
std::copy_n(totals, SIMD_STEP, &newNumbers[ix]);
});
result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
swap(numbers, newNumbers);
}
return result;
}
struct Stopwatch {
using Clock = std::chrono::high_resolution_clock;
using Seconds = std::chrono::duration<double>;
Clock::time_point start_ = Clock::now();
Seconds elapsed() const {
return std::chrono::duration_cast<Seconds>(Clock::now() - start_);
}
};
std::ostream& operator<<(std::ostream& s, Stopwatch::Seconds const& a) {
auto precision = s.precision(9);
s << std::fixed << a.count() << std::resetiosflags(std::ios_base::floatfield) << 's';
s.precision(precision);
return s;
}
void benchmark() {
Stopwatch::Seconds baseline_time;
{
Stopwatch s;
double result = baseline();
baseline_time = s.elapsed();
std::cout << "baseline: " << result << ", " << baseline_time << '\n';
}
{
Stopwatch s;
double result = simd();
auto time = s.elapsed();
std::cout << " simd: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
}
{
Stopwatch s;
double result = simd_mt();
auto time = s.elapsed();
std::cout << " simd_mt: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
}
}
int main() {
benchmark();
benchmark();
benchmark();
}
时间安排:
baseline: 2.76582e+257, 6.399848397s
simd: 2.76582e+257, 1.600373449s, 3.99897x speedup
simd_mt: 2.76582e+257, 0.168638435s, 37.9501x speedup
备注:
- 我的机器支持 AVX 但不支持 AVX-512,因此使用 SIMD 时大约有 4 倍的加速。
simd_mt
版本在我的机器上使用 8 个线程和更大的 SIMD 步骤。理论加速为 128 倍,实际为 38 倍。
clang++-6
不能自动矢量化基线代码,g++-8
也不能。
g++-8
为 SIMD 版本生成的代码比 clang++-6
快得多。
你的心肯定是在正确的地方减去一两个错误。
par_for 是一个复杂的问题,具体取决于循环的有效负载。有
没有一刀切的解决方案。有效载荷可以是任何东西
几个添加到几乎无限的互斥块 - 例如通过做内存
分配。
作为工作项模式的原子变量对我来说一直很有效,但是
请记住,原子变量在 X86 上的成本很高(~400 个周期),甚至
如果它们在未执行的分支中会产生高昂的成本,因为我发现这是我的危险。
以下的一些排列通常是好的。选择正确的 chunks_per_thread (如在您的 batchSize 中)是至关重要的。如果你不相信你的
用户,您可以测试执行循环的几次迭代来猜测
最佳分块级别。
#include <atomic>
#include <future>
#include <thread>
#include <vector>
#include <stdio.h>
template<typename Func>
void par_for(int start, int end, int step, int chunks_per_thread, Func func) {
using namespace std;
using namespace chrono;
atomic<int> work_item{start};
vector<future<void>> futures(std::thread::hardware_concurrency());
for (auto &fut : futures) {
fut = async(std::launch::async, [&work_item, end, step, chunks_per_thread, &func]() {
for(;;) {
int wi = work_item.fetch_add(step * chunks_per_thread);
if (wi > end) break;
int wi_max = std::min(end, wi+step * chunks_per_thread);
while (wi < wi_max) {
func(wi);
wi += step;
}
}
});
}
for (auto &fut : futures) {
fut.wait();
}
}
int main() {
using namespace std;
using namespace chrono;
for (int k = 0; k != 2; ++k) {
auto t0 = high_resolution_clock::now();
constexpr int loops = 100000000;
if (k == 0) {
for (int i = 0; i != loops; ++i ) {
if (i % 10000000 == 0) printf("%d\n", i);
}
} else {
par_for(0, loops, 1, 100000, [](int i) {
if (i % 10000000 == 0) printf("%d\n", i);
});
}
auto t1 = high_resolution_clock::now();
duration<double, milli> ns = t1 - t0;
printf("k=%d %fms total\n", k, ns.count());
}
}
结果
...
k=0 174.925903ms total
...
k=1 27.924738ms total
大约 6 倍加速。
我避免使用 "embarassingly parallel" 这个词,因为它几乎从来都不是这样。在从 1 级缓存(ns 延迟)到全球跨集群(ms 延迟)的旅程中,您使用的资源越多,您付出的成本就会成倍增加。但我希望这个代码片段可以作为答案。
我目前正在从事科学模拟 (Gravitational nbody)。我首先用一个朴素的单线程算法编写它,这对少量粒子的表现是可以接受的。然后我对这个算法进行了多线程处理(它是令人尴尬的并行),程序花费了大约 3 倍的时间。下面是一个最小的、完整的、可验证的简单算法示例,它具有类似的属性并输出到 /tmp 中的文件(它被设计为 Linux 上的 运行,但 C++ 也是标准的)。请注意,如果您决定 运行 此代码,它将生成一个 152.62MB 的文件。输出数据是为了防止编译器优化程序外的计算。
#include <iostream>
#include <functional>
#include <thread>
#include <vector>
#include <atomic>
#include <random>
#include <fstream>
#include <chrono>
constexpr unsigned ITERATION_COUNT = 2000;
constexpr unsigned NUMBER_COUNT = 10000;
void runThreaded(unsigned count, unsigned batchSize, std::function<void(unsigned)> callback){
unsigned threadCount = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
threads.reserve(threadCount);
std::atomic<unsigned> currentIndex(0);
for(unsigned i=0;i<threadCount;++i){
threads.emplace_back([¤tIndex, batchSize, count, callback]{
unsigned startAt = currentIndex.fetch_add(batchSize);
if(startAt >= count){
return;
}else{
for(unsigned i=0;i<count;++i){
unsigned index = startAt+i;
if(index >= count){
return;
}
callback(index);
}
}
});
}
for(std::thread &thread : threads){
thread.join();
}
}
void threadedTest(){
std::mt19937_64 rnd(0);
std::vector<double> numbers;
numbers.reserve(NUMBER_COUNT);
for(unsigned i=0;i<NUMBER_COUNT;++i){
numbers.push_back(rnd());
}
std::vector<double> newNumbers = numbers;
std::ofstream fout("/tmp/test-data.bin");
for(unsigned i=0;i<ITERATION_COUNT;++i) {
std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
runThreaded(NUMBER_COUNT, 100, [&numbers, &newNumbers](unsigned x){
double total = 0;
for(unsigned y=0;y<NUMBER_COUNT;++y){
total += numbers[y]*(y-x)*(y-x);
}
newNumbers[x] = total;
});
fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
std::swap(numbers, newNumbers);
}
}
void unThreadedTest(){
std::mt19937_64 rnd(0);
std::vector<double> numbers;
numbers.reserve(NUMBER_COUNT);
for(unsigned i=0;i<NUMBER_COUNT;++i){
numbers.push_back(rnd());
}
std::vector<double> newNumbers = numbers;
std::ofstream fout("/tmp/test-data.bin");
for(unsigned i=0;i<ITERATION_COUNT;++i){
std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
for(unsigned x=0;x<NUMBER_COUNT;++x){
double total = 0;
for(unsigned y=0;y<NUMBER_COUNT;++y){
total += numbers[y]*(y-x)*(y-x);
}
newNumbers[x] = total;
}
fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
std::swap(numbers, newNumbers);
}
}
int main(int argc, char *argv[]) {
if(argv[1][0] == 't'){
threadedTest();
}else{
unThreadedTest();
}
return 0;
}
当我 运行 这个(在 Linux 上用 clang 7.0.1 编译)时,我从 Linux time
命令得到以下时间。这些之间的区别与我在真实程序中看到的相似。标记为 "real" 的条目与此问题相关,因为这是程序到达 运行.
单线程:
real 6m27.261s
user 6m27.081s
sys 0m0.051s
多线程:
real 14m32.856s
user 216m58.063s
sys 0m4.492s
因此,当我预计它会显着加速时(大约是 8 倍,因为我有一个 8 核 16 线程 CPU),我问是什么导致了这种巨大的减速。我没有在 GPU 上实现它,因为下一步是对算法进行一些更改以将其从 O(n²) 变为 O(nlogn),但这对 GPU 也不友好。与包含的示例相比,更改后的算法与我当前实现的 O(n²) 算法的差异较小。最后,我想观察到 运行 每次迭代的主观时间(根据出现的迭代线之间的时间来判断)在线程化和非线程化 运行 中都有显着变化。
理解这段代码有点困难,但我认为你是在大规模地重复工作,因为每个线程都完成了几乎所有的工作,只是在开始时跳过了一小部分。
我假设 runThreaded
的内部循环应该是:
unsigned startAt = currentIndex.fetch_add(batchSize);
while (startAt < count) {
if (startAt >= count) {
return;
} else {
for(unsigned i=0;i<batchSize;++i){
unsigned index = startAt+i;
if(index >= count){
return;
}
callback(index);
}
}
startAt = currentIndex.fetch_add(batchSize);
}
这里的关键是i < batchSize
。你应该只做批次指示的工作,而不是 count
次,这是整个列表减去初始偏移量。
通过此更新,代码运行 显着 更快。我不确定它是否完成了所有必需的工作,因为很难判断这是否真的发生了,输出非常少。
为了在多个 CPU 上轻松并行化,我建议使用 tbb::parallel_for
. It uses the correct number of CPUs and splits the range for you, completely eliminating the risk of implementing it wrong. Alternatively, there is a parallel for_each
in C++17。也就是说,这个问题有很多好的解法。
矢量化代码是一个难题,clang++-6
和 g++-8
都不会自动矢量化基线代码。因此,下面的 SIMD 版本我使用了优秀的 Vc: portable, zero-overhead C++ types for explicitly data-parallel programming 库。
以下是比较的工作基准:
- 基准版本。
- SIMD 版本。
- SIMD + 多线程版本。
#include <Vc/Vc>
#include <tbb/parallel_for.h>
#include <algorithm>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <random>
#include <vector>
constexpr int ITERATION_COUNT = 20;
constexpr int NUMBER_COUNT = 20000;
double baseline() {
double result = 0;
std::vector<double> newNumbers(NUMBER_COUNT);
std::vector<double> numbers(NUMBER_COUNT);
std::mt19937 rnd(0);
for(auto& n : numbers)
n = rnd();
for(int i = 0; i < ITERATION_COUNT; ++i) {
for(int x = 0; x < NUMBER_COUNT; ++x) {
double total = 0;
for(int y = 0; y < NUMBER_COUNT; ++y) {
auto d = (y - x);
total += numbers[y] * (d * d);
}
newNumbers[x] = total;
}
result += std::accumulate(newNumbers.begin(), newNumbers.end(), 0.);
swap(numbers, newNumbers);
}
return result;
}
double simd() {
double result = 0;
constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
vector_double_v newNumbers(SIMD_NUMBER_COUNT);
vector_double_v numbers(SIMD_NUMBER_COUNT);
std::mt19937 rnd(0);
for(auto& n : numbers) {
alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
for(double& v : t)
v = rnd();
n.load(t, Vc::Aligned);
}
Vc::double_v const incv(Vc::double_v::Size);
for(int i = 0; i < ITERATION_COUNT; ++i) {
Vc::double_v x(Vc::IndexesFromZero);
for(auto& new_n : newNumbers) {
Vc::double_v totals;
int y = 0;
for(auto const& n : numbers) {
for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
auto d = y - x;
totals += n[j] * (d * d);
++y;
}
}
new_n = totals;
x += incv;
}
result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
swap(numbers, newNumbers);
}
return result;
}
double simd_mt() {
double result = 0;
constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
vector_double_v newNumbers(SIMD_NUMBER_COUNT);
vector_double_v numbers(SIMD_NUMBER_COUNT);
std::mt19937 rnd(0);
for(auto& n : numbers) {
alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
for(double& v : t)
v = rnd();
n.load(t, Vc::Aligned);
}
Vc::double_v const v0123(Vc::IndexesFromZero);
for(int i = 0; i < ITERATION_COUNT; ++i) {
constexpr int SIMD_STEP = 4;
tbb::parallel_for(0, SIMD_NUMBER_COUNT, SIMD_STEP, [&](int ix) {
Vc::double_v xs[SIMD_STEP];
for(int is = 0; is < SIMD_STEP; ++is)
xs[is] = v0123 + (ix + is) * Vc::double_v::Size;
Vc::double_v totals[SIMD_STEP];
int y = 0;
for(auto const& n : numbers) {
for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
for(int is = 0; is < SIMD_STEP; ++is) {
auto d = y - xs[is];
totals[is] += n[j] * (d * d);
}
++y;
}
}
std::copy_n(totals, SIMD_STEP, &newNumbers[ix]);
});
result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
swap(numbers, newNumbers);
}
return result;
}
struct Stopwatch {
using Clock = std::chrono::high_resolution_clock;
using Seconds = std::chrono::duration<double>;
Clock::time_point start_ = Clock::now();
Seconds elapsed() const {
return std::chrono::duration_cast<Seconds>(Clock::now() - start_);
}
};
std::ostream& operator<<(std::ostream& s, Stopwatch::Seconds const& a) {
auto precision = s.precision(9);
s << std::fixed << a.count() << std::resetiosflags(std::ios_base::floatfield) << 's';
s.precision(precision);
return s;
}
void benchmark() {
Stopwatch::Seconds baseline_time;
{
Stopwatch s;
double result = baseline();
baseline_time = s.elapsed();
std::cout << "baseline: " << result << ", " << baseline_time << '\n';
}
{
Stopwatch s;
double result = simd();
auto time = s.elapsed();
std::cout << " simd: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
}
{
Stopwatch s;
double result = simd_mt();
auto time = s.elapsed();
std::cout << " simd_mt: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
}
}
int main() {
benchmark();
benchmark();
benchmark();
}
时间安排:
baseline: 2.76582e+257, 6.399848397s
simd: 2.76582e+257, 1.600373449s, 3.99897x speedup
simd_mt: 2.76582e+257, 0.168638435s, 37.9501x speedup
备注:
- 我的机器支持 AVX 但不支持 AVX-512,因此使用 SIMD 时大约有 4 倍的加速。
simd_mt
版本在我的机器上使用 8 个线程和更大的 SIMD 步骤。理论加速为 128 倍,实际为 38 倍。clang++-6
不能自动矢量化基线代码,g++-8
也不能。g++-8
为 SIMD 版本生成的代码比clang++-6
快得多。
你的心肯定是在正确的地方减去一两个错误。
par_for 是一个复杂的问题,具体取决于循环的有效负载。有 没有一刀切的解决方案。有效载荷可以是任何东西 几个添加到几乎无限的互斥块 - 例如通过做内存 分配。
作为工作项模式的原子变量对我来说一直很有效,但是 请记住,原子变量在 X86 上的成本很高(~400 个周期),甚至 如果它们在未执行的分支中会产生高昂的成本,因为我发现这是我的危险。
以下的一些排列通常是好的。选择正确的 chunks_per_thread (如在您的 batchSize 中)是至关重要的。如果你不相信你的 用户,您可以测试执行循环的几次迭代来猜测 最佳分块级别。
#include <atomic>
#include <future>
#include <thread>
#include <vector>
#include <stdio.h>
template<typename Func>
void par_for(int start, int end, int step, int chunks_per_thread, Func func) {
using namespace std;
using namespace chrono;
atomic<int> work_item{start};
vector<future<void>> futures(std::thread::hardware_concurrency());
for (auto &fut : futures) {
fut = async(std::launch::async, [&work_item, end, step, chunks_per_thread, &func]() {
for(;;) {
int wi = work_item.fetch_add(step * chunks_per_thread);
if (wi > end) break;
int wi_max = std::min(end, wi+step * chunks_per_thread);
while (wi < wi_max) {
func(wi);
wi += step;
}
}
});
}
for (auto &fut : futures) {
fut.wait();
}
}
int main() {
using namespace std;
using namespace chrono;
for (int k = 0; k != 2; ++k) {
auto t0 = high_resolution_clock::now();
constexpr int loops = 100000000;
if (k == 0) {
for (int i = 0; i != loops; ++i ) {
if (i % 10000000 == 0) printf("%d\n", i);
}
} else {
par_for(0, loops, 1, 100000, [](int i) {
if (i % 10000000 == 0) printf("%d\n", i);
});
}
auto t1 = high_resolution_clock::now();
duration<double, milli> ns = t1 - t0;
printf("k=%d %fms total\n", k, ns.count());
}
}
结果
...
k=0 174.925903ms total
...
k=1 27.924738ms total
大约 6 倍加速。
我避免使用 "embarassingly parallel" 这个词,因为它几乎从来都不是这样。在从 1 级缓存(ns 延迟)到全球跨集群(ms 延迟)的旅程中,您使用的资源越多,您付出的成本就会成倍增加。但我希望这个代码片段可以作为答案。