如何使用 `boost::thread_specific_ptr` 和 `for_each`

How to use `boost::thread_specific_ptr` with `for_each`

在下面的代码中,Bar 应该模拟一个创建成本适中的线程不安全对象。 Foo 包含一个 Bar 并且是多线程的,因此它使用 thread_specific_ptr<Bar> 来创建一个每线程 Bar 可以在对 [= 的多次调用中重复使用18=] 用于相同的 Foo(因此分摊为每个线程创建 Bar 的成本)。 Foo 总是创建具有相同 numBar,因此健全性检查应该 总是通过,但它失败了。

原因(我认为)在 requirement for the thread_specific_ptr destructor:

中有解释

All the thread specific instances associated to this thread_specific_ptr (except maybe the one associated to this thread) must be null.

所以这个问题是由三件事共同造成的:

  1. Bar 在工作线程中创建的对象在 Foos thread_specific_ptr 被清理时不会被清理,因此在 main 中的循环迭代中持久存在(本质上是内存泄漏)
  2. C++ 运行time 在 main
  3. 循环迭代之间重用 for_each 中的线程
  4. C++ 运行time 正在将 main 循环中的每个 Foo 重新分配到相同的内存地址

索引 thread_specific_ptr 的方式(通过 thread_specific_ptr 的内存地址和线程 ID)导致旧的 Bar 被意外重用。我明白这个问题;我不明白的是该怎么办。请注意文档中的注释:

The requirement is due to the fact that in order to delete all these instances, the implementation should be forced to maintain a list of all the threads having an associated specific ptr, which is against the goal of thread specific data.

我也想避免这种复杂性。

如何使用 for_each 进行简单的线程管理,同时避免内存泄漏?解决方案要求:

以下代码可以编译并在具有足够内核的机器上以 return 代码 1 高概率退出。

#include <boost/thread/tss.hpp>
#include <execution>
#include <iostream>
#include <vector>

using namespace std;

class Bar {
    public:
    // models a thread-unsafe object
    explicit Bar(int i) : num(i) { }
    int num;
};

class Foo {
    public:
    explicit Foo(int i) : num(i) { }

    void loop() {
        vector<int> idxs(32);
        iota(begin(idxs), end(idxs), 0);
        for_each(__pstl::execution::par, begin(idxs), end(idxs), [&](int) {
            if (ptr.get() == nullptr) {
                // no `Bar` exists for this thread yet, so create one
                Bar *tmp = new Bar(num);
                ptr.reset(tmp);
            }
            // Get the thread-local Bar
            Bar &b = *ptr;

            // Sanity check: we ALWAYS create a `Bar` with the same num as `Foo`;
            // see the `if` block above.
            // Therefore, this condition shouldn't ever be true (but it is!)
            if (b.num != num) {
               cout << "NOT THREAD SAFE: Foo index is " << num << ", but Bar index is " << b.num << endl;
               exit(1);
            }
        });
    }
    boost::thread_specific_ptr<Bar> ptr;
    int num;
};

int main() {
    for(int i = 0; i < 100; i++) {
        Foo f(i);
        f.loop();
    }
    return 0;
}

According to the documentation

~thread_specific_ptr();

Requires:
  All the thread specific instances associated to this thread_specific_ptr 
  (except maybe the one associated to this thread) must be null.

这意味着您不能销毁 Foo,直到它的所有 Bar 被销毁。这是一个问题,因为 execution_policy::par 不必在新的线程池上操作,也不必在 for_each() 完成后终止线程。

这足以让我们回答所提出的问题:您只能使用 thread_specific_ptrexecution::par 在同一线程上的不同迭代之间共享数据,如果:

  • thread_specific_ptr 永远不会被破坏。这是必需的,因为无法知道 for_each 的给定迭代是否将是其分配线程的最后一次迭代,并且该线程可能 永远不会 再次被调度。
  • 您很乐意在程序结束前为每个线程泄漏一个指向对象的实例。

您的代码中发生了什么

我们已经进入未定义行为领域,但您所看到的行为仍然可以进一步解释。考虑到:

Boost.Thread uses the address of the thread_specific_ptr instance as key of the thread specific pointers. This avoids to create/destroy a key which will need a lock to protect from race conditions. This has a little performance liability, as the access must be done using an associative container.

... 并且 Foo 的所有 100 个实例很可能在内存中的同一位置,您最终会看到前一个 Foo 中的 Bar 实例,当工作线程被回收,导致你的(innacurate,见下文)检查命中。

解决方案:我认为你应该做的

我建议你完全放弃 thread_specific_ptr 并使用关联容器手动管理 per-thread/per-Foo Bar 实例的池,这样可以管理生命周期Bar 个对象的数量要简单得多:


class per_thread_bar_pool {
  std::map<std::thread::id, Bar> bars_;
  // alternatively: 
  // std::map<std::thread::id, std::unique_ptr<Bar>> bars_;
  std::mutex mtx_;

public:
  Bar& get(int num) {
    auto tid = std::this_thread::get_id();

    std::unique_lock l{mtx_};
    auto found = bars_.find(tid);
    if(found == bars_.end()) {
        l.unlock(); // Let other threads access the map while `Bar` is being built.
        Bar new_bar(num);
        // auto new_bar = std::make_unique<Bar>(num); 
        l.lock();

        assert(bars_.find(tid) == bars_.end());
        found = bars_.emplace(tid, std::move(new_bar)).first;
    }
    return found->second;
    // return *found->second;
  }
};


void loop() {
    per_thread_bar_pool bars;
    vector<int> idxs(32);
    iota(begin(idxs), end(idxs), 0);

    for_each(__pstl::execution::par, begin(idxs), end(idxs), [&](int) {
        Bar& current_bar = bars.get(num);
        // ...
   }
}

thread_specific_ptr 已经在后台使用了 std::map<>(它为每个线程维护一个)。所以在这里介绍一个也没什么大不了的。

我们确实引入了一个互斥体,但它只对映射中的简单 lookup/insertion 起作用,并且由于构建 Bar 应该非常昂贵,所以它很可能有很影响不大。它还具有 Foo 的多个实例不再相互交互的好处,因此您可以避免在最终从多个线程调用 foo::loop() 时可能发生的意外错误。

N.B.if (b.num != num) { 不是有效测试,因为来自给定 Foo 的所有 Bar 实例共享同样num。不过那只会导致假阴性。

解决方案:使您的代码工作(几乎)

综上所述,如果您绝对热衷于同时使用 thread_specific_pointerexecution::par,则必须执行以下操作:

void loop() {
        static boost::thread_specific_ptr<Bar> ptr; // lives till the end of the program

        vector<int> idxs(32);
        iota(begin(idxs), end(idxs), 0);
        for_each(__pstl::execution::par, begin(idxs), end(idxs), [&](int) {
            if (ptr.get() == nullptr || ptr->num != num) {
                // no `Bar` exists for this thread yet, or it's from a previous run
                Bar *tmp = new Bar(num);
                ptr.reset(tmp);
            }
            // Get the thread-local Bar
            Bar &b = *ptr;

        });

但是,此 每个线程泄漏多达 1 Bar,因为清理只会在我们尝试重用前一个 Bar 时发生运行。没有办法解决这个问题。