C++(11) "High Performance" 并发单个编写器

C++(11) "High Performance" Concurrent Single Writer(s)

我有一个对象 B,由 10 个双精度值组成。 双精度值由一个传感器产生,该传感器进行通信 通过以太网,每 1-2 毫秒发送一次更新。 B 的实例必须在程序中的其他 2 个地方使用 做一些计算和可视化。

在另一个线程中,一个对象 A 使用另一个传感器每 4-10 毫秒更新一次,其中包含 ~1000 个双精度值。

两个对象都有一个时间签名(当传感器更新到达时使用 boost::chrono::high_resolution_clock 获得)

现在我想使用两个对象 AB,这两个对象已经更新了将近 同时计算一个对象的一些值C。 这应该在两个线程都是 运行 和输出时完成 用于做一些可视化,计算平均值等。 整个过程是 运行 1-2 小时和 之后仅使用计算出的 Cs 的实例, 不再需要 AB 的实例。

在线程之间实现这种通信和数据共享的推荐方法(或设计模式)是什么?

目前,整个结构实施得很糟糕,而且 构成 A 的线程直接与构成 B 的线程通信 不使用像互斥锁这样的同步方法。

我假设您在 PC 和 OS 上进行所有计算,但没有 real-time 保证。因此,只要您处于毫秒范围内,您就不会受到严格的 real-time 限制。因此,使用无锁数据结构并不是绝对必要的。但是你可以。他们也有很好的表现。如果你愿意,你可以为此使用 boosts lockfree 数据结构。但是你可能想要一个 pop() 函数阻塞直到队列不为空,否则你需要忙等待或者使用某种信号量或条件变量来等待队列的状态不是空的。

您的争用很少,只有几个线程。在低争用情况下锁定互斥锁通常需要 25ns 的数量级。所以这不是问题。现在内存分配通常也非常快,通常平均不到 100ns。所以这是我的看法:

  • 使用 std::queue<std::packaged_task<void()>> 作为任务队列。如果计算结果不需要 std::future,您也可以决定入队 std::function<void()> 而不是 std::packaged_task<void()>()。使用 std::mutex 保护对其的访问。有一个 std::condition_variable 在那里等待队列 non-empty。我曾经实现过 a generic blocking concurrent queue on github ,它比那更有效,但只使用标准库方法。您只需将模板参数指定为 T=std::packaged_task<void()> 即可获得所需的数据结构。

  • 创建一个线程std::thread,它只执行传入的任务。要退出线程,请在线程的最后一个任务中设置一些 std::atomic<bool> 标志,这样线程就会停止执行,您可以 join() 优雅地执行它。同样,您可以将该功能放入 class。 class 的析构函数应该告诉任务执行线程停止并执行 join()。我实现了这样一个class in the same repository on github。你可以自由使用它。

  • 现在您可以将任务从从硬件接收数据的两个线程传递给任务执行器。只需将适当的 lambda 传递给执行需要完成的工作的任务执行器。如果您决定使用我的实现,您可以编写类似

    的内容
    void receiveData( const Data & data, cu::ParallelExecutor & executor )
    {
        executor.addTask( [=]{ /* Do something with `data` */ } );
    }
    

这种方式一切都是thread-safe,具有低开销和高性能。