Halo
发布于 2022-05-10 / 102 阅读 / 0 评论 / 0 点赞

zmq

简介

ZMQ(ZERO MQ) 是个类似于 Socket 的一系列接口,用于 node 与 node 间的通信,node 可以是主机或者是进程。
ZMQ 使用 c/c++ 开发的。接口是 c (接口/实现文件 zmq.h/zmq.cpp)。
官网: https://zeromq.org/

与 Socket 的区别

  • 一个套接字可以有多个输入和输出连接。普通的 Socket 是端到端的(1:1的关系),而 ZMQ 却是可以N:M 的关系.
  • Socket 是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而 ZMQ 屏蔽了这些细节,让你的网络编程更为简单。
  • ZMQ提供了多种模式进行消息路由,如请求-应答模式、发布-订阅模式等。这些模式可以用来搭建网络拓扑结构。
  • 使用多种协议,inproc(进程内)、ipc(进程间)、tcp、pgm(广播)、epgm;
  • 连接是异步的,并由一组消息队列做缓冲,后台线程异步地处理I/O操作。
  • ZMQ没有提供类似zmq_accept()的函数,因为当套接字绑定至端点时它就自动开始接受连接了;
  • ZMQ会发送整个消息,使用消息帧的机制来传递。如果你发送了10KB大小的消息,你就会收到10KB大小的消息。不强制使用某种消息格式,消息可以是0字节的,或是大到GB级的数据
  • ZMQ会负责自动重连,能够智能地处理网络错误,有时它会进行重试,有时会告知你某项操作发生了错误。服务端可以随意地加入或退出网络。

生命周期

主要包含四个部分:

  • 创建和销毁套接字:zmq_socket(), zmq_close()
  • 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
  • 为套接字建立连接:zmq_bind(), zmq_connect()
  • 发送和接收消息:zmq_send(), zmq_recv()

消息异常处理

  • ZMQ有阈值(HWM)的机制,可以避免消息溢出。当队列已满,ZMQ会自动阻塞发送者,或丢弃部分消息,这些行为取决于你所使用的消息模式。
  • ZMQ会恰当地处理速度较慢的节点,会根据消息模式使用不同的策略。
  • ZMQ中可以根据消息模式建立起一些中间装置(很小巧),可以用来降低网络的复杂程度。

Request-Reply 模式

客户端

  1. 首先初始化环境, 创建 REQ 套接字;
  2. 然后在一个循环中发送消息 —> 接收消息. 如果打乱了这个顺序(如连续发送两次)则会报错
  3. 退出时关闭 REQ 套接字和释放环境(类的实例会自动释放)
#include <string>
#include <iostream>

#include <zmq.hpp>

int main()
{
    // initialize the zmq context with a single IO thread
    zmq::context_t context{1};

    // construct a REQ (request) socket and connect to interface
    zmq::socket_t socket{context, zmq::socket_type::req};
    socket.connect("tcp://localhost:5555");

    // set up some static data to send
    const std::string data{"Hello"};

    for (auto request_num = 0; request_num < 10; ++request_num) 
    {
        // send the request message
        std::cout << "Sending Hello " << request_num << "..." << std::endl;
        socket.send(zmq::buffer(data), zmq::send_flags::none);
        
        // wait for reply from server
        zmq::message_t reply{};
        socket.recv(reply, zmq::recv_flags::none);

        std::cout << "Received " << reply.to_string(); 
        std::cout << " (" << request_num << ")";
        std::cout << std::endl;
    }

    return 0;
}

服务端

  1. 首先初始化环境,创建 REP 套接字,并绑到端口;
  2. 然后在一个循环中 zmq_recv 接收接受消息 —> zmq_send 发送. 如果打乱了这个顺序(如接受和发送顺序反了)则会报错
  3. 对于没有 client 连接的消息, 退出时不用做任何处理(类的实例会自动释放)
#include <string>
#include <chrono>
#include <thread>
#include <iostream>

#include <zmq.hpp>

int main() 
{
    using namespace std::chrono_literals;

    // initialize the zmq context with a single IO thread
    zmq::context_t context{1};

    // construct a REP (reply) socket and bind to interface
    zmq::socket_t socket{context, zmq::socket_type::rep};
    socket.bind("tcp://*:5555");

    // prepare some static data for responses
    const std::string data{"World"};

    for (;;) 
    {
        zmq::message_t request;

        // receive a request from client
        socket.recv(request, zmq::recv_flags::none);
        std::cout << "Received " << request.to_string() << std::endl;

        // simulate work
        std::this_thread::sleep_for(1s);

        // send the reply to the client
        socket.send(zmq::buffer(data), zmq::send_flags::none);
    }

    return 0;
}

Publish-Subscribe 模式

客户端

  1. 首先初始化环境, 创建 ZMQ_SUB 套接字;
  2. 然后设置订阅消息,如果不 zmq_setsockopt 设置订阅内容,那将什么消息都收不到;
  3. 最后在一个循环体中使用 s_recv() 接收消息.
  4. 退出时关闭 ZMQ_SUB 套接字和释放环境(类的实例会自动释放)

.

/**
* Example of ZeroMQ pub/sub usage for C++11.
*/

#include <zmqpp/zmqpp.hpp>
#include <iostream>
#include <chrono>

using namespace std;

static const string PUBLISHER_ENDPOINT = "tcp://localhost:4242";

int main(int argc, char *argv[]) {

  // Create a subscriber socket
  zmqpp::context context;
  zmqpp::socket_type type = zmqpp::socket_type::subscribe;
  zmqpp::socket socket(context, type);

  // Subscribe to the default channel
  socket.subscribe("");

  // Connect to the publisher
  cout << "Connecting to " << PUBLISHER_ENDPOINT << "..." << endl;
  socket.connect(PUBLISHER_ENDPOINT);

  while(true) {

    // Receive (blocking call)
    zmqpp::message message;
    socket.receive(message);

    // Read as a string
    string text;
    message >> text;

    unsigned long ms = std::chrono::system_clock::now().time_since_epoch() /
            std::chrono::milliseconds(1);

    cout << "[RECV] at " << ms << ": \"" << text << "\"" << endl;
  }

  // Unreachable, but for good measure
  socket.disconnect(PUBLISHER_ENDPOINT);
  return 0;
}

服务端

  1. 首先初始化环境, 创建 ZMQ_PUB 套接字, 并绑定端口;
  2. 然后在一个循环中发布消息.对于没有 client 订阅的消息,则直接被抛弃
  3. 退出时关闭 ZMQ_PUB 套接字和释放环境(类的实例会自动释放)
    关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的。
    就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零.
/**
* Example of ZeroMQ pub/sub usage for C++11.
*/

#include <zmqpp/zmqpp.hpp>
#include <iostream>
#include <chrono>
#include <thread>

using namespace std;

static const string PUBLISH_ENDPOINT = "tcp://*:4242";

int main(int argc, char *argv[]) {

  // Create a publisher socket
  zmqpp::context context;
  zmqpp::socket_type type = zmqpp::socket_type::publish;
  zmqpp::socket socket (context, type);

  // Open the connection
  cout << "Binding to " << PUBLISH_ENDPOINT << "..." << endl;
  socket.bind(PUBLISH_ENDPOINT);

  // Pause to connect
  this_thread::sleep_for(chrono::milliseconds(1000));

  while(true) {

    // Current time in ms
    unsigned long ms = chrono::system_clock::now().time_since_epoch() /
        chrono::milliseconds(1);

    string text = "Hello at " + to_string(ms);

    // Create a message and feed data into it
    zmqpp::message message;
    message << text;

    // Send it off to any subscribers
    socket.send(message);
    cout << "[SENT] at " << ms << ": " << text << endl;

    this_thread::sleep_for(chrono::microseconds(1000));
  }

  // Unreachable, but for good measure
  socket.disconnect(PUBLISH_ENDPOINT);
  return 0;
}

push/pull 模式

  • ZMQ_PUSH 被多个 ZMQ_PULL 连接时,多个 ZMQ_PULL 之间采用公平队列的方式接收数据
  • 用的最多的地方就是分而治之,把多个任务平均分配到多个 ZMQ_PULL 上执行,执行完成后再由一个 ZMQ_PULL 收集执行的结果
  • ZMQ_PUSH 的消息会一直阻塞,不会被丢弃

push

  1. 首先初始化环境; 创建 ZMQ_PUSH 套接字 pusher, 并绑定到端口
  2. pusher 发送具体任务.
  3. 退出时使用 zmq_close 关闭 push_socket 和释放环境(类的实例会自动释放)
#include <zmq.hpp>

int main()
{
    zmq::context_t ctx;
    zmq::socket_t pusher(ctx, zmq::socket_type::push);
    pusher.bind("inproc://test");
    pusher.send(zmq::str_buffer("Hello, world"), zmq::send_flags::dontwait);
}

pull

  1. 首先初始化环境; 创建 ZMQ_PULL 套接字 puller,并连接到 pusher;
  2. 接受来自 ventilator 中 sender 的任务
  3. 退出时关闭 receiver/sender 两个套接字和释放环境(类的实例会自动释放)
#include <iostream>
#include <zmq_addon.hpp>

int main()
{
    zmq::context_t ctx;
    zmq::socket_t puller(ctx, zmq::socket_type::pull);
    puller.connect(last_endpoint);

    std::vector<zmq::message_t> recv_msgs;
    const auto ret = zmq::recv_multipart(
        puller, std::back_inserter(recv_msgs));
    if (!ret)
        return 1;
    std::cout << "Got " << *ret
              << " messages" << std::endl;
    return 0;
}

评论