C++。 std::condition_variable 和多个等待线程
C++. std::condition_variable and multiple wait-threads
我有一些 class,queue
的 std::function<void()>
成员和方法 Push
和 Pop
。
我想实现加法PushAndWaitUntilExecuted
。当你有一个 consumer-thread
(调用 Pop
)和一个 producer-thread
(调用 Push
)时很容易 - 只需简单的 std::condition_variable
就足够了。
但我的应用程序有动态数量的线程,可以通过并行调用 PushAndWaitUntilExecuted
函数执行相同的代码行,并等待 consumer-thread
执行推送的 std::function
对象。
A 有想法将 std::pair<uint64_t, std::function<void()>>
传递给 queue
而不仅仅是 std::function<void()>
,其中 uint64_t
- producer-thread ID
(boost::this_thread::get_id()
) .然后 consumer-thread
调用 std::condition_variable::notify_all()
并且所有线程将检查执行的 std::function
是否与线程具有相同的 ID
。
这是好的解决方案还是可以实施更好的方法?
为了避免几个不同的竞争条件,这里需要引入的不仅仅是一个条件变量。还需要互斥锁和作业完成标志。
此时,将您的 std::function<void()>
替换为包含此闭包以及所有额外行李的小 class 变得更清晰:
struct job {
std::function<void()> implementation;
std::mutex m;
std::condition_variable flag;
bool completed=false;
};
你的队列变成了 std::shared_ptr<job>
s 的队列,而不是 std::function
s 的队列,作业是在动态范围内构建的(因为,当然,互斥锁和条件变量是不可复制的或可移动,并且可以从您的两个线程访问这些对象)。
在您的工作线程执行完实现后,它:
- 锁定互斥量。
- 将
completed
设置为 true
- 指示条件变量。
而你的PushAndWaitUntilExecuted
,在它执行推送后:
- 锁定互斥体
- 等待条件变量,直到设置
completed
你必须彻底理解 C++ 绝对不能保证,在你将一个新的闭包推入你的作业队列后,一些线程不会立即抓住它,执行它,并在原来的之前完成它线程(推动它的线程)开始查看条件变量。到现在为止,没有人会再向条件变量发出信号。如果您在这里只需要使用一个条件变量,那么您将等待条件变量发出信号,直到我们的太阳爆炸。
这就是为什么您不仅需要一个条件变量、一个互斥锁和一个显式标志,还需要使用上述方法来正确处理线程间排序。
这是一种相当class常规的方法。您应该在每本关于此主题的优秀 C++ 教科书中找到许多类似实现的示例。
我有一些 class,queue
的 std::function<void()>
成员和方法 Push
和 Pop
。
我想实现加法PushAndWaitUntilExecuted
。当你有一个 consumer-thread
(调用 Pop
)和一个 producer-thread
(调用 Push
)时很容易 - 只需简单的 std::condition_variable
就足够了。
但我的应用程序有动态数量的线程,可以通过并行调用 PushAndWaitUntilExecuted
函数执行相同的代码行,并等待 consumer-thread
执行推送的 std::function
对象。
A 有想法将 std::pair<uint64_t, std::function<void()>>
传递给 queue
而不仅仅是 std::function<void()>
,其中 uint64_t
- producer-thread ID
(boost::this_thread::get_id()
) .然后 consumer-thread
调用 std::condition_variable::notify_all()
并且所有线程将检查执行的 std::function
是否与线程具有相同的 ID
。
这是好的解决方案还是可以实施更好的方法?
为了避免几个不同的竞争条件,这里需要引入的不仅仅是一个条件变量。还需要互斥锁和作业完成标志。
此时,将您的 std::function<void()>
替换为包含此闭包以及所有额外行李的小 class 变得更清晰:
struct job {
std::function<void()> implementation;
std::mutex m;
std::condition_variable flag;
bool completed=false;
};
你的队列变成了 std::shared_ptr<job>
s 的队列,而不是 std::function
s 的队列,作业是在动态范围内构建的(因为,当然,互斥锁和条件变量是不可复制的或可移动,并且可以从您的两个线程访问这些对象)。
在您的工作线程执行完实现后,它:
- 锁定互斥量。
- 将
completed
设置为 true - 指示条件变量。
而你的PushAndWaitUntilExecuted
,在它执行推送后:
- 锁定互斥体
- 等待条件变量,直到设置
completed
你必须彻底理解 C++ 绝对不能保证,在你将一个新的闭包推入你的作业队列后,一些线程不会立即抓住它,执行它,并在原来的之前完成它线程(推动它的线程)开始查看条件变量。到现在为止,没有人会再向条件变量发出信号。如果您在这里只需要使用一个条件变量,那么您将等待条件变量发出信号,直到我们的太阳爆炸。
这就是为什么您不仅需要一个条件变量、一个互斥锁和一个显式标志,还需要使用上述方法来正确处理线程间排序。
这是一种相当class常规的方法。您应该在每本关于此主题的优秀 C++ 教科书中找到许多类似实现的示例。