31.使用计时器,分析服务端recv的性能

服务端:

使用计时器,计时每秒钟调用了多少次recv,收到了多少数据包。recv每次接收1个字节。

客户端:
使用计时器,计时每秒钟调用了多少次send函数。

调整客户端的线程数,客户端数量,,服务的线程数,进行观察。

客户端:

CELLTimestamp.hpp

DataHeader.hpp

EasyTcpClient.hpp

main.cpp

服务端:

CELLTimestamp.hpp

DataHeader.hpp

EasyTcpClient.hpp

#ifndef _EasyTcpServer_hpp_
#define _EasyTcpServer_hpp_

#ifdef _WIN32
    #define FD_SETSIZE      2506
    #define WIN32_LEAN_AND_MEAN
    #define _WINSOCK_DEPRECATED_NO_WARNINGS //也可以放到工程属性,预处理中
    #define _CRT_SECURE_NO_WARNINGS
    #include 
    #include 
    #pragma comment(lib, "ws2_32.lib")
#else
    #include
    #include
    #include<string.h>
    #define SOCKET int
    #define INVALID_SOCKET (SOCKET)(~0)
    #define SOCKET_ERROR (-1)
#endif

#include 
#include 
#include 
#include 
#include 
#include 
#include "DataHeader.hpp"
#include "CELLTimestamp.hpp"

#define _CELLSERVERS_THREAD_COUNT 4

#define RECV_BUFF_SIZE 10240

//客户端数据类型
class ClientSocket
{
public:
    ClientSocket(SOCKET sockfd = INVALID_SOCKET)
    {
        _sockfd = sockfd;
        //memset(_szMsgBuf, 0, sizeof(_szMsgBuf));
    }

    SOCKET sockfd()
    {
        return _sockfd;
    }

    char* msgBuf()
    {
        return _szMsgBuf;
    }

    int getlastPos()
    {
        return _lastPos;
    }

    void setlastPos(int pos)
    {
        _lastPos = pos;
    }

    //指定socket发送数据
    int SendData(DataHeader* header)
    {
        if (header)
        {
            return send(_sockfd, (const char*)header, header->dataLength, 0);
        }
        return SOCKET_ERROR;
    }

private:
    SOCKET _sockfd = INVALID_SOCKET;
    char _szMsgBuf[RECV_BUFF_SIZE * 5] = {};
    int _lastPos = 0;
};

//网络事件接口
class INetEvent
{
public:
    //客户端加入事件
    virtual void OnNetJoin(ClientSocket* pClient) = 0;
    //客户端离开事件
    virtual void OnNetLeave(ClientSocket* pClient) = 0;
    //客户端消息事件
    virtual void OnNetMsg(ClientSocket* pClient, DataHeader* header) = 0;
    virtual void OnNetRecv(ClientSocket* pClient) = 0;
};

class CellServer
{
public:
    CellServer(SOCKET sock = INVALID_SOCKET)
    {
        _sock = sock;
        _pNetEvent = nullptr;
    }

    ~CellServer()
    {
        Close();

        //_sock = INVALID_SOCKET;
    }

    //关闭socket
    void Close()
    {

#ifdef _WIN32
        // 关闭套接字
        for (auto iter : _clients)
        {
            closesocket(iter.second->sockfd());
            delete iter.second;
        }
#else
        for (auto iter : _clients)
        {
            close(iter.second->sockfd());
            delete iter.second;
        }
#endif

        _clients.clear();

    }

    //是否工作中
    bool isRun()
    {
        return _sock != INVALID_SOCKET;
    }

    void Start()
    {
        //std::thread t(&CellServer::onRun, this);
        _pThread =  std::thread(std::mem_fn(&CellServer::onRun), this);

    }

    //接收数据 处理粘包 拆分包
    int RecvData(ClientSocket* pClient)
    {

        // 接收客户端数据
        int nLen = (int)recv(pClient->sockfd(), _szRecv, 1, 0);
        _pNetEvent->OnNetRecv(pClient);

        //printf("nLen=%d\n", nLen);
        if (nLen 0)
        {
            //printf("客户端%d已退出。\n", pClient->sockfd());
            return -1;
        }

        memcpy(pClient->msgBuf() + pClient->getlastPos(), _szRecv, nLen);
        //消息缓冲区的数据尾部位置后移
        pClient->setlastPos(pClient->getlastPos() + nLen);

        //判断消息缓冲区的数据长度大于消息头DataHeader长度
        while (pClient->getlastPos() >= sizeof(DataHeader))
        {
            //这时就可以知道当前消息的长度
            DataHeader* header = (DataHeader*)pClient->msgBuf();
            //判断是否可以获取一个完整消息
            if (pClient->getlastPos() >= header->dataLength)
            {
                //剩余的未处理消息缓冲区数据的长度
                int nSize = pClient->getlastPos() - header->dataLength;
                //处理网络消息
                OnNetMsg(pClient, header);
                //将消息缓冲区剩余未处理数据前移
                memcpy(pClient->msgBuf(), pClient->msgBuf() + header->dataLength, nSize);
                //将消息缓冲区的数据尾部位置前移
                pClient->setlastPos(nSize);
            }
            else
            {
                //消息缓冲区剩余数据不够一条完整消息
                break;
            }
        }

        return 0;
    }

    //响应网络消息
    //响应网络消息
    virtual void OnNetMsg(ClientSocket* pClient, DataHeader* header)
    {
        _pNetEvent->OnNetMsg(pClient, header);
    }

    FD_SET _fdRead_bak;//备份
    bool _clients_change;
    SOCKET _maxSock;
    bool onRun()
    {
        _clients_change = true;
        while ( isRun() )
        {

            if (_clientsBuff.size() > 0)
            {
                std::lock_guard<:mutex> lock(_mutex);
                for (auto pClient : _clientsBuff)
                {
                    _clients[pClient->sockfd()] = pClient;
                }
                _clientsBuff.clear();

                _clients_change = true;
            }

            //如果没有需要处理的客户端,就跳过
            if (_clients.empty())
            {
                std::chrono::milliseconds t(1);
                std::this_thread::sleep_for(t);
                continue;
            }

            fd_set fdRead;
            //fd_set fdWrite;
            //fd_set fdExp;

            FD_ZERO(&fdRead);
            if (_clients_change)//如果有客户端,加入,退出
            {
                _maxSock = _clients.begin()->second->sockfd();
                for (auto iter : _clients)
                {
                    FD_SET(iter.second->sockfd(), &fdRead);
                    if (_maxSock < iter.second->sockfd())
                    {
                        _maxSock = iter.second->sockfd();
                    }
                }

                memcpy(&_fdRead_bak, &fdRead, sizeof(fd_set));
                _clients_change = false;
            }
            else
            {
                memcpy(&fdRead, &_fdRead_bak, sizeof(fd_set));
            }

            //timeval t = { 0, 0};
            //int ret = select(maxSock + 1, &fdRead, &fdWrite, &fdExp, &t);
            int ret = select(_maxSock + 1, &fdRead, nullptr, nullptr, nullptr);
            if (ret < 0)
            {
                printf("select任务结束。\n");
                Close();
                return false;
            }
            else if (ret == 0)
            {
                continue;
            }

#ifdef _WIN32
            for (int n = 0; n < fdRead.fd_count; n++)
            {
                auto iter = _clients.find(fdRead.fd_array[n]);
                if (iter != _clients.end())
                {
                    if (-1 == RecvData(iter->second))
                    {
                        if (_pNetEvent)
                            _pNetEvent->OnNetLeave(iter->second);
                        _clients_change = true;
                        delete iter->second;
                        _clients.erase(iter->first);
                    }
                }
                else {
                    printf("error. if (iter != _clients.end())\n");
                }

            }
#else
            std::vector temp;
            for (auto iter : _clients)
            {
                if (FD_ISSET(iter.second->sockfd(), &fdRead))
                {
                    if (-1 == RecvData(iter.second))
                    {
                        if (_pNetEvent)
                            _pNetEvent->OnNetLeave(iter.second);
                        _clients_change = true;
                        temp.push_back(iter.second);
                    }
                }
            }
            for (auto pClient : temp)
            {
                _clients.erase(pClient->sockfd());
                delete pClient;
            }
#endif

        }

        return true;
    }

    void addClient(ClientSocket* pClient)
    {
        _mutex.lock();
        _clientsBuff.push_back(pClient);
        _mutex.unlock();
    }

    size_t getClientCount()
    {
        return _clients.size() + _clientsBuff.size();
    }

    void setEventObj(INetEvent* event)
    {
        _pNetEvent = event;
    }

private:
    SOCKET _sock;
    //正式客户队列
    //std::vector _clients;*>
    std::map _clients;
    //缓冲客户队列
    std::vector _clientsBuff;
    //缓冲客户端队列的锁
    std::mutex _mutex;
    std::thread _pThread;
    char _szRecv[RECV_BUFF_SIZE] = { };
private:
    //网络事件对象
    INetEvent* _pNetEvent;
};

class EasyTcpServer:public INetEvent
{
public:
    EasyTcpServer()
    {
        _sock = INVALID_SOCKET;
        _clientCount = 0;
        _recvCount = 0;
        _msgCount = 0;
    }

    virtual ~EasyTcpServer()
    {
        Close();
    }

    //初始化socket
    SOCKET InitSocket()
    {
#ifdef _WIN32
        WORD ver = MAKEWORD(2, 2);
        WSADATA dat;
        WSAStartup(ver, &dat);
#endif // _WIN32

        if (_sock != INVALID_SOCKET)
        {
            Close();
        }

        _sock = socket(AF_INET, SOCK_STREAM, 0);
        if (INVALID_SOCKET == _sock)
        {
            printf("错误,建立Socket失败...\n");
        }
        else
        {
            printf("建立Socket成功...\n");
        }

        return _sock;
    }

    //绑定端口号
    int Bind(const char* ip, unsigned short port)
    {
        sockaddr_in _sin = {};
        _sin.sin_family = AF_INET;
        _sin.sin_port = htons(4567);//host to net unsigned short

                                    //使用127.0.0.1可以防止外网访问
                                    //启用本机全部的ip地址可以使用,INADDR_ANY

#ifdef _WIN32
        if (ip)
        {
            _sin.sin_addr.S_un.S_addr = inet_addr(ip);
        }
        else
        {
            _sin.sin_addr.S_un.S_addr = INADDR_ANY;
        }

#else
        if (ip)
        {
            _sin.sin_addr.s_addr = inet_addr(ip);
        }
        else
        {
            _sin.sin_addr.s_addr = INADDR_ANY;
        }

#endif

        int ret = bind(_sock, (sockaddr*)&_sin, sizeof(sockaddr_in));
        if (SOCKET_ERROR == ret)
        {
            printf("错误,绑定网络端口失败...\n");
        }
        else
        {
            printf("绑定网络端口成功...\n");
        }

        return ret;
    }

    //监听端口号
    int Listen(int cnt)
    {
        int ret = listen(_sock, cnt);
        if (SOCKET_ERROR == ret)
        {
            printf("错误,监听网络端口失败...\n");
        }
        else
        {
            printf("监听网络端口成功...\n");
        }

        return ret;
    }

    //接收客户端连接
    int Accept()
    {
        sockaddr_in clientAddr = {};
        int nAddrLen = sizeof(clientAddr);
        SOCKET _cSock = INVALID_SOCKET;
#ifdef _WIN32
        _cSock = accept(_sock, (sockaddr*)&clientAddr, &nAddrLen);
#else
        _cSock = accept(_sock, (sockaddr*)&clientAddr, (socklen_t*)&nAddrLen);
#endif
        if (INVALID_SOCKET == _cSock)
        {
            printf("错误,接受到无效客户端socket...\n");
        }
        else
        {

            //NewUserJoin msg;
            //msg.sock = _cSock;
            //Send2All(&msg);
            //_clients.push_back(new ClientSocket(_cSock));
            //将新客户端分配给客户端数量最少的cellServer
            addClientToCellServer(new ClientSocket(_cSock));

            //printf("new user join in:socket=%d,IP=%s,count=%d\n", _cSock, inet_ntoa(clientAddr.sin_addr), _clients.size());
        }

        return _cSock;
    }

    void Start(int nCellServer)
    {
        for (int n=0; n)
        {
            auto ser = new CellServer(_sock);
            _cellServers.push_back(ser);
            //注册网络事件接收对象
            ser->setEventObj(this);
            //启动消息处理线程
            ser->Start();
        }
    }

    //关闭socket
    void Close()
    {

        if (_sock != INVALID_SOCKET)
        {
#ifdef _WIN32

            closesocket(_sock);
            //清除Windows socket环境
            WSACleanup();
#else
            close(_sock);
#endif

            _sock = INVALID_SOCKET;
        }

    }

    void addClientToCellServer(ClientSocket*  pClient)
    {
        OnNetJoin(pClient);

        //查找客户数量最少的CellServer消息处理对象
        auto pMinServer = _cellServers[0];
        for (auto pCellServer : _cellServers)
        {
            if (pCellServer->getClientCount() < pMinServer->getClientCount())
            {
                pMinServer = pCellServer;
            }

        }

        pMinServer->addClient(pClient);
    }

    //处理网络消息
    bool OnRun()
    {
        if (!isRun())
        {
            return false;
        }

        time4msg();

        fd_set fdRead;
        //fd_set fdWrite;
        //fd_set fdExp;

        FD_ZERO(&fdRead);
        //FD_ZERO(&fdWrite);
        //FD_ZERO(&fdExp);

        FD_SET(_sock, &fdRead);
        //FD_SET(_sock, &fdWrite);
        //FD_SET(_sock, &fdExp);

        SOCKET maxSock = _sock;

        timeval t = {0, 10};
        //int ret = select(maxSock + 1, &fdRead, &fdWrite, &fdExp, &t);
        int ret = select(maxSock + 1, &fdRead, 0, 0, &t);
        if (ret < 0)
        {
            printf("select任务结束。\n");
            Close();
            return false;
        }

        if (FD_ISSET(_sock, &fdRead))
        {
            FD_CLR(_sock, &fdRead);//从集合移除了_sock

            Accept();
        }

        return true;
    }

    //是否工作中
    bool isRun()
    {
        return _sock != INVALID_SOCKET;
    }

    //计算每秒钟收到了多少网络消息
    void time4msg()
    {

        auto t1 =  _tTime.getElapsedTimeInSec();
        if (t1 > 1.0)
        {
            printf("thread,time,socket,clients,recv,msg\n",
                  _cellServers.size(), t1, _sock, (int)_clientCount, (int)(_recvCount/t1), (int)(_msgCount/t1));
            _recvCount = 0;
            _msgCount = 0;
            _tTime.update();
        }

    }

    //void Send2All(DataHeader* header)
    //{
    //    if (isRun() && header)
    //    {
    //        for (int n = (int)_clients.size() - 1; n >= 0; n--)
    //        {
    //            SendData(_clients[n]->sockfd(), header);
    //        }
    //    }

    //}

    virtual void OnNetJoin(ClientSocket* pClient)
    {
        _clientCount++;
    }

    virtual void OnNetLeave(ClientSocket* pClient)
    {
        _clientCount--;
    }

    virtual void OnNetMsg(ClientSocket* pClient, DataHeader* header)
    {
        _recvCount++;
    }

    virtual void OnNetRecv(ClientSocket* pClient)
    {
        _recvCount++;
    }

private:
    SOCKET _sock;
    //消息处理对象,内部会创建线程
    std::vector _cellServers;
    CELLTimestamp _tTime;

protected:
    std::atomic<int> _clientCount;
    //socket recv计数
    std::atomic<int> _recvCount;
    //收到消息计数
    std::atomic<int> _msgCount;
};

#endif*>;>*>,>*>

main.cpp

Original: https://www.cnblogs.com/zhangxuan/p/14460654.html
Author: 邶风
Title: 31.使用计时器,分析服务端recv的性能

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/543087/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • MySQL的(@i:=@i+1)用处及用法

    作用 (@i:=@i+1):查询结果中新增一列自动生成的序列号,代表定义一个变量,每次叠加1; 示例 一个基本的查询及结果: SELECT * FROM student; 添加(@…

    Java 2023年6月8日
    076
  • 数据结构与算法之线性查找

    线性查找 在一个无顺序的数组中找到第k大的元素是几. 这个问题最简单的解法是先将数组进行排序,然后返回下标k上的元素.如果使用上一节的归并排序则时间复杂度是O(nlogn) 那是否…

    Java 2023年6月8日
    085
  • SpringBoot + SpringCloud Hystrix 实现服务熔断

    什么是Hystrix 在分布式系统中,每个服务都可能会调用很多其他服务,被调用的那些服务就是依赖服务,有的时候某些依赖服务出现故障也是很常见的。Hystrix是Netflix公司开…

    Java 2023年5月30日
    086
  • jeesite复杂查询语句实现

    在一开始用jeesite进行开发的时候,偶尔会碰到许多问题,解决方式一般也有很多方法 一、多表查询的时候,怎么添加子表的条件进行查询? 举例:有一个文件信息表file,关联了用户表…

    Java 2023年6月5日
    075
  • 微信二维码支付

    准备工作 概述:微信扫码支付是商户系统按微信支付协议生成支付二维码,用户再用微信 &#x626B;&#x4E00;&#x626B;完成支付的模式。该模式适用…

    Java 2023年6月13日
    091
  • Android 实现开机自启APP

    原文地址:Android 实现开机自启APP – Stars-One的杂货小窝 公司有个项目,需要实现自启动的功能,本来想着是设置桌面启动器的方式去实现,但是设备是华为…

    Java 2023年6月13日
    0112
  • java 异常类与自定义异常

    目录 异常类 Exception 类的层次 throws/throw 关键字: throws: throw: try catch finally语句 声明自定义异常 异常类 在 J…

    Java 2023年6月9日
    0111
  • JavaMail 网易邮件发送demo-发送带附件的邮件

    使用Java应用程序发送 E-mail,需要在机子上安装 JavaMail API 和Java Activation Framework (JAF) 。 可以从 Java 网站下载…

    Java 2023年5月29日
    088
  • 四、Java基础

    Java基础 在开始学习Java基础之前,我们先来学习一下IDEA 打开IDEA,新建一个项目(New Project),选择空项目(Empty Project),填写项目名(Pr…

    Java 2023年6月7日
    069
  • Collections.sort排序方法的最简化写法

    Collections.sort排序方法的最简化写法 Collections.sort排序方法的最简化写法 假定按照Number对象的Id字段进行排序 正序排序 Collectio…

    Java 2023年6月16日
    088
  • Spring5 源码解析 IOC默认标签解析上半部分之解析BeanDefinition

    前言 前两篇文章,Spring5源码解析_整体架构分析、IOC容器的基本实现大家应该对Spring的IOC容器有了初步的了解,接下来我们研究Spring标签的解析,Spring标签…

    Java 2023年6月7日
    065
  • 枚举

    枚举 自定义类实现枚举 1.不需要提供set方法,因为枚举对象值通常为只读2.对枚举对象/属性使用final + static共同修饰,实现底层优化3.枚举对象名通常使用全部大写,…

    Java 2023年6月5日
    087
  • git使用之eclipse使用

    在github中新建一个空的仓库,仓库名与项目名保持一致,然后在Eclipse中选中项目右键,选择 Team–>commit在新弹出的界面中,填写提交信息,下面的文件全部勾…

    Java 2023年6月5日
    0127
  • rabbitmq centos 7.4 主备安装,加入主节点失败Error: unable to perform an operation on node ‘rabbit@mq2’. Please see diagnostics information and suggestions below.

    Original: https://www.cnblogs.com/yhq1314/p/14558920.htmlAuthor: 春困秋乏夏打盹Title: rabbitmq ce…

    Java 2023年5月30日
    080
  • leetcode2. 两数相加

    题目描述 给你两个非空的链表,表示两个非负的整数。它们每位数字都是按照逆序的方式存储的,并且每个节点只能存储 一位 数字。 请你将两个数相加,并以相同形式返回一个表示和的链表。 你…

    Java 2023年6月9日
    061
  • Springboot 生成验证码

    一、目录结构 二、功能讲解 (1)验证码配置文件 打开KaptchaConfig.java java;gutter:true @Component public class Kap…

    Java 2023年5月30日
    071
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球