线程队列 C++

Thread Queue C++

'''The original post has been edited'''

如何在 C++ 中为两个 for 循环创建一个线程池?对于 0 到 6 之间的每个数字,我需要 运行 start_thread 函数 22 次。根据我使用的机器,我将有灵活数量的可用线程。如何创建池以将空闲线程分配给嵌套循环的下一个线程?

for (int t=0; t <22; t++){
    for(int p=0; p<6; p++){
        thread th1(start_thread, p);
        thread th2(start_thread, p);
        th1.join();
        th2.join();
     }
}

不太确定你想要什么,但也许是这样的。

for (int t=0; t <22; t++){
        std::vector<std::thread> th;
        for(int p=0; p<6; p++){
                th.emplace_back(std::thread(start_thread, p));
        }
        for(int p=0; p<6; p++){
                th[i].join();
        }
}

(或者置换两个循环)


编辑如果要控制线程数

#include <iostream>
#include <thread>
#include <vector>

void
start_thread(int t, int p)
{
  std::cout << "th " << t << ' ' << p << '\n';
}

void
join_all(std::vector<std::thread> &th)
{
  for(auto &e: th)
  {
    e.join();
  }
  th.clear();
}

int
main()
{
  std::size_t max_threads=std::thread::hardware_concurrency();
  std::vector<std::thread> th;
  for(int t=0; t <22; ++t)
  {
    for(int p=0; p<6; ++p)
    {
      th.emplace_back(std::thread(start_thread, t, p));
      if(size(th)==max_threads)
      {
        join_all(th);
      }
    }
  } 
  join_all(th);
  return 0;
}

停止尝试编码并画出您需要做的事情以及完成它所需的部分。

您需要一个队列来保存作业,一个互斥锁来保护队列,这样线程就不会因同时访问而弄乱它,还需要 N 个线程。

每个线程函数都是一个循环

  1. 获取互斥量,
  2. 从队列中获取作业,
  3. 释放互斥量,
  4. 处理作业。

在这种情况下,当步骤 2 中队列中没有更多作业时,我会通过退出循环和线程来保持简单。在生产中,您将拥有线程块并在队列中等待,所以它是仍可用于稍后添加的服务工作。

将其包装在 class 中,其中包含一个允许您将作业添加到队列的函数、一个启动 N 个线程的函数以及一个加入所有 运行 线程的函数.

main 定义 class 的一个实例,提供作业,启动线程池,然后在加入时阻塞,直到每个人都完成。

一旦你将设计打造成你有信心做你需要它做的事情,然后你就可以开始编写代码了。在没有计划的情况下编写代码,尤其是 multi-threaded 代码,您将面临大量调试和 re-writing,这通常会大大超出设计所花费的时间。

如果您不想依赖 third-party 库,这非常简单。

只需创建一些你喜欢的线程,让它们从某个队列中选择一个“工作”。

例如:

#include <iostream>
#include <mutex>
#include <chrono>
#include <vector>
#include <thread>
#include <queue>

void work(int p)
{
  // do the "work"
  std::this_thread::sleep_for(std::chrono::milliseconds(200));
  std::cout << p << std::endl;
}

std::mutex m;
std::queue<int> jobs;
void worker()
{
  while (true)
  {
    int job(0);
    // sync access to the jobs queue
    {
      std::lock_guard<std::mutex> l(m);
      if (jobs.empty())
        return;
      job = jobs.front();
      jobs.pop();
    }
    work(job);
  }
}

int main()
{
  // queue all jobs
  for (int t = 0; t < 22; t++) {
    for (int p = 0; p < 6; p++) {
      jobs.push(p);
    }
  }

  // create reasonable number of threads
  static const int n = std::thread::hardware_concurrency();
  std::vector<std::thread> threads;
  for (int i = 0; i < n; ++i)
    threads.emplace_back(std::thread(worker));
  // wait for all of them to finish
  for (int i = 0; i < n; ++i)
    threads[i].join();
}

[已添加] 显然,您不希望在生产代码中使用全局变量;这只是一个演示解决方案。

从 C++17 开始,您可以对标准库中的许多算法使用 execution policies 之一。这可以大大简化检查多个工作包的过程。在幕后发生的事情通常是它从 built-in 线程池中挑选线程并有效地向它们分配工作。它通常在 Linux 和 Windows 中使用 just enough™ 线程,它会使用你剩下的所有 CPU (0%当 CPU:s 开始以最大频率旋转时所有核心都空闲)-奇怪的是没有使 Linux 和 Windows “迟钝”。

这里我使用了执行策略 std::execution::parallel_policy(由 std::execution::par 常量表示)。如果你能把需要做的工作准备好,放在一个容器里,比如std::vector,那就真的很简单了。

#include <algorithm>
#include <chrono>
#include <execution>  // std::execution::par
#include <iostream>
// #include <thread>  // not needed to run with execuion policies
#include <vector>

struct work_package {
    work_package() : payload(co) { ++co; }
    int payload;
    static int co;
};
int work_package::co = 10;

int main() {
    std::vector<work_package> wps(22*6);                         // 132 work packages
    for(const auto& wp : wps) std::cout << wp.payload << '\n';   // prints 10 to 141

    // work on the work packages
    std::for_each(std::execution::par, wps.begin(), wps.end(), [](auto& wp) {
        // Probably in a thread - As long as you do not write to the same work package
        // from different threads, you don't need synchronization here.

        // do some work with the work package
        ++wp.payload;
    });

    for(const auto& wp : wps) std::cout << wp.payload << '\n';   // prints 11 to 142
}

使用 g++,您可能需要安装 tbb (The Threading Building Blocks),您还需要 link 安装:-ltbb.

  • apt install libtbb-dev Ubuntu。
  • dnf install tbb-devel.x86_64 在 Fedora 上。

其他发行版可能会有不同的称呼。

Visual Studio(2017 年及以后)link 自动使用适当的库(如果我现在弄错了,还有 tbb)。