如何使用 `boost::thread_specific_ptr` 和 `for_each`
How to use `boost::thread_specific_ptr` with `for_each`
在下面的代码中,Bar
应该模拟一个创建成本适中的线程不安全对象。 Foo
包含一个 Bar
并且是多线程的,因此它使用 thread_specific_ptr<Bar>
来创建一个每线程 Bar
可以在对 [= 的多次调用中重复使用18=] 用于相同的 Foo
(因此分摊为每个线程创建 Bar
的成本)。 Foo
总是创建具有相同 num
的 Bar
,因此健全性检查应该 总是通过,但它失败了。
原因(我认为)在 requirement for the thread_specific_ptr
destructor:
中有解释
All the thread specific instances associated to this thread_specific_ptr (except maybe the one associated to this thread) must be null.
所以这个问题是由三件事共同造成的:
Bar
在工作线程中创建的对象在 Foo
s thread_specific_ptr
被清理时不会被清理,因此在 main
中的循环迭代中持久存在(本质上是内存泄漏)
- C++ 运行time 在
main
循环迭代之间重用 for_each
中的线程
- C++ 运行time 正在将
main
循环中的每个 Foo
重新分配到相同的内存地址
索引 thread_specific_ptr
的方式(通过 thread_specific_ptr
的内存地址和线程 ID)导致旧的 Bar
被意外重用。我明白这个问题;我不明白的是该怎么办。请注意文档中的注释:
The requirement is due to the fact that in order to delete all these instances, the implementation should be forced to maintain a list of all the threads having an associated specific ptr, which is against the goal of thread specific data.
我也想避免这种复杂性。
如何使用 for_each
进行简单的线程管理,同时避免内存泄漏?解决方案要求:
- 它应该只为每个
Foo
每个线程创建一个 Bar
(即不要在 for_each
中创建一个新的 Bar)
- 假设
Bar
不是线程安全的。
- 如果可能,使用
for_each
使并行循环尽可能简单
- 循环实际上应该 运行 并行(即,单个
Bar
周围没有互斥锁)
Bar
由 loop
创建的对象应该可以使用,直到创建它们的 Foo
对象被销毁,此时所有 Bar
对象也应该是已毁。
以下代码可以编译并在具有足够内核的机器上以 return 代码 1 高概率退出。
#include <boost/thread/tss.hpp>
#include <execution>
#include <iostream>
#include <vector>
using namespace std;
class Bar {
public:
// models a thread-unsafe object
explicit Bar(int i) : num(i) { }
int num;
};
class Foo {
public:
explicit Foo(int i) : num(i) { }
void loop() {
vector<int> idxs(32);
iota(begin(idxs), end(idxs), 0);
for_each(__pstl::execution::par, begin(idxs), end(idxs), [&](int) {
if (ptr.get() == nullptr) {
// no `Bar` exists for this thread yet, so create one
Bar *tmp = new Bar(num);
ptr.reset(tmp);
}
// Get the thread-local Bar
Bar &b = *ptr;
// Sanity check: we ALWAYS create a `Bar` with the same num as `Foo`;
// see the `if` block above.
// Therefore, this condition shouldn't ever be true (but it is!)
if (b.num != num) {
cout << "NOT THREAD SAFE: Foo index is " << num << ", but Bar index is " << b.num << endl;
exit(1);
}
});
}
boost::thread_specific_ptr<Bar> ptr;
int num;
};
int main() {
for(int i = 0; i < 100; i++) {
Foo f(i);
f.loop();
}
return 0;
}
According to the documentation
~thread_specific_ptr();
Requires:
All the thread specific instances associated to this thread_specific_ptr
(except maybe the one associated to this thread) must be null.
这意味着您不能销毁 Foo
,直到它的所有 Bar
被销毁。这是一个问题,因为 execution_policy::par
不必在新的线程池上操作,也不必在 for_each()
完成后终止线程。
这足以让我们回答所提出的问题:您只能使用 thread_specific_ptr
和 execution::par
在同一线程上的不同迭代之间共享数据,如果:
thread_specific_ptr
永远不会被破坏。这是必需的,因为无法知道 for_each
的给定迭代是否将是其分配线程的最后一次迭代,并且该线程可能 永远不会 再次被调度。
- 您很乐意在程序结束前为每个线程泄漏一个指向对象的实例。
您的代码中发生了什么
我们已经进入未定义行为领域,但您所看到的行为仍然可以进一步解释。考虑到:
Boost.Thread uses the address of the thread_specific_ptr instance as key of the thread specific pointers. This avoids to create/destroy a key which will need a lock to protect from race conditions. This has a little performance liability, as the access must be done using an associative container.
... 并且 Foo
的所有 100 个实例很可能在内存中的同一位置,您最终会看到前一个 Foo
中的 Bar
实例,当工作线程被回收,导致你的(innacurate,见下文)检查命中。
解决方案:我认为你应该做的
我建议你完全放弃 thread_specific_ptr
并使用关联容器手动管理 per-thread/per-Foo
Bar
实例的池,这样可以管理生命周期Bar
个对象的数量要简单得多:
class per_thread_bar_pool {
std::map<std::thread::id, Bar> bars_;
// alternatively:
// std::map<std::thread::id, std::unique_ptr<Bar>> bars_;
std::mutex mtx_;
public:
Bar& get(int num) {
auto tid = std::this_thread::get_id();
std::unique_lock l{mtx_};
auto found = bars_.find(tid);
if(found == bars_.end()) {
l.unlock(); // Let other threads access the map while `Bar` is being built.
Bar new_bar(num);
// auto new_bar = std::make_unique<Bar>(num);
l.lock();
assert(bars_.find(tid) == bars_.end());
found = bars_.emplace(tid, std::move(new_bar)).first;
}
return found->second;
// return *found->second;
}
};
void loop() {
per_thread_bar_pool bars;
vector<int> idxs(32);
iota(begin(idxs), end(idxs), 0);
for_each(__pstl::execution::par, begin(idxs), end(idxs), [&](int) {
Bar& current_bar = bars.get(num);
// ...
}
}
thread_specific_ptr
已经在后台使用了 std::map<>
(它为每个线程维护一个)。所以在这里介绍一个也没什么大不了的。
我们确实引入了一个互斥体,但它只对映射中的简单 lookup/insertion 起作用,并且由于构建 Bar
应该非常昂贵,所以它很可能有很影响不大。它还具有 Foo
的多个实例不再相互交互的好处,因此您可以避免在最终从多个线程调用 foo::loop()
时可能发生的意外错误。
N.B.:if (b.num != num) {
不是有效测试,因为来自给定 Foo
的所有 Bar
实例共享同样num
。不过那只会导致假阴性。
解决方案:使您的代码工作(几乎)
综上所述,如果您绝对热衷于同时使用 thread_specific_pointer
和 execution::par
,则必须执行以下操作:
void loop() {
static boost::thread_specific_ptr<Bar> ptr; // lives till the end of the program
vector<int> idxs(32);
iota(begin(idxs), end(idxs), 0);
for_each(__pstl::execution::par, begin(idxs), end(idxs), [&](int) {
if (ptr.get() == nullptr || ptr->num != num) {
// no `Bar` exists for this thread yet, or it's from a previous run
Bar *tmp = new Bar(num);
ptr.reset(tmp);
}
// Get the thread-local Bar
Bar &b = *ptr;
});
但是,此 将 每个线程泄漏多达 1 Bar
,因为清理只会在我们尝试重用前一个 Bar
时发生运行。没有办法解决这个问题。
在下面的代码中,Bar
应该模拟一个创建成本适中的线程不安全对象。 Foo
包含一个 Bar
并且是多线程的,因此它使用 thread_specific_ptr<Bar>
来创建一个每线程 Bar
可以在对 [= 的多次调用中重复使用18=] 用于相同的 Foo
(因此分摊为每个线程创建 Bar
的成本)。 Foo
总是创建具有相同 num
的 Bar
,因此健全性检查应该 总是通过,但它失败了。
原因(我认为)在 requirement for the thread_specific_ptr
destructor:
All the thread specific instances associated to this thread_specific_ptr (except maybe the one associated to this thread) must be null.
所以这个问题是由三件事共同造成的:
Bar
在工作线程中创建的对象在Foo
sthread_specific_ptr
被清理时不会被清理,因此在main
中的循环迭代中持久存在(本质上是内存泄漏)- C++ 运行time 在
main
循环迭代之间重用 - C++ 运行time 正在将
main
循环中的每个Foo
重新分配到相同的内存地址
for_each
中的线程
索引 thread_specific_ptr
的方式(通过 thread_specific_ptr
的内存地址和线程 ID)导致旧的 Bar
被意外重用。我明白这个问题;我不明白的是该怎么办。请注意文档中的注释:
The requirement is due to the fact that in order to delete all these instances, the implementation should be forced to maintain a list of all the threads having an associated specific ptr, which is against the goal of thread specific data.
我也想避免这种复杂性。
如何使用 for_each
进行简单的线程管理,同时避免内存泄漏?解决方案要求:
- 它应该只为每个
Foo
每个线程创建一个Bar
(即不要在for_each
中创建一个新的 Bar) - 假设
Bar
不是线程安全的。 - 如果可能,使用
for_each
使并行循环尽可能简单 - 循环实际上应该 运行 并行(即,单个
Bar
周围没有互斥锁) Bar
由loop
创建的对象应该可以使用,直到创建它们的Foo
对象被销毁,此时所有Bar
对象也应该是已毁。
以下代码可以编译并在具有足够内核的机器上以 return 代码 1 高概率退出。
#include <boost/thread/tss.hpp>
#include <execution>
#include <iostream>
#include <vector>
using namespace std;
class Bar {
public:
// models a thread-unsafe object
explicit Bar(int i) : num(i) { }
int num;
};
class Foo {
public:
explicit Foo(int i) : num(i) { }
void loop() {
vector<int> idxs(32);
iota(begin(idxs), end(idxs), 0);
for_each(__pstl::execution::par, begin(idxs), end(idxs), [&](int) {
if (ptr.get() == nullptr) {
// no `Bar` exists for this thread yet, so create one
Bar *tmp = new Bar(num);
ptr.reset(tmp);
}
// Get the thread-local Bar
Bar &b = *ptr;
// Sanity check: we ALWAYS create a `Bar` with the same num as `Foo`;
// see the `if` block above.
// Therefore, this condition shouldn't ever be true (but it is!)
if (b.num != num) {
cout << "NOT THREAD SAFE: Foo index is " << num << ", but Bar index is " << b.num << endl;
exit(1);
}
});
}
boost::thread_specific_ptr<Bar> ptr;
int num;
};
int main() {
for(int i = 0; i < 100; i++) {
Foo f(i);
f.loop();
}
return 0;
}
According to the documentation
~thread_specific_ptr();
Requires:
All the thread specific instances associated to this thread_specific_ptr
(except maybe the one associated to this thread) must be null.
这意味着您不能销毁 Foo
,直到它的所有 Bar
被销毁。这是一个问题,因为 execution_policy::par
不必在新的线程池上操作,也不必在 for_each()
完成后终止线程。
这足以让我们回答所提出的问题:您只能使用 thread_specific_ptr
和 execution::par
在同一线程上的不同迭代之间共享数据,如果:
thread_specific_ptr
永远不会被破坏。这是必需的,因为无法知道for_each
的给定迭代是否将是其分配线程的最后一次迭代,并且该线程可能 永远不会 再次被调度。- 您很乐意在程序结束前为每个线程泄漏一个指向对象的实例。
您的代码中发生了什么
我们已经进入未定义行为领域,但您所看到的行为仍然可以进一步解释。考虑到:
Boost.Thread uses the address of the thread_specific_ptr instance as key of the thread specific pointers. This avoids to create/destroy a key which will need a lock to protect from race conditions. This has a little performance liability, as the access must be done using an associative container.
... 并且 Foo
的所有 100 个实例很可能在内存中的同一位置,您最终会看到前一个 Foo
中的 Bar
实例,当工作线程被回收,导致你的(innacurate,见下文)检查命中。
解决方案:我认为你应该做的
我建议你完全放弃 thread_specific_ptr
并使用关联容器手动管理 per-thread/per-Foo
Bar
实例的池,这样可以管理生命周期Bar
个对象的数量要简单得多:
class per_thread_bar_pool {
std::map<std::thread::id, Bar> bars_;
// alternatively:
// std::map<std::thread::id, std::unique_ptr<Bar>> bars_;
std::mutex mtx_;
public:
Bar& get(int num) {
auto tid = std::this_thread::get_id();
std::unique_lock l{mtx_};
auto found = bars_.find(tid);
if(found == bars_.end()) {
l.unlock(); // Let other threads access the map while `Bar` is being built.
Bar new_bar(num);
// auto new_bar = std::make_unique<Bar>(num);
l.lock();
assert(bars_.find(tid) == bars_.end());
found = bars_.emplace(tid, std::move(new_bar)).first;
}
return found->second;
// return *found->second;
}
};
void loop() {
per_thread_bar_pool bars;
vector<int> idxs(32);
iota(begin(idxs), end(idxs), 0);
for_each(__pstl::execution::par, begin(idxs), end(idxs), [&](int) {
Bar& current_bar = bars.get(num);
// ...
}
}
thread_specific_ptr
已经在后台使用了 std::map<>
(它为每个线程维护一个)。所以在这里介绍一个也没什么大不了的。
我们确实引入了一个互斥体,但它只对映射中的简单 lookup/insertion 起作用,并且由于构建 Bar
应该非常昂贵,所以它很可能有很影响不大。它还具有 Foo
的多个实例不再相互交互的好处,因此您可以避免在最终从多个线程调用 foo::loop()
时可能发生的意外错误。
N.B.:if (b.num != num) {
不是有效测试,因为来自给定 Foo
的所有 Bar
实例共享同样num
。不过那只会导致假阴性。
解决方案:使您的代码工作(几乎)
综上所述,如果您绝对热衷于同时使用 thread_specific_pointer
和 execution::par
,则必须执行以下操作:
void loop() {
static boost::thread_specific_ptr<Bar> ptr; // lives till the end of the program
vector<int> idxs(32);
iota(begin(idxs), end(idxs), 0);
for_each(__pstl::execution::par, begin(idxs), end(idxs), [&](int) {
if (ptr.get() == nullptr || ptr->num != num) {
// no `Bar` exists for this thread yet, or it's from a previous run
Bar *tmp = new Bar(num);
ptr.reset(tmp);
}
// Get the thread-local Bar
Bar &b = *ptr;
});
但是,此 将 每个线程泄漏多达 1 Bar
,因为清理只会在我们尝试重用前一个 Bar
时发生运行。没有办法解决这个问题。