Zeromq 与 C++ 绑定和 Openmp 阻塞问题。为什么?

Zero MQ with C++ bindings and Open MP blocking issue. Why?

wrote a test 让 ZeroMQ 说服自己,它设法独立于处理顺序将回复映射到客户端,这将证明它是线程安全的。

它是一个多线程服务器,它只是将接收到的消息抛回给发送者。客户端从多个线程发送一些消息并检查是否收到相同的消息。对于多线程,我使用 OpenMP。

该测试运行良好, 我想继续并使用 C++ bindings 为 ZeroMQ 重新实现它。 现在 它不再以同样的方式工作

这是 ZMQPP 的代码:

#include <gtest/gtest.h>
#include <zmqpp/zmqpp.hpp>
#include <zmqpp/proxy.hpp>

TEST(zmqomp, order) {
    zmqpp::context ctx;

    std::thread proxy([&ctx] {
        zmqpp::socket dealer(ctx, zmqpp::socket_type::xrequest);
        zmqpp::socket router(ctx, zmqpp::socket_type::xreply);
        router.bind("tcp://*:1234");
        dealer.bind("inproc://workers");
        zmqpp::proxy(router, dealer);
    });

    std::thread worker_starter([&ctx] {
#pragma omp parallel
        {
            zmqpp::socket in(ctx, zmqpp::socket_type::reply);
            in.connect("inproc://workers");
#pragma omp for
            for (int i = 0; i < 1000; i++) {
                std::string request;
                in.receive(request);
                in.send(request);
            }
        }
    });

    std::thread client([&ctx] {
#pragma omp parallel
        {
            zmqpp::socket out(ctx, zmqpp::socket_type::request);
            out.connect("tcp://localhost:1234");
#pragma omp for
            for (int i = 0; i < 1000; i++) {
                std::string msg("Request " + std::to_string(i));
                out.send(msg);    
                std::string reply;
                out.receive(reply);

                EXPECT_EQ(reply, msg);
            }
        }
    });

    client.join();
    worker_starter.join();
    ctx.terminate();
    proxy.join();
}

测试块并没有执行到最后。我玩了一下 #pragmas,发现只有一个变化可以 "fix" 它:

//#pragma omp parallel for
            for (int i = 0; i < 250; i++) {

在那种情况下,代码仍然是并行执行的,但我必须将循环执行次数除以我的物理核心数。

有人知道这里发生了什么吗?

序言:ZeroMQ 从定义和设计上来说都不是线程安全的。

这通常无关紧要,因为有一些安全防护设计实践,但这里的情况会更糟,一旦遵循建议的 TEST(){...} 设计。

在 ZeroMQ 上花了一些时间,你的提议由于违反了几个主要的事情而被否决了,否则这有助于分布式架构比纯 SEQ 更聪明地工作整体代码。

ZeroMQ 在(几乎)每三个段落中说服 避免 资源共享。 零共享 是 ZeroMQ 出色的可扩展性能和最小化延迟准则之一,简而言之。

所以最好完全避免共享 zmq.Context() 实例(除非你非常清楚这些东西在幕后工作的原因和方式)。

因此尝试并行触发 1000 次(几乎)(嗯,不是真正的 PAR)一些事件流到 [的共享实例上=14=] (一旦使用默认参数实例化并具有 none 性能调整适应性,就会越少)肯定会因为性能和设计方面的建议而背道而驰。


有哪些限制,不能乱撞?

1) 每个 zmq.Context() 实例都有数量有限的 I/O-threads,它们是在实例化过程中创建的。一旦一个公平的设计需要一些性能调整,就可以增加 I/O-threads 的数量,数据泵将工作得更好(当然,none 数量的数据泵将挽救一个穷人,减去分布式计算系统的灾难性设计/架构。这是理所当然的。)。

2) 每个 zmq.Socket() 实例都有一个 { implicit |显式}映射到相应的I/O-thread。一旦公平的设计需要增强鲁棒性以应对缓慢的事件循环处理或数据流风暴(或负载平衡或您命名的)引起的其他不利影响,就有机会从分而治之的方法中受益使用 .setsockopt( zmq.AFFINITY, ... ) 方法将每个 zmq.Socket() 实例直接映射到相应的 I/O-thread,从而保持对缓冲和内部队列争夺的控制实际操作期间的哪些资源。在任何情况下,如果线程总数超过本地主机的内核数,恰好并发调度是显而易见的(所以一个真正的梦想 PAR 执行主要是无意中丢失。这是允许的。).

3) 每个 zmq.Socket() 还有一对 "Hidden Queue Devastators",称为高水位线。这些设置为 { 隐式 |显式},后者肯定是一种更明智的性能调整方式。为什么是毁灭者?因为这些稳定并保护分布式计算系统免受溢出,并且允许简单地丢弃高于 HWM 级别的每条消息,以保护系统的能力运行 永远,即使在暴风雨、损坏数据包的虚假爆炸或 DDoS 类型的攻击下。有很多工具可以调整 ZeroMQ Context()-实例行为的这个域,这超出了这个答案的范围(参考:我关于 ZeroMQ AFFINITY 好处或所用 ZeroMQ API 规范的其他帖子在 .setsockopt() 方法中)。

4) 每个基于 tcp:// transport-classzmq.Socket() 实例也继承了一些 O/S 从属遗产。一些 O/S 通过 ip 数据包的扩展累积 (在任何 ZeroMQ 控制之外) 证明这种风险,直到超过某个阈值,因此应该对这种情况采取适当的设计注意以避免对预期的应用程序信号/消息传递动态和针对此类不可控(外系统)缓冲习惯的鲁棒性产生不利影响。

5) 每个 .recv().send() 方法调用都是 阻塞,这是一个大规模分布式计算系统永远不应该冒险进入的事情。永远不能。即使在教科书的例子中。而是使用这些调用的非阻塞形式。总是。 这是理所当然的。

6) 每个 zmq.Socket() 实例都应该进行一系列谨慎而优雅的终止步骤。 .setsockopt( zmq.LINGER, 0 ) 的预防措施 + 明确的 .close() 方法被要求包含在每次使用中是公平的-案例(并且无论可能出现的任何异常如何都可以执行。)。一个可怜的{自我|这种做法中的团队- }-纪律 肯定会因为没有对强制性资源管理策略给予应有的关注而挂断整个应用程序基础架构。这是任何严肃的分布式计算项目 的必备部分。即使是教科书的例子也应该有这个。没有例外。别找借口。 这是理所当然的。