如何正确同步这两个线程?

How can I syncronize these two threads properly?

我想正确同步不同的线程,但到目前为止我只能编写一个不优雅的解决方案。有人可以指出我如何改进以下代码吗?

typedef void (*func)();

void thread(func func1, func func2, int& has_finished, int& id) {
    has_finished--;
    func1();
    has_finished++;
    while (has_finished != 0) std::cout << "thread " << id << " waiting\n";
    std::cout << "thread" << id << "resuming\n";
    func2();
}

int main() {
    int has_finished(0), id_one(0), id_two(1);
    std::thread t1(thread, fun, fun, std::ref(has_finished), std::ref(id_one));
    std::thread t2(thread, fun, fun, std::ref(has_finished), std::ref(id_two));
    t1.join();
    t2.join();
};

程序的要点由函数thread描述。该函数由两个 std::thread 执行。该函数接受两个 long-运行 函数 func1func2 以及两个整数引用作为参数。 线程应仅在所有线程退出 func1 后调用 func2。参数 has_finished 用于协调不同线程:进入函数后,has_arguments 为零。然后每个 std::thread 递减值并调用 long-运行 函数 func1。在离开 func1 之后,has_finished 再次递增。只要该值不是其原始值零,线程就会等待。然后,每个线程在 func2 上工作。主要功能显示在最后。

如何更好地协调两个线程?我正在考虑使用 std::mutexstd::condition_variable 但不知道如何正确使用它们?有人知道我如何改进程序吗?

您选择的方法实际上不会起作用,并且由于竞争条件会导致未定义的行为。如您所料,您需要一个条件变量。

这里是一个 Gate class 演示如何使用条件变量来实现一个门,该门在继续之前等待一定数量的线程到达它:

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <sstream>
#include <utility>
#include <cassert>

struct Gate {
 public:
    explicit Gate(unsigned int count = 2) : count_(count) { }  // How many threads need to reach the gate before it unlocks
    Gate(Gate const &) = delete;
    void operator =(Gate const &) = delete;

    void wait_for_gate();

 private:
    int count_;
    ::std::mutex count_mutex_;
    ::std::condition_variable count_gate_;
};

void Gate::wait_for_gate()
{
    ::std::unique_lock<::std::mutex> guard(count_mutex_);
    assert(count > 0); // Count being 0 here indicates an irrecoverable programming error.
    --count_;
    count_gate_.wait(guard, [this](){ return this-> count_ <= 0; });
    guard.unlock();
    count_gate_.notify_all();
}

void f1()
{
    ::std::ostringstream msg;
    msg << "In f1 with thread " << ::std::this_thread::get_id() << '\n';
    ::std::cout << msg.str();
}

void f2()
{
    ::std::ostringstream msg;
    msg << "In f2 with thread " << ::std::this_thread::get_id() << '\n';
    ::std::cout << msg.str();
}

void thread_func(Gate &gate)
{
    f1();
    gate.wait_for_gate();
    f2();
}

int main()
{
    Gate gate;
    ::std::thread t1{thread_func, ::std::ref(gate)};
    ::std::thread t2{thread_func, ::std::ref(gate)};
    t1.join();
    t2.join();
}

希望这段代码的结构看起来很像您的代码,以便您能够理解这里发生的事情。通过阅读您的代码,您似乎正在寻找所有线程来执行 func1,然后是 func2。您不希望 func2 运行ning 任何线程正在执行 func1.

这可以被认为是所有线程在进入 运行 func2 之前等待到达 'finished func1' 位置的门。

我在自己的本地版本的编译器资源管理器上测试了这段代码。

另一个答案中latch的主要缺点是它还不是标准的C++。我的 Gate class 是另一个答案中提到的闩锁 class 的简单实现,它是标准的 C++。

条件变量的基本工作方式是解锁互斥量,等待通知,然后锁定该互斥量并测试条件。如果条件为真,它将继续而不解锁互斥量。如果条件为假,则重新开始。

因此,在条件变量表明条件为真之后,您必须做任何您需要做的事情,然后解锁互斥锁并通知所有人您已经完成了。

这里的互斥锁保护共享计数变量。每当你有一个共享值时,你应该用互斥量来保护它,这样任何线程都无法看到处于不一致状态的值。条件是线程可以等待那个计数达到0,表示所有线程都已经递减计数变量。

不要自己写这个。这种同步被称为 "latch"(或更一般地称为 "barrier",它可以通过各种库和 C++ 并发 TS 获得。(它也可能在某些情况下进入 C++20形式。)

例如,使用a version from Boost:

#include <iostream>
#include <thread>

#include <boost/thread/latch.hpp>

void f(boost::latch& c) {
    std::cout << "Doing work in round 1\n";
    c.count_down_and_wait();
    std::cout << "Doing work in round 2\n";
}

int main() {
    boost::latch c(2);

    std::thread t1(f, std::ref(c)), t2(f, std::ref(c));
    t1.join();
    t2.join();
}