Boost.Interprocess:在*非托管*共享内存中存放字符串?

Boost interprocess: string in a *not managed* shared memory?

我知道在共享内存中构造字符串需要一个分配器。

这没问题,但我不知道具体该怎么做,因为所有示例都使用 Managed Shared Memory(托管共享内存),它提供一个 get_segment_manager() 方法,而分配器必须用它来构造(如果我没记错的话)。

让我们看看从这里复制的这个例子:https://www.boost.org/doc/libs/1_77_0/doc/html/interprocess/synchronization_mechanisms.html#interprocess.synchronization_mechanisms.conditions.conditions_anonymous_example

doc_anonymous_condition_shared_data.hpp

#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>

// Shared state of a single-slot trace queue exchanged between two processes.
// The member order defines the layout of the object inside the mapped
// region, so it must be identical in every process that uses it.
struct trace_queue
{
   // Capacity in bytes of the message buffer below.
   enum { LineSize = 100 };

   // Serializes access to the members of this structure.
   boost::interprocess::interprocess_mutex      mutex;

   // Signalled when a message has been stored; waited on while the slot
   // is empty.
   boost::interprocess::interprocess_condition  cond_empty;

   // Signalled when the message has been consumed; waited on while the
   // slot is full.
   boost::interprocess::interprocess_condition  cond_full;

   // Text of the pending message, stored as a C string.
   char   items[LineSize];

   // True while `items` holds a message that has not been consumed yet.
   bool message_in;

   // Starts with an empty slot. `items` is intentionally left uninitialized.
   trace_queue()
      :  message_in{false}
   {}
};

主进程:

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstdio>
#include "doc_anonymous_condition_shared_data.hpp"

using namespace boost::interprocess;

int main ()
{

   //Erase previous shared memory and schedule erasure on exit
   struct shm_remove
   {
      shm_remove() { shared_memory_object::remove("MySharedMemory"); }
      ~shm_remove(){ shared_memory_object::remove("MySharedMemory"); }
   } remover;

   //Create a shared memory object.
   shared_memory_object shm
      (create_only               //only create
      ,"MySharedMemory"           //name
      ,read_write                //read-write mode
      );
   try{
      //Set size
      shm.truncate(sizeof(trace_queue));

      //Map the whole shared memory in this process
      mapped_region region
         (shm                       //What to map
         ,read_write //Map it as read-write
         );

      //Get the address of the mapped region
      void * addr       = region.get_address();

      //Construct the shared structure in memory
      trace_queue * data = new (addr) trace_queue;

      const int NumMsg = 100;

      for(int i = 0; i < NumMsg; ++i){
         scoped_lock<interprocess_mutex> lock(data->mutex);
         if(data->message_in){
            data->cond_full.wait(lock);
         }
         if(i == (NumMsg-1))
            std::sprintf(data->items, "%s", "last message");
         else
            std::sprintf(data->items, "%s_%d", "my_trace", i);

         //Notify to the other process that there is a message
         data->cond_empty.notify_one();

         //Mark message buffer as full
         data->message_in = true;
      }
   }
   catch(interprocess_exception &ex){
      std::cout << ex.what() << std::endl;
      return 1;
   }

   return 0;
}

第二个进程:

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <iostream>
#include <cstring>
#include "doc_anonymous_condition_shared_data.hpp"

using namespace boost::interprocess;

int main ()
{
   //Create a shared memory object.
   shared_memory_object shm
      (open_only                    //only create
      ,"MySharedMemory"              //name
      ,read_write                   //read-write mode
      );

   try{
      //Map the whole shared memory in this process
      mapped_region region
         (shm                       //What to map
         ,read_write //Map it as read-write
         );

      //Get the address of the mapped region
      void * addr       = region.get_address();

      //Obtain a pointer to the shared structure
      trace_queue * data = static_cast<trace_queue*>(addr);

      //Print messages until the other process marks the end
      bool end_loop = false;
      do{
         scoped_lock<interprocess_mutex> lock(data->mutex);
         if(!data->message_in){
            data->cond_empty.wait(lock);
         }
         if(std::strcmp(data->items, "last message") == 0){
            end_loop = true;
         }
         else{
            //Print the message
            std::cout << data->items << std::endl;
            //Notify the other process that the buffer is empty
            data->message_in = false;
            data->cond_full.notify_one();
         }
      }
      while(!end_loop);
   }
   catch(interprocess_exception &ex){
      std::cout << ex.what() << std::endl;
      return 1;
   }

   return 0;
}

我想把 trace_queue 结构中的 char items[LineSize]; 替换为更方便的 string。

如果没有 Managed Shared Memory,我该如何做到这一点?

或者说,不使用 Boost 的托管(managed)共享内存库,就完全不建议这样做?

Or this is somewhat completely not recommended to do without the managed Boost libraries?

我不推荐这样做。不使用托管共享内存也是可以做到的,但我 100% 建议沿用示例中给出的做法,即使用固定大小的字符数组。这种做法有什么问题吗?

鱼与熊掌不可兼得。您不能既想要“高级的动态字符串”,又想要“没有堆管理的开销”。

话虽如此,您也许能找到某种折中方案。具体来说,您可以在这样的共享字节数组之上模拟一个类似多态内存资源(polymorphic memory resource)的东西,然后在其上使用 std::pmr::string。遗憾的是,memory_resource 并不是共享内存安全的。

简化

不过,我猜您真正需要的只是一个良好的抽象,让接口使用 C++ 的词汇类型(vocabulary types)。那为什么不把整件事简化到这个程度呢?

这是一个速写草稿:

// Single-slot message channel meant to be placed in shared memory. send()
// stores one message in a fixed 300-byte buffer and receive() takes it out
// again; each call blocks until the slot is in the state it needs.
// NOTE(review): one condition variable serves both directions and only
// notify_one() is used, which looks sufficient for exactly one sender and
// one receiver - confirm before using it with more participants.
struct trace_queue {
  private:
    bip::interprocess_mutex     mutex;
    bip::interprocess_condition cond;

    std::array<char, 300> buffer{}; // message bytes, zero-padded
    bool message_in{false};         // Is there any message

    // Locks the mutex, blocks until message_in == state and returns the
    // lock still held so the caller can go on working on the slot.
    auto wait(bool state) {
        bip::scoped_lock lock(mutex);
        // Explicit captures: `[=, this]` is only accepted from C++20 on,
        // while this form is valid in C++17 as well.
        cond.wait(lock, [this, state] { return message_in == state; });
        return lock;
    }

  public:
    // Blocks until the slot is empty, then stores msg and wakes a waiter.
    // A message longer than the buffer is truncated to buffer.size() bytes.
    void send(std::string_view msg) {
        auto lock = wait(false); // !message_in

        auto const n = std::min(buffer.size(), msg.size());
        // Copy the payload, then zero the remainder so receive() can find
        // the end of the message.
        auto const tail = std::copy_n(msg.data(), n, buffer.begin());
        std::fill(tail, buffer.end(), '\0');

        message_in = true;
        cond.notify_one();
    }

    // Blocks until a message is present, then returns it, marks the slot
    // empty and wakes a waiter. The message ends at the first NUL byte, or
    // at the end of the buffer when there is none.
    std::string receive() {
        auto lock = wait(true); // message_in

        // std::find is the portable equivalent of the POSIX-only strnlen.
        auto const last = std::find(buffer.begin(), buffer.end(), '\0');
        std::string msg(buffer.begin(), last);

        message_in = false;
        cond.notify_one();
        return msg;
    }
};

在我看来代码已经更容易阅读了。而且它当然更容易使用!整个服务器端:

// Create a shared memory object.
bip::shared_memory_object shm(bip::create_only, "MySharedMemory",
                              bip::read_write);
shm.truncate(sizeof(trace_queue));

// Map the whole shared memory in this process
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *new (region.get_address()) trace_queue;

for (int i = 0; i < 99; ++i)
    data.send("my_trace_" + std::to_string(i));

data.send("TEARDOWN");

客户端:

bip::shared_memory_object shm(bip::open_only, "MySharedMemory",
                              bip::read_write);
bip::mapped_region region(shm, bip::read_write);
trace_queue& data = *static_cast<trace_queue*>(region.get_address());

while (true) {
    auto msg = data.receive();
    if (msg == "TEARDOWN")
        break;
    std::cout << msg << "\n";
};

完整示例见 Live On Coliru:

#include <algorithm>
#include <array>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <cstring>
#include <exception>
#include <iostream>
#include <string>
#include <string_view>

namespace bip = boost::interprocess;

// Single-slot message channel meant to be placed in shared memory. send()
// stores one message in a fixed 300-byte buffer and receive() takes it out
// again; each call blocks until the slot is in the state it needs.
// NOTE(review): one condition variable serves both directions and only
// notify_one() is used, which looks sufficient for exactly one sender and
// one receiver - confirm before using it with more participants.
struct trace_queue {
  private:
    bip::interprocess_mutex     mutex;
    bip::interprocess_condition cond;

    std::array<char, 300> buffer{}; // message bytes, zero-padded
    bool message_in{false};         // Is there any message

    // Locks the mutex, blocks until message_in == state and returns the
    // lock still held so the caller can go on working on the slot.
    auto wait(bool state) {
        bip::scoped_lock lock(mutex);
        // Explicit captures: `[=, this]` is only accepted from C++20 on,
        // while this form is valid in C++17 as well.
        cond.wait(lock, [this, state] { return message_in == state; });
        return lock;
    }

  public:
    // Blocks until the slot is empty, then stores msg and wakes a waiter.
    // A message longer than the buffer is truncated to buffer.size() bytes.
    void send(std::string_view msg) {
        auto lock = wait(false); // !message_in

        auto const n = std::min(buffer.size(), msg.size());
        // Copy the payload, then zero the remainder so receive() can find
        // the end of the message.
        auto const tail = std::copy_n(msg.data(), n, buffer.begin());
        std::fill(tail, buffer.end(), '\0');

        message_in = true;
        cond.notify_one();
    }

    // Blocks until a message is present, then returns it, marks the slot
    // empty and wakes a waiter. The message ends at the first NUL byte, or
    // at the end of the buffer when there is none.
    std::string receive() {
        auto lock = wait(true); // message_in

        // std::find is the portable equivalent of the POSIX-only strnlen.
        auto const last = std::find(buffer.begin(), buffer.end(), '\0');
        std::string msg(buffer.begin(), last);

        message_in = false;
        cond.notify_one();
        return msg;
    }
};

// Server role: owns the lifetime of the "MySharedMemory" segment, constructs
// the queue in it and sends 99 numbered traces followed by "TEARDOWN".
static void run_server() {
    // Removes a stale segment on entry and the live one when this
    // function is left.
    struct shm_remove {
        shm_remove() { bip::shared_memory_object::remove("MySharedMemory"); }
        ~shm_remove() { bip::shared_memory_object::remove("MySharedMemory"); }
    } remover;

    // Create the segment and size it to hold one trace_queue.
    bip::shared_memory_object shm(bip::create_only, "MySharedMemory",
                                  bip::read_write);
    shm.truncate(sizeof(trace_queue));

    // Map it and construct the queue in place at its start.
    bip::mapped_region region(shm, bip::read_write);
    void* const base = region.get_address();
    trace_queue& data = *new (base) trace_queue;

    for (int i = 0; i != 99; ++i) {
        std::string line = "my_trace_";
        line += std::to_string(i);
        data.send(line);
    }

    data.send("TEARDOWN");
}

// Client role: opens and maps the existing segment and prints every message
// received until "TEARDOWN" arrives.
static void run_client() {
    bip::shared_memory_object shm(bip::open_only, "MySharedMemory",
                                  bip::read_write);
    bip::mapped_region region(shm, bip::read_write);
    auto* const queue_ptr = static_cast<trace_queue*>(region.get_address());
    trace_queue& data = *queue_ptr;

    for (auto msg = data.receive(); msg != "TEARDOWN"; msg = data.receive()) {
        std::cout << msg << "\n";
    }
}

// Entry point: without command-line arguments the process acts as the
// server, with at least one argument as the client. Any std::exception is
// printed to standard output and turned into exit status 1.
int main(int argc, char**) {
    try {
        if (argc < 2)
            run_server();
        else
            run_client();
    } catch (std::exception const& ex) {
        std::cout << ex.what() << std::endl;
        return 1;
    }
}

输出,如预期: