提升进程间:*非托管*共享内存中的字符串?
Boost interprocess: string in a *not managed* shared memory?
我知道在共享内存中构造字符串需要一个分配器。
很好,但我不知道该怎么做,因为所有示例都使用 Managed Shared Memory 方法 get_segment_manager()
必须用作分配器(如果我没记错的话)。
doc_anonymous_condition_shared_data.hpp
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
struct trace_queue
{
enum { LineSize = 100 };
trace_queue()
: message_in(false)
{}
//Mutex to protect access to the queue
boost::interprocess::interprocess_mutex mutex;
//Condition to wait when the queue is empty
boost::interprocess::interprocess_condition cond_empty;
//Condition to wait when the queue is full
boost::interprocess::interprocess_condition cond_full;
//Items to fill
char items[LineSize];
//Is there any message
bool message_in;
};
主要流程
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstdio>
#include "doc_anonymous_condition_shared_data.hpp"
using namespace boost::interprocess;
int main ()
{
//Erase previous shared memory and schedule erasure on exit
struct shm_remove
{
shm_remove() { shared_memory_object::remove("MySharedMemory"); }
~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
} remover;
//Create a shared memory object.
shared_memory_object shm
(create_only //only create
,"MySharedMemory" //name
,read_write //read-write mode
);
try{
//Set size
shm.truncate(sizeof(trace_queue));
//Map the whole shared memory in this process
mapped_region region
(shm //What to map
,read_write //Map it as read-write
);
//Get the address of the mapped region
void * addr = region.get_address();
//Construct the shared structure in memory
trace_queue * data = new (addr) trace_queue;
const int NumMsg = 100;
for(int i = 0; i < NumMsg; ++i){
scoped_lock<interprocess_mutex> lock(data->mutex);
if(data->message_in){
data->cond_full.wait(lock);
}
if(i == (NumMsg-1))
std::sprintf(data->items, "%s", "last message");
else
std::sprintf(data->items, "%s_%d", "my_trace", i);
//Notify to the other process that there is a message
data->cond_empty.notify_one();
//Mark message buffer as full
data->message_in = true;
}
}
catch(interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
第二个过程:
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstring>
#include "doc_anonymous_condition_shared_data.hpp"
using namespace boost::interprocess;
int main ()
{
//Create a shared memory object.
shared_memory_object shm
(open_only //only create
,"MySharedMemory" //name
,read_write //read-write mode
);
try{
//Map the whole shared memory in this process
mapped_region region
(shm //What to map
,read_write //Map it as read-write
);
//Get the address of the mapped region
void * addr = region.get_address();
//Obtain a pointer to the shared structure
trace_queue * data = static_cast<trace_queue*>(addr);
//Print messages until the other process marks the end
bool end_loop = false;
do{
scoped_lock<interprocess_mutex> lock(data->mutex);
if(!data->message_in){
data->cond_empty.wait(lock);
}
if(std::strcmp(data->items, "last message") == 0){
end_loop = true;
}
else{
//Print the message
std::cout << data->items << std::endl;
//Notify the other process that the buffer is empty
data->message_in = false;
data->cond_full.notify_one();
}
}
while(!end_loop);
}
catch(interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
我想将 char items[LineSize];
替换为 trace_queue
结构中更方便的 string
。
如果没有 Managed Shared Memory,我该如何做到这一点?
或者在没有托管 Boost 库的情况下完全不建议这样做?
Or this is somewhat completely not recommended to do without the managed Boost libraries?
我不推荐它。可以不受管理地执行它,但我 100% 会建议他们使用固定字符数组提供的确切方法。这有什么问题吗?
蛋糕不能吃。您不能同时神奇地希望“高级动态字符串”和“无堆管理开销”。
也就是说,您可能能够找到一些权衡。具体来说,您可能希望在这样的共享字节数组中模拟类似于多态内存资源的东西。然后你可以在上面使用 std::pmr::string
。悲剧是 memory_resource 不是共享内存安全的。
简化
但是,我想您所需要的只是一些很好的抽象,其中接口使用 C++ 词汇表类型。为什么不将整个交易简化到这一点?
这是一个速写草稿:
struct trace_queue {
private:
bip::interprocess_mutex mutex;
bip::interprocess_condition cond;
std::array<char, 300> buffer{};
bool message_in{false}; // Is there any message
auto wait(bool state) {
bip::scoped_lock lock(mutex);
cond.wait(lock, [=,this] { return message_in == state; });
return lock;
}
public:
void send(std::string_view msg) {
auto lock = wait(false); // !message_in
auto n = std::min(buffer.size(), msg.size());
std::fill(buffer.begin(), buffer.end(), '[=10=]');
std::copy_n(msg.data(), n, buffer.begin());
message_in = true;
cond.notify_one();
}
std::string receive() {
auto lock = wait(true); // message_in
std::string msg(buffer.data(), strnlen(buffer.data(), buffer.size()));
message_in = false;
cond.notify_one();
return msg;
}
};
在我看来代码已经更容易阅读了。而且它当然更容易使用!整个服务器端:
// Create a shared memory object.
bip::shared_memory_object shm(bip::create_only, "MySharedMemory",
bip::read_write);
shm.truncate(sizeof(trace_queue));
// Map the whole shared memory in this process
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *new (region.get_address()) trace_queue;
for (int i = 0; i < 99; ++i)
data.send("my_trace_" + std::to_string(i));
data.send("TEARDOWN");
客户端:
bip::shared_memory_object shm(bip::open_only, "MySharedMemory",
bip::read_write);
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *static_cast<trace_queue*>(region.get_address());
while (true) {
auto msg = data.receive();
if (msg == "TEARDOWN")
break;
std::cout << msg << "\n";
};
#include <array>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
namespace bip = boost::interprocess;
struct trace_queue {
private:
bip::interprocess_mutex mutex;
bip::interprocess_condition cond;
std::array<char, 300> buffer{};
bool message_in{false}; // Is there any message
auto wait(bool state) {
bip::scoped_lock lock(mutex);
cond.wait(lock, [=,this] { return message_in == state; });
return lock;
}
public:
void send(std::string_view msg) {
auto lock = wait(false); // !message_in
auto n = std::min(buffer.size(), msg.size());
std::fill(buffer.begin(), buffer.end(), '[=13=]');
std::copy_n(msg.data(), n, buffer.begin());
message_in = true;
cond.notify_one();
}
std::string receive() {
auto lock = wait(true); // message_in
std::string msg(buffer.data(), strnlen(buffer.data(), buffer.size()));
message_in = false;
cond.notify_one();
return msg;
}
};
int main(int argc, char**) {
try {
if (argc < 2) {
// Erase previous shared memory and schedule erasure on exit
struct shm_remove {
shm_remove() { bip::shared_memory_object::remove("MySharedMemory"); }
~shm_remove() { bip::shared_memory_object::remove("MySharedMemory"); }
} remover;
// Create a shared memory object.
bip::shared_memory_object shm(bip::create_only, "MySharedMemory",
bip::read_write);
shm.truncate(sizeof(trace_queue));
// Map the whole shared memory in this process
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *new (region.get_address()) trace_queue;
for (int i = 0; i < 99; ++i)
data.send("my_trace_" + std::to_string(i));
data.send("TEARDOWN");
} else {
bip::shared_memory_object shm(bip::open_only, "MySharedMemory",
bip::read_write);
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *static_cast<trace_queue*>(region.get_address());
while (true) {
auto msg = data.receive();
if (msg == "TEARDOWN")
break;
std::cout << msg << "\n";
};
}
} catch (std::exception const& ex) {
std::cout << ex.what() << std::endl;
return 1;
}
}
输出,如预期:
我知道在共享内存中构造字符串需要一个分配器。
很好,但我不知道该怎么做,因为所有示例都使用 Managed Shared Memory 方法 get_segment_manager()
必须用作分配器(如果我没记错的话)。
doc_anonymous_condition_shared_data.hpp
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
struct trace_queue
{
enum { LineSize = 100 };
trace_queue()
: message_in(false)
{}
//Mutex to protect access to the queue
boost::interprocess::interprocess_mutex mutex;
//Condition to wait when the queue is empty
boost::interprocess::interprocess_condition cond_empty;
//Condition to wait when the queue is full
boost::interprocess::interprocess_condition cond_full;
//Items to fill
char items[LineSize];
//Is there any message
bool message_in;
};
主要流程
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstdio>
#include "doc_anonymous_condition_shared_data.hpp"
using namespace boost::interprocess;
int main ()
{
//Erase previous shared memory and schedule erasure on exit
struct shm_remove
{
shm_remove() { shared_memory_object::remove("MySharedMemory"); }
~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
} remover;
//Create a shared memory object.
shared_memory_object shm
(create_only //only create
,"MySharedMemory" //name
,read_write //read-write mode
);
try{
//Set size
shm.truncate(sizeof(trace_queue));
//Map the whole shared memory in this process
mapped_region region
(shm //What to map
,read_write //Map it as read-write
);
//Get the address of the mapped region
void * addr = region.get_address();
//Construct the shared structure in memory
trace_queue * data = new (addr) trace_queue;
const int NumMsg = 100;
for(int i = 0; i < NumMsg; ++i){
scoped_lock<interprocess_mutex> lock(data->mutex);
if(data->message_in){
data->cond_full.wait(lock);
}
if(i == (NumMsg-1))
std::sprintf(data->items, "%s", "last message");
else
std::sprintf(data->items, "%s_%d", "my_trace", i);
//Notify to the other process that there is a message
data->cond_empty.notify_one();
//Mark message buffer as full
data->message_in = true;
}
}
catch(interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
第二个过程:
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstring>
#include "doc_anonymous_condition_shared_data.hpp"
using namespace boost::interprocess;
int main ()
{
//Create a shared memory object.
shared_memory_object shm
(open_only //only create
,"MySharedMemory" //name
,read_write //read-write mode
);
try{
//Map the whole shared memory in this process
mapped_region region
(shm //What to map
,read_write //Map it as read-write
);
//Get the address of the mapped region
void * addr = region.get_address();
//Obtain a pointer to the shared structure
trace_queue * data = static_cast<trace_queue*>(addr);
//Print messages until the other process marks the end
bool end_loop = false;
do{
scoped_lock<interprocess_mutex> lock(data->mutex);
if(!data->message_in){
data->cond_empty.wait(lock);
}
if(std::strcmp(data->items, "last message") == 0){
end_loop = true;
}
else{
//Print the message
std::cout << data->items << std::endl;
//Notify the other process that the buffer is empty
data->message_in = false;
data->cond_full.notify_one();
}
}
while(!end_loop);
}
catch(interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}
我想将 char items[LineSize];
替换为 trace_queue
结构中更方便的 string
。
如果没有 Managed Shared Memory,我该如何做到这一点?
或者在没有托管 Boost 库的情况下完全不建议这样做?
Or this is somewhat completely not recommended to do without the managed Boost libraries?
我不推荐它。可以不受管理地执行它,但我 100% 会建议他们使用固定字符数组提供的确切方法。这有什么问题吗?
蛋糕不能吃。您不能同时神奇地希望“高级动态字符串”和“无堆管理开销”。
也就是说,您可能能够找到一些权衡。具体来说,您可能希望在这样的共享字节数组中模拟类似于多态内存资源的东西。然后你可以在上面使用 std::pmr::string
。悲剧是 memory_resource 不是共享内存安全的。
简化
但是,我想您所需要的只是一些很好的抽象,其中接口使用 C++ 词汇表类型。为什么不将整个交易简化到这一点?
这是一个速写草稿:
struct trace_queue {
private:
bip::interprocess_mutex mutex;
bip::interprocess_condition cond;
std::array<char, 300> buffer{};
bool message_in{false}; // Is there any message
auto wait(bool state) {
bip::scoped_lock lock(mutex);
cond.wait(lock, [=,this] { return message_in == state; });
return lock;
}
public:
void send(std::string_view msg) {
auto lock = wait(false); // !message_in
auto n = std::min(buffer.size(), msg.size());
std::fill(buffer.begin(), buffer.end(), '[=10=]');
std::copy_n(msg.data(), n, buffer.begin());
message_in = true;
cond.notify_one();
}
std::string receive() {
auto lock = wait(true); // message_in
std::string msg(buffer.data(), strnlen(buffer.data(), buffer.size()));
message_in = false;
cond.notify_one();
return msg;
}
};
在我看来代码已经更容易阅读了。而且它当然更容易使用!整个服务器端:
// Create a shared memory object.
bip::shared_memory_object shm(bip::create_only, "MySharedMemory",
bip::read_write);
shm.truncate(sizeof(trace_queue));
// Map the whole shared memory in this process
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *new (region.get_address()) trace_queue;
for (int i = 0; i < 99; ++i)
data.send("my_trace_" + std::to_string(i));
data.send("TEARDOWN");
客户端:
bip::shared_memory_object shm(bip::open_only, "MySharedMemory",
bip::read_write);
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *static_cast<trace_queue*>(region.get_address());
while (true) {
auto msg = data.receive();
if (msg == "TEARDOWN")
break;
std::cout << msg << "\n";
};
#include <array>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
namespace bip = boost::interprocess;
struct trace_queue {
private:
bip::interprocess_mutex mutex;
bip::interprocess_condition cond;
std::array<char, 300> buffer{};
bool message_in{false}; // Is there any message
auto wait(bool state) {
bip::scoped_lock lock(mutex);
cond.wait(lock, [=,this] { return message_in == state; });
return lock;
}
public:
void send(std::string_view msg) {
auto lock = wait(false); // !message_in
auto n = std::min(buffer.size(), msg.size());
std::fill(buffer.begin(), buffer.end(), '[=13=]');
std::copy_n(msg.data(), n, buffer.begin());
message_in = true;
cond.notify_one();
}
std::string receive() {
auto lock = wait(true); // message_in
std::string msg(buffer.data(), strnlen(buffer.data(), buffer.size()));
message_in = false;
cond.notify_one();
return msg;
}
};
int main(int argc, char**) {
try {
if (argc < 2) {
// Erase previous shared memory and schedule erasure on exit
struct shm_remove {
shm_remove() { bip::shared_memory_object::remove("MySharedMemory"); }
~shm_remove() { bip::shared_memory_object::remove("MySharedMemory"); }
} remover;
// Create a shared memory object.
bip::shared_memory_object shm(bip::create_only, "MySharedMemory",
bip::read_write);
shm.truncate(sizeof(trace_queue));
// Map the whole shared memory in this process
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *new (region.get_address()) trace_queue;
for (int i = 0; i < 99; ++i)
data.send("my_trace_" + std::to_string(i));
data.send("TEARDOWN");
} else {
bip::shared_memory_object shm(bip::open_only, "MySharedMemory",
bip::read_write);
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *static_cast<trace_queue*>(region.get_address());
while (true) {
auto msg = data.receive();
if (msg == "TEARDOWN")
break;
std::cout << msg << "\n";
};
}
} catch (std::exception const& ex) {
std::cout << ex.what() << std::endl;
return 1;
}
}
输出,如预期: