使用 omp_get_thread_num 索引全局向量是否安全?
Is it safe to use omp_get_thread_num to index a global vector?
我有这样的代码:
thread_local CustomAllocator* ts_alloc = nullptr;
struct AllocatorSetup
{
AllocatorSetup( int threadNum )
{
static std::vector<CustomAllocator> vec( (size_t)omp_get_max_threads() );
ts_alloc = &vec.at( threadNum );
}
~AllocatorSetup()
{
ts_alloc->resetArena();
ts_alloc = nullptr;
}
AllocatorSetup() = delete;
AllocatorSetup( const AllocatorSetup& ) = delete;
void operator=( const AllocatorSetup& ) = delete;
};
template<class E>
inline E* allocateBuffer( size_t count )
{
return (E*)ts_alloc->allocate( count * sizeof( E ), alignof( E ) );
}
void computeThings()
{
#pragma omp parallel for
for( int64_t i = 0; i < 100000; i++ )
{
AllocatorSetup allocSetup{ omp_get_thread_num() };
float* const buffer = allocateBuffer<float>( 1024 * 256 );
// ..some computations here
}
}
多个线程同时调用computeThings()
会不会中断?
换句话说,在任何给定时间,omp_get_thread_num()
索引和本地线程之间的关系是一对一还是一对多?
The documentation spells this out.
The omp_get_thread_num routine returns the thread number, within the current team, of the calling thread.
The binding thread set for an omp_get_thread_num region is the current team. The binding region for an omp_get_thread_num region is the innermost enclosing parallel region.
The omp_get_thread_num routine returns the thread number of the calling thread, within the team that is executing the parallel region to which the routine region binds. The thread number is an integer between 0 and one less than the value returned by omp_get_num_threads, inclusive. The thread number of the master thread of the team is 0. The routine returns 0 if it is called from the sequential part of a program.
强调我的。
这意味着如果您有多个团队并发执行,则每个团队中的线程编号为 0 .. nbr_threads_in_team
。因此,如果多个线程同时处于不同的 运行 团队中,它们将获得相同的线程号。
虽然 OpenMP 肯定会重复使用线程,但如果它实际上同时运行两个并行部分,您将同时拥有两个线程组 运行。由于一个线程不能同时做两件事,所以这些线程肯定是不同的,但每个团队中的线程都是编号的 0 .. nbr_threads_in_team
.
这意味着您的 computeThings()
函数目前不是线程安全的。您可以改为构建一个 CustomAllocator
对象的线程安全池,线程可以从中“借出”一个对象,并在线程完成后 return 它。
像这样:
void computeThings() {
#pragma omp parallel
{
//Since we're in a parallel block, each thread
//will lend its own CustomAllocator
CustomAllocator& theAllocator = allocatorPool.lend();
#pragma omp for
for (...) {
/* Do stuff with the allocator */
}
allocatorPool.unLend(theAllocator);
}
}
在生产代码中,lend()
和 unLend()
当然应该使用 RAII 来实现,以避免在异常情况下发生泄漏。请注意,它只在整个并行 for 循环之前获取一次新资源,而不是在循环的每次迭代中获取新资源,因此每次调用只需为缓存未命中支付一次费用。 (除非您在循环中调用该函数本身,否则您的数据不太可能在对 computeThings
的调用之间保留在 L1 中。)如果您确实需要对 L1 友好,即使在函数的多次调用中,您可以将每个资源与线程 ID 相关联,如果“首选资源”仍然可用,则优先 return 将资源关联到先前请求它们的同一线程。
下面是此类缓存的示例,它试图为每个请求线程提供其首选资源实例:
//Warning, untested!
template<typename T>
class ThreadAffineCache {
std::unordered_map<std::thread::id, std::unique_ptr<T>> m_cache = {};
std::mutex m_mtx{};
public:
std::unique_ptr<T> lend() {
std::unique_lock lk{m_mtx};
auto tid = std::this_thread::get_id();
std::unique_ptr<T> result = std::move(m_cache[tid]);
m_cache.erase(tid);
if (!result) {
if (m_cache.empty()) {
result = std::make_unique<T>();
} else {
auto iter = m_cache.begin();
result = std::move(*iter);
m_cache.erase(iter);
}
}
assert(result);
return result;
}
void unLend(std::unique_ptr<T> obj) {
assert(obj);
std::unique_lock lk{m_mtx};
m_cache[std::this_thread::get_id()] = std::move(obj);
}
}
或者,您可以使用 return 由 pthread_self
或 gettid
编入索引的地图。我不确定 std::thread::id
return 是否对 OpenMP 工作线程有意义。线程终止时清理也可能是个问题。
如果您只是需要一个简单的出路,您还可以使用互斥锁来保护 computeThings
。
我有这样的代码:
thread_local CustomAllocator* ts_alloc = nullptr;
struct AllocatorSetup
{
AllocatorSetup( int threadNum )
{
static std::vector<CustomAllocator> vec( (size_t)omp_get_max_threads() );
ts_alloc = &vec.at( threadNum );
}
~AllocatorSetup()
{
ts_alloc->resetArena();
ts_alloc = nullptr;
}
AllocatorSetup() = delete;
AllocatorSetup( const AllocatorSetup& ) = delete;
void operator=( const AllocatorSetup& ) = delete;
};
template<class E>
inline E* allocateBuffer( size_t count )
{
return (E*)ts_alloc->allocate( count * sizeof( E ), alignof( E ) );
}
void computeThings()
{
#pragma omp parallel for
for( int64_t i = 0; i < 100000; i++ )
{
AllocatorSetup allocSetup{ omp_get_thread_num() };
float* const buffer = allocateBuffer<float>( 1024 * 256 );
// ..some computations here
}
}
多个线程同时调用computeThings()
会不会中断?
换句话说,在任何给定时间,omp_get_thread_num()
索引和本地线程之间的关系是一对一还是一对多?
The documentation spells this out.
The omp_get_thread_num routine returns the thread number, within the current team, of the calling thread.
The binding thread set for an omp_get_thread_num region is the current team. The binding region for an omp_get_thread_num region is the innermost enclosing parallel region.
The omp_get_thread_num routine returns the thread number of the calling thread, within the team that is executing the parallel region to which the routine region binds. The thread number is an integer between 0 and one less than the value returned by omp_get_num_threads, inclusive. The thread number of the master thread of the team is 0. The routine returns 0 if it is called from the sequential part of a program.
强调我的。
这意味着如果您有多个团队并发执行,则每个团队中的线程编号为 0 .. nbr_threads_in_team
。因此,如果多个线程同时处于不同的 运行 团队中,它们将获得相同的线程号。
虽然 OpenMP 肯定会重复使用线程,但如果它实际上同时运行两个并行部分,您将同时拥有两个线程组 运行。由于一个线程不能同时做两件事,所以这些线程肯定是不同的,但每个团队中的线程都是编号的 0 .. nbr_threads_in_team
.
这意味着您的 computeThings()
函数目前不是线程安全的。您可以改为构建一个 CustomAllocator
对象的线程安全池,线程可以从中“借出”一个对象,并在线程完成后 return 它。
像这样:
void computeThings() {
#pragma omp parallel
{
//Since we're in a parallel block, each thread
//will lend its own CustomAllocator
CustomAllocator& theAllocator = allocatorPool.lend();
#pragma omp for
for (...) {
/* Do stuff with the allocator */
}
allocatorPool.unLend(theAllocator);
}
}
在生产代码中,lend()
和 unLend()
当然应该使用 RAII 来实现,以避免在异常情况下发生泄漏。请注意,它只在整个并行 for 循环之前获取一次新资源,而不是在循环的每次迭代中获取新资源,因此每次调用只需为缓存未命中支付一次费用。 (除非您在循环中调用该函数本身,否则您的数据不太可能在对 computeThings
的调用之间保留在 L1 中。)如果您确实需要对 L1 友好,即使在函数的多次调用中,您可以将每个资源与线程 ID 相关联,如果“首选资源”仍然可用,则优先 return 将资源关联到先前请求它们的同一线程。
下面是此类缓存的示例,它试图为每个请求线程提供其首选资源实例:
//Warning, untested!
template<typename T>
class ThreadAffineCache {
std::unordered_map<std::thread::id, std::unique_ptr<T>> m_cache = {};
std::mutex m_mtx{};
public:
std::unique_ptr<T> lend() {
std::unique_lock lk{m_mtx};
auto tid = std::this_thread::get_id();
std::unique_ptr<T> result = std::move(m_cache[tid]);
m_cache.erase(tid);
if (!result) {
if (m_cache.empty()) {
result = std::make_unique<T>();
} else {
auto iter = m_cache.begin();
result = std::move(*iter);
m_cache.erase(iter);
}
}
assert(result);
return result;
}
void unLend(std::unique_ptr<T> obj) {
assert(obj);
std::unique_lock lk{m_mtx};
m_cache[std::this_thread::get_id()] = std::move(obj);
}
}
或者,您可以使用 return 由 pthread_self
或 gettid
编入索引的地图。我不确定 std::thread::id
return 是否对 OpenMP 工作线程有意义。线程终止时清理也可能是个问题。
如果您只是需要一个简单的出路,您还可以使用互斥锁来保护 computeThings
。