我需要做什么才能使 ZMQ_RADIO / ZMQ_DISH 正常工作?

What do I need to do to get ZMQ_RADIO / ZMQ_DISH to work properly?

我正在尝试使用 ZMQ 草案规范 ZMQ_RADIOZMQ_DISH。我使用 CMake ExternalProject 和标志 ENABLE_DRAFTS=ON 构建了 libzmq 和 cppzmq,并验证它是使用 zmq_has() 函数通过草稿构建的。我修改了标准 hello world example 以使用收音机和天线,但无法让他们说话。我还收到 ZMQ_RADIOZMQ_DISH 未定义的编译错误。我手动定义了它们并进行了编译,但我从未获得实际连接,所以似乎还有其他问题。

这是我的代码:

CMakeLists.txt

cmake_minimum_required(VERSION 2.8.11)
project(zmq_udp)

include(ExternalProject)

ExternalProject_Add(libzmq
    GIT_REPOSITORY https://github.com/zeromq/libzmq
    GIT_TAG master
    CMAKE_ARGS 
      -DENABLE_DRAFTS=ON
      -DWITH_PERF_TOOL=OFF 
      -DZMQ_BUILD_TESTS=OFF 
      -DENABLE_CPACK=OFF
      -DCMAKE_INSTALL_PREFIX=${CMAKE_BINARY_DIR}/zmq
      -DCMAKE_LIBRARY_OUTPUT_DIRECTORY=${CMAKE_BINARY_DIR}/zmq/lib
      -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
      -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
      -DCMAKE_SHARED_LINKER_FLAGS=${CMAKE_SHARED_LINKER_FLAGS}
)

ExternalProject_Add(cppzmq
    GIT_REPOSITORY https://github.com/zeromq/cppzmq
    GIT_TAG master
    CONFIGURE_COMMAND ""
    BUILD_COMMAND ""
    INSTALL_COMMAND ${CMAKE_COMMAND} -E copy <SOURCE_DIR>/zmq.hpp ${CMAKE_BINARY_DIR}/zmq/include/zmq.hpp
    TEST_COMMAND ""
)

add_dependencies(cppzmq libzmq)

set(ZEROMQ_LIBNAME "libzmq.so")
set(ZEROMQ_INCLUDE_DIRS ${CMAKE_BINARY_DIR}/zmq/include)
set(ZEROMQ_LIBRARIES ${CMAKE_BINARY_DIR}/zmq/lib/${ZEROMQ_LIBNAME})

include_directories(${ZEROMQ_INCLUDE_DIRS})

add_executable(server server.cpp)
add_executable(client client.cpp)
add_dependencies(server cppzmq)
add_dependencies(client cppzmq)
target_link_libraries(server ${ZEROMQ_LIBRARIES})
target_link_libraries(client ${ZEROMQ_LIBRARIES})

server.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>

#define ZMQ_DISH 15

int main ()
{
    std::cout << zmq_has("draft") << std::endl;

    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_DISH);
    socket.bind ("udp://127.0.0.1:5555");

    while (true)
    {
        zmq::message_t request;

        socket.recv (&request);
        std::cout << "Received Hello" << std::endl;
    }

    return 0;
}

client.cpp

#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>

#define ZMQ_RADIO 14

int main ()
{
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_RADIO);

    std::cout << "Connecting to hello world server…" << std::endl;
    socket.connect ("udp://127.0.0.1:5555");

    for (int request_nbr = 0; request_nbr != 10; request_nbr++)
    {
        zmq::message_t request (5);
        memcpy (request.data (), "Hello", 5);
        std::cout << "Sending Hello " << request_nbr << "…" << std::endl;
        socket.send (request);

        sleep(1);
    }

    return 0;
}

服务器按预期为 zmq_has() 函数输出 1,这应验证 libzmq 是在草稿 API 模式开启的情况下构建的。

我需要做什么才能让 RADIO/DISH 正常工作?

我想在项目中使用 ZMQ 作为 UDP 接收器来接收来自非 ZMQ 应用程序的一些 UDP 数据包。

休斯顿,我们有问题:

我不熟悉条件构建以及在最近的 ZeroMQ 版本中包括 draft-API(s)。如果它确实打算以您假定的方式工作,那么 #define-s 应该已经在那里解决了,不是吗?

也许您已经从一些 GitHub 来源中挖掘出 ZMQ_RADIO + ZMQ_DISH 的正确 #define 序数,与核心功能兼容,但一般方法只是手动:

#define                        A_NOT_IMPLEMENTED_CABLE_TV_BROADCAST_ARCHETYPE -1234
void   *dsh = zmq_socket( ctx, A_NOT_IMPLEMENTED_CABLE_TV_BROADCAST_ARCHETYPE );
assert( dsh              && "INF: a socket instantiation from [ctx] failed." );

        rc = bind( dsh, "udp://*:5555" );
assert( rc == 0          && "INF: a socket .bind( 'udp://*.5555' ) failed.");

听起来很可疑,即使有旗帜的承诺 ENABLE_DRAFTS=ON,不是吗?


总结

如果您的项目旨在使用 RADIO/DISH,请仔细查看已发布的 API(关于未实现/未发布功能的警告),您还可以在其中找到其他强制性步骤:

Radio-dish is using groups (vs Pub-sub topics), Dish sockets can join a group and each message sent by Radio sockets belong to a group.

Groups are null terminated strings limited to 16 chars length (including null). The intention is to increase the length to 40 chars (including null).

Groups are matched using exact matching (vs prefix matching of PubSub).

ZMQ_RADIO方必须使用zmq_msg_set_group(3)先将消息分配到一个组。

ZMQ_DISH 方必须调用 zmq_join(3) 以便 "enter"组以便接收任何消息,因为默认情况下,很明显,在它实例化时没有成员资格。

ZMQ_DISH 方可以调用 zmq_msg_group(3) 来获取消息实际所属的组.


ZeroMQ 在 W.I.P。 - 所以可能想检查 是否有类似的服务。

如果急需,Martin Sustrik 启动了另一个智能 messaging/signalling 工具 -

经过一些麻烦后, 似乎推出了生产版本,其中可扩展的正式通信模式可能会帮助您实现项目目标。 值得一试。

RADIO 和 DISH 处于草稿状态,在稳定版本中不可用。如果您需要访问 DRAFT API,请从此 link

构建 zmq

以下为部分zmq.hpp

// These functions are DRAFT and disabled in stable releases, and subject to 
// change at ANY time until declared stable.                                 
    #ifdef ZMQ_BUILD_DRAFT_API

    //DRAFT Socket types.                                                       
#define ZMQ_SERVER 12
#define ZMQ_CLIENT 13
#define ZMQ_RADIO 14
#define ZMQ_DISH 15
#define ZMQ_GATHER 16
#define ZMQ_SCATTER 17
#define ZMQ_DGRAM 18
#endif

这是使用这两个的示例。我已经测试过了。

最重要的是你需要检查你没有加载你的系统 zmq 库。 就我而言,我在 cmake 中做了这样的事情:

set(ENABLE_DRAFTS ON)

add_subdirectory(libzmq)
set_target_properties(libzmq PROPERTIES PREFIX "dev-")

# If you want target
add_library(CppZeroMQ INTERFACE)

target_link_libraries(CppZeroMQ INTERFACE $<$<CONFIG:Debug>:libzmq>$<$<CONFIG:Release>:libzmq-static>)
# For CPP headers (you may install and change path here)
target_include_directories(CppZeroMQ INTERFACE my/path/to/cppzmq)
target_compile_definitions(CppZeroMQ INTERFACE ZMQ_BUILD_DRAFT_API=1)

多亏了它,它会从源代码中区分系统库和你的。可能系统库没有使用 DRAFTS。


Server/Publisher部分

    zmq::context_t context(1);
    zmq::socket_t publisher(context, ZMQ_RADIO);
    // We need set IP of destination, sad but true
    publisher.connect("udp://127.0.0.1:30666");

    std::string text;
    text.reserve(128);

    int number = 0;
    while(publisher.connected())
    {
        std::chrono::microseconds timestamp = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::system_clock::now().time_since_epoch());

        text.clear();
        text += std::to_string(timestamp.count());
        text += ";";
        text += std::to_string(++number);

        zmq::message_t update{text.data(), text.size()};
        update.set_group("test");
        std::cout << "Sending: " << timestamp << " number:" << number << std::endl;
        publisher.send(update);
        std::this_thread::sleep_for(1s);
    }

Client/Subscriber部分

    zmq::context_t context(1);
    zmq::socket_t subscriber(context, ZMQ_DISH);
    subscriber.bind("udp://*:30666");
    subscriber.join("test");

    int previousNumber = 0;
    int lostCount = -1;

    while(subscriber.connected())
    {
        zmq::message_t update;

        subscriber.recv(&update);

        std::string_view text(update.data<const char>(), update.size());
        std::cout << text;

        auto splitPoint = text.find(';');
        std::string serverTime = std::string{text.substr(0, splitPoint)};
        std::string serverNumber = std::string{text.substr(splitPoint + 1)};
        auto number = std::stoi(serverNumber);
        if(number != previousNumber + 1)
        {
            ++lostCount;
        }
        previousNumber = number;

        const auto diff =
            system_clock::now() -
            system_clock::time_point{std::chrono::microseconds{std::stoull(serverTime)}};

        // Beautify at: https://github.com/gelldur/common-cpp/blob/master/src/acme/beautify.h
        std::cout << " ping:" << Beautify::nice{diff} << "UDP lost: " << lostCount << std::endl;
    }