为什么在 CPU 上进行线程浮点计算会使它们花费更长的时间?

Why does threading floating point computations on the CPU make them take significantly longer?

我目前正在从事科学模拟 (Gravitational nbody)。我首先用一个朴素的单线程算法编写它,这对少量粒子的表现是可以接受的。然后我对这个算法进行了多线程处理(它是令人尴尬的并行),程序花费了大约 3 倍的时间。下面是一个最小的、完整的、可验证的简单算法示例,它具有类似的属性并输出到 /tmp 中的文件(它被设计为 Linux 上的 运行,但 C++ 也是标准的)。请注意,如果您决定 运行 此代码,它将生成一个 152.62MB 的文件。输出数据是为了防止编译器优化程序外的计算。

#include <iostream>
#include <functional>
#include <thread>
#include <vector>
#include <atomic>
#include <random>
#include <fstream>
#include <chrono>

constexpr unsigned ITERATION_COUNT = 2000;
constexpr unsigned NUMBER_COUNT = 10000;

void runThreaded(unsigned count, unsigned batchSize, std::function<void(unsigned)> callback){
    unsigned threadCount = std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    threads.reserve(threadCount);

    std::atomic<unsigned> currentIndex(0);

    for(unsigned i=0;i<threadCount;++i){
        threads.emplace_back([&currentIndex, batchSize, count, callback]{
            unsigned startAt = currentIndex.fetch_add(batchSize);

            if(startAt >= count){
                return;
            }else{
                for(unsigned i=0;i<count;++i){
                    unsigned index = startAt+i;
                    if(index >= count){
                        return;
                    }
                    callback(index);
                }
            }
        });
    }

    for(std::thread &thread : threads){
        thread.join();
    }
}

void threadedTest(){
    std::mt19937_64 rnd(0);
    std::vector<double> numbers;

    numbers.reserve(NUMBER_COUNT);
    for(unsigned i=0;i<NUMBER_COUNT;++i){
        numbers.push_back(rnd());
    }

    std::vector<double> newNumbers = numbers;

    std::ofstream fout("/tmp/test-data.bin");

    for(unsigned i=0;i<ITERATION_COUNT;++i) {
        std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
        runThreaded(NUMBER_COUNT, 100, [&numbers, &newNumbers](unsigned x){
            double total = 0;
            for(unsigned y=0;y<NUMBER_COUNT;++y){
                total += numbers[y]*(y-x)*(y-x);
            }
            newNumbers[x] = total;
        });
        fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
        std::swap(numbers, newNumbers);
    }
}

void unThreadedTest(){
    std::mt19937_64 rnd(0);
    std::vector<double> numbers;

    numbers.reserve(NUMBER_COUNT);
    for(unsigned i=0;i<NUMBER_COUNT;++i){
        numbers.push_back(rnd());
    }

    std::vector<double> newNumbers = numbers;

    std::ofstream fout("/tmp/test-data.bin");

    for(unsigned i=0;i<ITERATION_COUNT;++i){
        std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
        for(unsigned x=0;x<NUMBER_COUNT;++x){
            double total = 0;
            for(unsigned y=0;y<NUMBER_COUNT;++y){
                total += numbers[y]*(y-x)*(y-x);
            }
            newNumbers[x] = total;
        }
        fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
        std::swap(numbers, newNumbers);
    }
}

int main(int argc, char *argv[]) {
    if(argv[1][0] == 't'){
        threadedTest();
    }else{
        unThreadedTest();
    }
    return 0;
}

当我 运行 这个(在 Linux 上用 clang 7.0.1 编译)时,我从 Linux time 命令得到以下时间。这些之间的区别与我在真实程序中看到的相似。标记为 "real" 的条目与此问题相关,因为这是程序到达 运行.

所需的时钟时间

单线程:

real    6m27.261s
user    6m27.081s
sys     0m0.051s

多线程:

real    14m32.856s
user    216m58.063s
sys     0m4.492s

因此,当我预计它会显着加速时(大约是 8 倍,因为我有一个 8 核 16 线程 CPU),我问是什么导致了这种巨大的减速。我没有在 GPU 上实现它,因为下一步是对算法进行一些更改以将其从 O(n²) 变为 O(nlogn),但这对 GPU 也不友好。与包含的示例相比,更改后的算法与我当前实现的 O(n²) 算法的差异较小。最后,我想观察到 运行 每次迭代的主观时间(根据出现的迭代线之间的时间来判断)在线程化和非线程化 运行 中都有显着变化。

理解这段代码有点困难,但我认为你是在大规模地重复工作,因为每个线程都完成了几乎所有的工作,只是在开始时跳过了一小部分。

我假设 runThreaded 的内部循环应该是:

unsigned startAt = currentIndex.fetch_add(batchSize);

while (startAt < count) {
  if (startAt >= count) {
    return;
  } else {
    for(unsigned i=0;i<batchSize;++i){
      unsigned index = startAt+i;

      if(index >= count){
        return;
      }

      callback(index);
    }
  }

  startAt = currentIndex.fetch_add(batchSize);
}

这里的关键是i < batchSize。你应该只做批次指示的工作,而不是 count 次,这是整个列表减去初始偏移量。

通过此更新,代码运行 显着 更快。我不确定它是否完成了所有必需的工作,因为很难判断这是否真的发生了,输出非常少。

为了在多个 CPU 上轻松并行化,我建议使用 tbb::parallel_for. It uses the correct number of CPUs and splits the range for you, completely eliminating the risk of implementing it wrong. Alternatively, there is a parallel for_each in C++17。也就是说,这个问题有很多好的解法。

矢量化代码是一个难题,clang++-6g++-8 都不会自动矢量化基线代码。因此,下面的 SIMD 版本我使用了优秀的 Vc: portable, zero-overhead C++ types for explicitly data-parallel programming 库。

以下是比较的工作基准:

  • 基准版本。
  • SIMD 版本。
  • SIMD + 多线程版本。


#include <Vc/Vc>
#include <tbb/parallel_for.h>

#include <algorithm>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <random>
#include <vector>

constexpr int ITERATION_COUNT = 20;
constexpr int NUMBER_COUNT = 20000;

double baseline() {
    double result = 0;

    std::vector<double> newNumbers(NUMBER_COUNT);
    std::vector<double> numbers(NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers)
        n = rnd();

    for(int i = 0; i < ITERATION_COUNT; ++i) {
        for(int x = 0; x < NUMBER_COUNT; ++x) {
            double total = 0;
            for(int y = 0; y < NUMBER_COUNT; ++y) {
                auto d = (y - x);
                total += numbers[y] * (d * d);
            }
            newNumbers[x] = total;
        }
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), 0.);
        swap(numbers, newNumbers);
    }

    return result;
}

double simd() {
    double result = 0;

    constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
    using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
    vector_double_v newNumbers(SIMD_NUMBER_COUNT);
    vector_double_v numbers(SIMD_NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers) {
        alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
        for(double& v : t)
            v = rnd();
        n.load(t, Vc::Aligned);
    }

    Vc::double_v const incv(Vc::double_v::Size);
    for(int i = 0; i < ITERATION_COUNT; ++i) {
        Vc::double_v x(Vc::IndexesFromZero);
        for(auto& new_n : newNumbers) {
            Vc::double_v totals;
            int y = 0;
            for(auto const& n : numbers) {
                for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
                    auto d = y - x;
                    totals += n[j] * (d * d);
                    ++y;
                }
            }
            new_n = totals;
            x += incv;
        }
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
        swap(numbers, newNumbers);
    }

    return result;
}

double simd_mt() {
    double result = 0;

    constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
    using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
    vector_double_v newNumbers(SIMD_NUMBER_COUNT);
    vector_double_v numbers(SIMD_NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers) {
        alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
        for(double& v : t)
            v = rnd();
        n.load(t, Vc::Aligned);
    }

    Vc::double_v const v0123(Vc::IndexesFromZero);
    for(int i = 0; i < ITERATION_COUNT; ++i) {
        constexpr int SIMD_STEP = 4;
        tbb::parallel_for(0, SIMD_NUMBER_COUNT, SIMD_STEP, [&](int ix) {
            Vc::double_v xs[SIMD_STEP];
            for(int is = 0; is < SIMD_STEP; ++is)
                xs[is] = v0123 + (ix + is) * Vc::double_v::Size;
            Vc::double_v totals[SIMD_STEP];
            int y = 0;
            for(auto const& n : numbers) {
                for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
                    for(int is = 0; is < SIMD_STEP; ++is) {
                        auto d = y - xs[is];
                        totals[is] += n[j] * (d * d);
                    }
                    ++y;
                }
            }
            std::copy_n(totals, SIMD_STEP, &newNumbers[ix]);
        });
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
        swap(numbers, newNumbers);
    }

    return result;
}

struct Stopwatch {
    using Clock = std::chrono::high_resolution_clock;
    using Seconds = std::chrono::duration<double>;
    Clock::time_point start_ = Clock::now();

    Seconds elapsed() const {
        return std::chrono::duration_cast<Seconds>(Clock::now() - start_);
    }
};


std::ostream& operator<<(std::ostream& s, Stopwatch::Seconds const& a) {
    auto precision = s.precision(9);
    s << std::fixed << a.count() << std::resetiosflags(std::ios_base::floatfield) << 's';
    s.precision(precision);
    return s;
}

void benchmark() {
    Stopwatch::Seconds baseline_time;
    {
        Stopwatch s;
        double result = baseline();
        baseline_time = s.elapsed();
        std::cout << "baseline: " << result << ", " << baseline_time << '\n';
    }

    {
        Stopwatch s;
        double result = simd();
        auto time = s.elapsed();
        std::cout << "    simd: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
    }

    {
        Stopwatch s;
        double result = simd_mt();
        auto time = s.elapsed();
        std::cout << " simd_mt: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
    }
}

int main() {
    benchmark();
    benchmark();
    benchmark();
}

时间安排:

baseline: 2.76582e+257, 6.399848397s
    simd: 2.76582e+257, 1.600373449s, 3.99897x speedup
 simd_mt: 2.76582e+257, 0.168638435s, 37.9501x speedup

备注:

  • 我的机器支持 AVX 但不支持 AVX-512,因此使用 SIMD 时大约有 4 倍的加速。
  • simd_mt 版本在我的机器上使用 8 个线程和更大的 SIMD 步骤。理论加速为 128 倍,实际为 38 倍。
  • clang++-6 不能自动矢量化基线代码,g++-8 也不能。
  • g++-8 为 SIMD 版本生成的代码比 clang++-6 快得多。

你的心肯定是在正确的地方减去一两个错误。

par_for 是一个复杂的问题,具体取决于循环的有效负载。有 没有一刀切的解决方案。有效载荷可以是任何东西 几个添加到几乎无限的互斥块 - 例如通过做内存 分配。

作为工作项模式的原子变量对我来说一直很有效,但是 请记住,原子变量在 X86 上的成本很高(~400 个周期),甚至 如果它们在未执行的分支中会产生高昂的成本,因为我发现这是我的危险。

以下的一些排列通常是好的。选择正确的 chunks_per_thread (如在您的 batchSize 中)是至关重要的。如果你不相信你的 用户,您可以测试执行循环的几次迭代来猜测 最佳分块级别。

#include <atomic>
#include <future>
#include <thread>
#include <vector>
#include <stdio.h>

template<typename Func>
void par_for(int start, int end, int step, int chunks_per_thread, Func func) {
  using namespace std;
  using namespace chrono;

  atomic<int> work_item{start};
  vector<future<void>> futures(std::thread::hardware_concurrency());

  for (auto &fut : futures) {
    fut = async(std::launch::async, [&work_item, end, step, chunks_per_thread, &func]() {
      for(;;) {
        int wi = work_item.fetch_add(step * chunks_per_thread);
        if (wi > end) break;
        int wi_max = std::min(end, wi+step * chunks_per_thread);
        while (wi < wi_max) {
          func(wi);
          wi += step;
        }
      }
    });
  }

  for (auto &fut : futures) {
    fut.wait();
  }
}

int main() {
  using namespace std;
  using namespace chrono;
  for (int k = 0; k != 2; ++k) {
    auto t0 = high_resolution_clock::now();
    constexpr int loops = 100000000;
    if (k == 0) {
      for (int i = 0; i != loops; ++i ) {
        if (i % 10000000 == 0) printf("%d\n", i);
      }
    } else {
      par_for(0, loops, 1, 100000, [](int i) {
        if (i % 10000000 == 0) printf("%d\n", i);
      });
    }
    auto t1 = high_resolution_clock::now();
    duration<double, milli> ns = t1 - t0;
    printf("k=%d %fms total\n", k, ns.count());
  }
}

结果

...
k=0 174.925903ms total
...
k=1 27.924738ms total

大约 6 倍加速。

我避免使用 "embarassingly parallel" 这个词,因为它几乎从来都不是这样。在从 1 级缓存(ns 延迟)到全球跨集群(ms 延迟)的旅程中,您使用的资源越多,您付出的成本就会成倍增加。但我希望这个代码片段可以作为答案。