线程队列 C++
Thread Queue C++
'''The original post has been edited'''
如何在 C++ 中为两个 for 循环创建一个线程池?对于 0 到 6 之间的每个数字,我需要 运行 start_thread 函数 22 次。根据我使用的机器,我将有灵活数量的可用线程。如何创建池以将空闲线程分配给嵌套循环的下一个线程?
for (int t=0; t <22; t++){
for(int p=0; p<6; p++){
thread th1(start_thread, p);
thread th2(start_thread, p);
th1.join();
th2.join();
}
}
不太确定你想要什么,但也许是这样的。
for (int t=0; t <22; t++){
std::vector<std::thread> th;
for(int p=0; p<6; p++){
th.emplace_back(std::thread(start_thread, p));
}
for(int p=0; p<6; p++){
th[i].join();
}
}
(或者置换两个循环)
编辑如果要控制线程数
#include <iostream>
#include <thread>
#include <vector>
void
start_thread(int t, int p)
{
std::cout << "th " << t << ' ' << p << '\n';
}
void
join_all(std::vector<std::thread> &th)
{
for(auto &e: th)
{
e.join();
}
th.clear();
}
int
main()
{
std::size_t max_threads=std::thread::hardware_concurrency();
std::vector<std::thread> th;
for(int t=0; t <22; ++t)
{
for(int p=0; p<6; ++p)
{
th.emplace_back(std::thread(start_thread, t, p));
if(size(th)==max_threads)
{
join_all(th);
}
}
}
join_all(th);
return 0;
}
停止尝试编码并画出您需要做的事情以及完成它所需的部分。
您需要一个队列来保存作业,一个互斥锁来保护队列,这样线程就不会因同时访问而弄乱它,还需要 N 个线程。
每个线程函数都是一个循环
- 获取互斥量,
- 从队列中获取作业,
- 释放互斥量,
- 处理作业。
在这种情况下,当步骤 2 中队列中没有更多作业时,我会通过退出循环和线程来保持简单。在生产中,您将拥有线程块并在队列中等待,所以它是仍可用于稍后添加的服务工作。
将其包装在 class 中,其中包含一个允许您将作业添加到队列的函数、一个启动 N 个线程的函数以及一个加入所有 运行 线程的函数.
main
定义 class 的一个实例,提供作业,启动线程池,然后在加入时阻塞,直到每个人都完成。
一旦你将设计打造成你有信心做你需要它做的事情,然后你就可以开始编写代码了。在没有计划的情况下编写代码,尤其是 multi-threaded 代码,您将面临大量调试和 re-writing,这通常会大大超出设计所花费的时间。
如果您不想依赖 third-party 库,这非常简单。
只需创建一些你喜欢的线程,让它们从某个队列中选择一个“工作”。
例如:
#include <iostream>
#include <mutex>
#include <chrono>
#include <vector>
#include <thread>
#include <queue>
void work(int p)
{
// do the "work"
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::cout << p << std::endl;
}
std::mutex m;
std::queue<int> jobs;
void worker()
{
while (true)
{
int job(0);
// sync access to the jobs queue
{
std::lock_guard<std::mutex> l(m);
if (jobs.empty())
return;
job = jobs.front();
jobs.pop();
}
work(job);
}
}
int main()
{
// queue all jobs
for (int t = 0; t < 22; t++) {
for (int p = 0; p < 6; p++) {
jobs.push(p);
}
}
// create reasonable number of threads
static const int n = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
for (int i = 0; i < n; ++i)
threads.emplace_back(std::thread(worker));
// wait for all of them to finish
for (int i = 0; i < n; ++i)
threads[i].join();
}
[已添加] 显然,您不希望在生产代码中使用全局变量;这只是一个演示解决方案。
从 C++17 开始,您可以对标准库中的许多算法使用 execution policies 之一。这可以大大简化检查多个工作包的过程。在幕后发生的事情通常是它从 built-in 线程池中挑选线程并有效地向它们分配工作。它通常在 Linux 和 Windows 中使用 just enough™ 线程,它会使用你剩下的所有 CPU (0%当 CPU:s 开始以最大频率旋转时所有核心都空闲)-奇怪的是没有使 Linux 和 Windows “迟钝”。
这里我使用了执行策略 std::execution::parallel_policy
(由 std::execution::par
常量表示)。如果你能把需要做的工作准备好,放在一个容器里,比如std::vector
,那就真的很简单了。
#include <algorithm>
#include <chrono>
#include <execution> // std::execution::par
#include <iostream>
// #include <thread> // not needed to run with execuion policies
#include <vector>
struct work_package {
work_package() : payload(co) { ++co; }
int payload;
static int co;
};
int work_package::co = 10;
int main() {
std::vector<work_package> wps(22*6); // 132 work packages
for(const auto& wp : wps) std::cout << wp.payload << '\n'; // prints 10 to 141
// work on the work packages
std::for_each(std::execution::par, wps.begin(), wps.end(), [](auto& wp) {
// Probably in a thread - As long as you do not write to the same work package
// from different threads, you don't need synchronization here.
// do some work with the work package
++wp.payload;
});
for(const auto& wp : wps) std::cout << wp.payload << '\n'; // prints 11 to 142
}
使用 g++
,您可能需要安装 tbb
(The Threading Building Blocks),您还需要 link 安装:-ltbb
.
apt install libtbb-dev
Ubuntu。
dnf install tbb-devel.x86_64
在 Fedora 上。
其他发行版可能会有不同的称呼。
Visual Studio(2017 年及以后)link 自动使用适当的库(如果我现在弄错了,还有 tbb
)。
'''The original post has been edited'''
如何在 C++ 中为两个 for 循环创建一个线程池?对于 0 到 6 之间的每个数字,我需要 运行 start_thread 函数 22 次。根据我使用的机器,我将有灵活数量的可用线程。如何创建池以将空闲线程分配给嵌套循环的下一个线程?
for (int t=0; t <22; t++){
for(int p=0; p<6; p++){
thread th1(start_thread, p);
thread th2(start_thread, p);
th1.join();
th2.join();
}
}
不太确定你想要什么,但也许是这样的。
for (int t=0; t <22; t++){
std::vector<std::thread> th;
for(int p=0; p<6; p++){
th.emplace_back(std::thread(start_thread, p));
}
for(int p=0; p<6; p++){
th[i].join();
}
}
(或者置换两个循环)
编辑如果要控制线程数
#include <iostream>
#include <thread>
#include <vector>
void
start_thread(int t, int p)
{
std::cout << "th " << t << ' ' << p << '\n';
}
void
join_all(std::vector<std::thread> &th)
{
for(auto &e: th)
{
e.join();
}
th.clear();
}
int
main()
{
std::size_t max_threads=std::thread::hardware_concurrency();
std::vector<std::thread> th;
for(int t=0; t <22; ++t)
{
for(int p=0; p<6; ++p)
{
th.emplace_back(std::thread(start_thread, t, p));
if(size(th)==max_threads)
{
join_all(th);
}
}
}
join_all(th);
return 0;
}
停止尝试编码并画出您需要做的事情以及完成它所需的部分。
您需要一个队列来保存作业,一个互斥锁来保护队列,这样线程就不会因同时访问而弄乱它,还需要 N 个线程。
每个线程函数都是一个循环
- 获取互斥量,
- 从队列中获取作业,
- 释放互斥量,
- 处理作业。
在这种情况下,当步骤 2 中队列中没有更多作业时,我会通过退出循环和线程来保持简单。在生产中,您将拥有线程块并在队列中等待,所以它是仍可用于稍后添加的服务工作。
将其包装在 class 中,其中包含一个允许您将作业添加到队列的函数、一个启动 N 个线程的函数以及一个加入所有 运行 线程的函数.
main
定义 class 的一个实例,提供作业,启动线程池,然后在加入时阻塞,直到每个人都完成。
一旦你将设计打造成你有信心做你需要它做的事情,然后你就可以开始编写代码了。在没有计划的情况下编写代码,尤其是 multi-threaded 代码,您将面临大量调试和 re-writing,这通常会大大超出设计所花费的时间。
如果您不想依赖 third-party 库,这非常简单。
只需创建一些你喜欢的线程,让它们从某个队列中选择一个“工作”。
例如:
#include <iostream>
#include <mutex>
#include <chrono>
#include <vector>
#include <thread>
#include <queue>
void work(int p)
{
// do the "work"
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::cout << p << std::endl;
}
std::mutex m;
std::queue<int> jobs;
void worker()
{
while (true)
{
int job(0);
// sync access to the jobs queue
{
std::lock_guard<std::mutex> l(m);
if (jobs.empty())
return;
job = jobs.front();
jobs.pop();
}
work(job);
}
}
int main()
{
// queue all jobs
for (int t = 0; t < 22; t++) {
for (int p = 0; p < 6; p++) {
jobs.push(p);
}
}
// create reasonable number of threads
static const int n = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
for (int i = 0; i < n; ++i)
threads.emplace_back(std::thread(worker));
// wait for all of them to finish
for (int i = 0; i < n; ++i)
threads[i].join();
}
[已添加] 显然,您不希望在生产代码中使用全局变量;这只是一个演示解决方案。
从 C++17 开始,您可以对标准库中的许多算法使用 execution policies 之一。这可以大大简化检查多个工作包的过程。在幕后发生的事情通常是它从 built-in 线程池中挑选线程并有效地向它们分配工作。它通常在 Linux 和 Windows 中使用 just enough™ 线程,它会使用你剩下的所有 CPU (0%当 CPU:s 开始以最大频率旋转时所有核心都空闲)-奇怪的是没有使 Linux 和 Windows “迟钝”。
这里我使用了执行策略 std::execution::parallel_policy
(由 std::execution::par
常量表示)。如果你能把需要做的工作准备好,放在一个容器里,比如std::vector
,那就真的很简单了。
#include <algorithm>
#include <chrono>
#include <execution> // std::execution::par
#include <iostream>
// #include <thread> // not needed to run with execuion policies
#include <vector>
struct work_package {
work_package() : payload(co) { ++co; }
int payload;
static int co;
};
int work_package::co = 10;
int main() {
std::vector<work_package> wps(22*6); // 132 work packages
for(const auto& wp : wps) std::cout << wp.payload << '\n'; // prints 10 to 141
// work on the work packages
std::for_each(std::execution::par, wps.begin(), wps.end(), [](auto& wp) {
// Probably in a thread - As long as you do not write to the same work package
// from different threads, you don't need synchronization here.
// do some work with the work package
++wp.payload;
});
for(const auto& wp : wps) std::cout << wp.payload << '\n'; // prints 11 to 142
}
使用 g++
,您可能需要安装 tbb
(The Threading Building Blocks),您还需要 link 安装:-ltbb
.
apt install libtbb-dev
Ubuntu。dnf install tbb-devel.x86_64
在 Fedora 上。
其他发行版可能会有不同的称呼。
Visual Studio(2017 年及以后)link 自动使用适当的库(如果我现在弄错了,还有 tbb
)。