[ZMQ] ZMQ_PUB和ZMQ_SUB 关于缓存区的问题

ZMQ_PUB和ZMQ_SUB 关于缓存区的问题

问题描述

环境

  1. ubuntu18.04
  2. zmq version:4.3.3

设置了一个发送端和一个接收端,发送端是 ZMQ_PUB,接收端 ZMQ_SUB,并且通过 zmq_setsockopt设置了 ZMQ_SNDHWM, ZMQ_RCVHWM, ZMQ_SNDBUF, ZMQ_RCVBUF

设置了代码内的缓存区大小

设置完成后,通过实际代码测试,并没有起到作用,接收端依旧会收到很久的消息,具体可以跑下面的代码来查看。
接收端一直持续接收到3757帧之后刷新。

[ZMQ] ZMQ_PUB和ZMQ_SUB 关于缓存区的问题

设置了内核套接字缓存区

$ sudo sysctl -a | grep mem
net.core.optmem_max = 20480
net.core.rmem_default = 212992
net.core.rmem_max = 212992
net.core.wmem_default = 212992
net.core.wmem_max = 212992
net.ipv4.fib_sync_mem = 524288
net.ipv4.igmp_max_memberships = 20
net.ipv4.tcp_mem = 475113   633486  950226
net.ipv4.tcp_rmem = 4096    131072  6291456
net.ipv4.tcp_wmem = 4096    16384   4194304
net.ipv4.udp_mem = 950229   1266972 1900458
net.ipv4.udp_rmem_min = 4096
net.ipv4.udp_wmem_min = 4096
...

执行 sudo sysctl -a | grep mem可以看到默认的写缓存区大小为212992,我这里修改为5000后重新测试,发现接收端接收到数据比只设置代码内的缓存区接收到的数据更新一些。

接收端一直持续接收到22帧之后刷新。

[ZMQ] ZMQ_PUB和ZMQ_SUB 关于缓存区的问题

总结原因

目前确定是因为内核缓存区导致的旧数据一直存在,这个很合理,但是不清楚为什么zmq的 zmq_setsockopt设置内核缓存区无效。

代码示例

发送端

//
// Created by kongshui on 22-8-23.

//
#include
#include
#include

void qqq()
{
    const int first_count = 10000000;
    int hwm = 5;
    int time = 10;

    auto con = zmq_ctx_new();

    // Set up bind socket
    void *pub_socket = zmq_socket(con, ZMQ_PUB);
    zmq_setsockopt(pub_socket, ZMQ_SNDTIMEO, &time, sizeof(time));
    zmq_setsockopt(pub_socket, ZMQ_RCVTIMEO, &time, sizeof(time));
    zmq_setsockopt(pub_socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    zmq_setsockopt(pub_socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));
    zmq_bind(pub_socket, "ipc:///tmp/qos_out");

    int buf_size = 5000;
    auto rrr = zmq_setsockopt(pub_socket, ZMQ_SNDBUF, &buf_size, sizeof(buf_size));
    printf("rrr: %d", rrr);
    hwm = 10;
    size_t hwm_size = sizeof(hwm);
    rrr = zmq_getsockopt(pub_socket, ZMQ_SNDBUF, &hwm, &hwm_size);
    printf("rrr: %d, size: %d\n", rrr, hwm);

    sleep(3);

    // Send messages
    int send_count = 0;
    int recv_count = 0;

    std::string msg;
    while (send_count < first_count)
    {
        msg = std::to_string(send_count);
        int res = zmq_send(pub_socket, msg.c_str(), msg.length(), ZMQ_DONTWAIT);

        printf("res:%d, msg:%s", res, msg.c_str());
        if (msg.length() == res)
            ++send_count;
        else
            break;

        usleep(1000);
    }

    // Clean up
    zmq_close(pub_socket);
}

void ttt()
{
    zmq::context_t context(1);

    const int first_count = 15;
    int hwm = 10;
    int buf_size = 10000;
    zmq::socket_t pub_socket(context, ZMQ_PUB);
    zmq_setsockopt(pub_socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    zmq_setsockopt(pub_socket, ZMQ_SNDBUF, &buf_size, sizeof(buf_size));

    pub_socket.bind("ipc:///tmp/qos_in");

    std::string msg = "111";

    while (true)
    {
        auto res = zmq_send(pub_socket, msg.c_str(), msg.length(), ZMQ_DONTWAIT);
        printf("res:%d\n", res);
    }
}

int main(int, char **)
{
    qqq();

    return 0;
}

接收端

//
// Created by kongshui on 22-8-23.

//
#include
#include
#include

void qqq()
{
    const int first_count = 15;
    int hwm = 5;
    int time = 10;

    auto con = zmq_ctx_new();

    hwm = 10;
    // Set up connect socket
    void *sub_socket = zmq_socket(con, ZMQ_SUB);
    zmq_setsockopt(sub_socket, ZMQ_SNDTIMEO, &time, sizeof(time));
    zmq_setsockopt(sub_socket, ZMQ_RCVTIMEO, &time, sizeof(time));
    zmq_setsockopt(sub_socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    zmq_setsockopt(sub_socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));

    int buf_size = 5000;
    std::string topic = "www";

    auto rrr = zmq_setsockopt(sub_socket, ZMQ_RCVBUF, &buf_size, sizeof(buf_size));
    printf("rrr: %d", rrr);
    hwm = 10;
    size_t hwm_size = sizeof(hwm);
    rrr = zmq_getsockopt(sub_socket, ZMQ_RCVBUF, &hwm, &hwm_size);
    printf("rrr: %d, size: %d\n", rrr, hwm);

    zmq_connect(sub_socket, "ipc:///tmp/qos_out");
//    zmq_setsockopt(sub_socket, ZMQ_SUBSCRIBE, topic.c_str(), topic.size());
    zmq_setsockopt(sub_socket, ZMQ_SUBSCRIBE, nullptr, 0);

    sleep(3);

    // Send messages
    int send_count = 0;
    int recv_count = 0;

    std::string msg;

    char res[1024] = {0};
    while (zmq_recv(sub_socket, res, 1021, ZMQ_DONTWAIT) > 0)
    {
        ++recv_count;
        printf("res:%s", res);
        printf("first_count: %d, recv_count: %d\n", first_count, recv_count);
        memset(res, 0, sizeof(res));
    }

    while (1)
    {
        if (zmq_recv(sub_socket, res, 1021, ZMQ_DONTWAIT) > 0)
        {
            ++recv_count;
            printf("res:%s", res);
            memset(res, 0, sizeof(res));
            printf("first_count: %d, recv_count: %d\n", first_count, recv_count);

            sleep(1);
        }
    }

    // Clean up
    zmq_close(sub_socket);
}

void ttt()
{
    zmq::context_t context(1);

    const int first_count = 15;
    int hwm = 10;
    int buf_size = 10000;
    zmq::socket_t pub_socket(context, ZMQ_PUB);
    zmq_setsockopt(pub_socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    zmq_setsockopt(pub_socket, ZMQ_SNDBUF, &buf_size, sizeof(buf_size));

    pub_socket.bind("ipc:///tmp/qos_in");

    std::string msg = "111";

    while (true)
    {
        auto res = zmq_send(pub_socket, msg.c_str(), msg.length(), ZMQ_DONTWAIT);
        printf("res:%d\n", res);
    }
}

int main(int, char **)
{
    qqq();

    return 0;
}

Original: https://www.cnblogs.com/jiangyibo/p/16619319.html
Author: 空水
Title: [ZMQ] ZMQ_PUB和ZMQ_SUB 关于缓存区的问题

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/619563/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球