这种线程间对象共享策略合理吗?
Is this inter-thread object sharing strategy sound?
我正在尝试想出一种快速解决以下问题的方法:
我有一个生成数据的线程,以及几个使用数据的线程。我不需要对生成的数据进行排队,因为生成的数据比消耗的数据慢得多(即使偶尔出现这种情况,如果偶尔跳过数据点也不会成为问题)。所以,基本上,我有一个封装 "most recent state" 的对象,只允许生产者线程更新它。
我的策略如下(如果我完全失去理智请告诉我):
我为这个例子创建了三个 类:Thing
(实际状态对象),SharedObject<Thing>
(一个对象,可以是每个线程的本地对象,并给出thread access to the underlying Thing) 和 SharedObjectManager<Thing>
,它包含一个 shared_ptr
和一个 mutex
.
SharedObjectManager
(SOM) 的实例是一个全局变量。
当生产者启动时,它会实例化一个事物,并将其告知全局 SOM。然后它制作一个副本,并在该副本上完成所有更新工作。当它准备提交它对事物的更改时,它会将新事物传递给全局 SOM,后者锁定它的互斥锁,更新它保留的共享指针,然后释放锁。
同时,消费者线程全部实例化SharedObject<Thing>
。这些对象每个都保存一个指向全局 SOM 的指针,以及 SOM 保存的 shared_ptr
的缓存副本......它一直缓存到 update()
被显式调用。
我相信这越来越难以理解,所以这里有一些代码:
#include <mutex>
#include <iostream>
#include <memory>
class Thing
{
private:
int _some_member = 10;
public:
int some_member() const { return _some_member; }
void some_member(int val) {_some_member = val; }
};
// one global instance
template<typename T>
class SharedObjectManager
{
private:
std::shared_ptr<T> objPtr;
std::mutex objLock;
public:
std::shared_ptr<T> get_sptr()
{
std::lock_guard<std::mutex> lck(objLock);
return objPtr;
}
void commit_new_object(std::shared_ptr<T> new_object)
{
std::lock_guard<std::mutex> lck (objLock);
objPtr = new_object;
}
};
// one instance per consumer thread.
template<typename T>
class SharedObject
{
private:
SharedObjectManager<T> * som;
std::shared_ptr<T> cache;
public:
SharedObject(SharedObjectManager<T> * backend) : som(backend)
{update();}
void update()
{
cache = som->get_sptr();
}
T & operator *()
{
return *cache;
}
T * operator->()
{
return cache.get();
}
};
// no actual threads in this test, just a quick sanity check.
SharedObjectManager<Thing> glbSOM;
int main(void)
{
glbSOM.commit_new_object(std::make_shared<Thing>());
SharedObject<Thing> myobj(&glbSOM);
std::cout<<myobj->some_member()<<std::endl;
// prints "10".
}
生产者线程使用的思路是:
// initialization - on startup
auto firstStateObj = std::make_shared<Thing>();
glbSOM.commit_new_object(firstStateObj);
// main loop
while (1)
{
// invoke copy constructor to copy the current live Thing object
auto nextState = std::make_shared<Thing>(*(glbSOM.get_sptr()));
// do stuff to nextState, gradually filling out it's new value
// based on incoming data from other sources, etc.
...
// commit the changes to the shared memory location
glbSOM.commit_new_object(nextState);
}
消费者的使用将是:
SharedObject<Thing> thing(&glbSOM);
while(1)
{
// think about the data contained in thing, and act accordingly...
doStuffWith(thing->some_member());
// re-cache the thing
thing.update();
}
谢谢!
这是过度设计的方式。相反,我建议执行以下操作:
- 创建指向
Thing* theThing
的指针以及保护互斥锁。要么是全球性的,要么是通过其他方式共享的。将其初始化为 nullptr.
- 在您的生产者中:使用两个
Thing
类型的本地对象 - Thing thingOne
和 Thing thingTwo
(请记住,thingOne
并不比 thingTwo
好,但有一个被称为 thingOne
是有原因的,但这是一回事。小心猫。)。从填充 thingOne
开始。完成后,锁定互斥量,将 thingOne
地址复制到 theThing
,解锁互斥量。开始填充 thingTwo
。完成后,见上文。重复直到被杀死。
- 在每个侦听器中:(确保指针不是 nullptr)。锁定互斥量。复制
theThing
指向的两个对象。解锁互斥量。使用您的副本。阅后即焚。重复直到被杀死。
我正在尝试想出一种快速解决以下问题的方法:
我有一个生成数据的线程,以及几个使用数据的线程。我不需要对生成的数据进行排队,因为生成的数据比消耗的数据慢得多(即使偶尔出现这种情况,如果偶尔跳过数据点也不会成为问题)。所以,基本上,我有一个封装 "most recent state" 的对象,只允许生产者线程更新它。
我的策略如下(如果我完全失去理智请告诉我):
我为这个例子创建了三个 类:Thing
(实际状态对象),SharedObject<Thing>
(一个对象,可以是每个线程的本地对象,并给出thread access to the underlying Thing) 和 SharedObjectManager<Thing>
,它包含一个 shared_ptr
和一个 mutex
.
SharedObjectManager
(SOM) 的实例是一个全局变量。
当生产者启动时,它会实例化一个事物,并将其告知全局 SOM。然后它制作一个副本,并在该副本上完成所有更新工作。当它准备提交它对事物的更改时,它会将新事物传递给全局 SOM,后者锁定它的互斥锁,更新它保留的共享指针,然后释放锁。
同时,消费者线程全部实例化SharedObject<Thing>
。这些对象每个都保存一个指向全局 SOM 的指针,以及 SOM 保存的 shared_ptr
的缓存副本......它一直缓存到 update()
被显式调用。
我相信这越来越难以理解,所以这里有一些代码:
#include <mutex>
#include <iostream>
#include <memory>
class Thing
{
private:
int _some_member = 10;
public:
int some_member() const { return _some_member; }
void some_member(int val) {_some_member = val; }
};
// one global instance
template<typename T>
class SharedObjectManager
{
private:
std::shared_ptr<T> objPtr;
std::mutex objLock;
public:
std::shared_ptr<T> get_sptr()
{
std::lock_guard<std::mutex> lck(objLock);
return objPtr;
}
void commit_new_object(std::shared_ptr<T> new_object)
{
std::lock_guard<std::mutex> lck (objLock);
objPtr = new_object;
}
};
// one instance per consumer thread.
template<typename T>
class SharedObject
{
private:
SharedObjectManager<T> * som;
std::shared_ptr<T> cache;
public:
SharedObject(SharedObjectManager<T> * backend) : som(backend)
{update();}
void update()
{
cache = som->get_sptr();
}
T & operator *()
{
return *cache;
}
T * operator->()
{
return cache.get();
}
};
// no actual threads in this test, just a quick sanity check.
SharedObjectManager<Thing> glbSOM;
int main(void)
{
glbSOM.commit_new_object(std::make_shared<Thing>());
SharedObject<Thing> myobj(&glbSOM);
std::cout<<myobj->some_member()<<std::endl;
// prints "10".
}
生产者线程使用的思路是:
// initialization - on startup
auto firstStateObj = std::make_shared<Thing>();
glbSOM.commit_new_object(firstStateObj);
// main loop
while (1)
{
// invoke copy constructor to copy the current live Thing object
auto nextState = std::make_shared<Thing>(*(glbSOM.get_sptr()));
// do stuff to nextState, gradually filling out it's new value
// based on incoming data from other sources, etc.
...
// commit the changes to the shared memory location
glbSOM.commit_new_object(nextState);
}
消费者的使用将是:
SharedObject<Thing> thing(&glbSOM);
while(1)
{
// think about the data contained in thing, and act accordingly...
doStuffWith(thing->some_member());
// re-cache the thing
thing.update();
}
谢谢!
这是过度设计的方式。相反,我建议执行以下操作:
- 创建指向
Thing* theThing
的指针以及保护互斥锁。要么是全球性的,要么是通过其他方式共享的。将其初始化为 nullptr. - 在您的生产者中:使用两个
Thing
类型的本地对象 -Thing thingOne
和Thing thingTwo
(请记住,thingOne
并不比thingTwo
好,但有一个被称为thingOne
是有原因的,但这是一回事。小心猫。)。从填充thingOne
开始。完成后,锁定互斥量,将thingOne
地址复制到theThing
,解锁互斥量。开始填充thingTwo
。完成后,见上文。重复直到被杀死。 - 在每个侦听器中:(确保指针不是 nullptr)。锁定互斥量。复制
theThing
指向的两个对象。解锁互斥量。使用您的副本。阅后即焚。重复直到被杀死。