29.多线程,分组服务端,1万客户端连接,服务端使用

1.EasyTcpServer服务端的使用。

2.客户端 发送 登录消息
服务端接收登录消息 返回消息给 客户端

存在问题:网络抖动较大

客户端:

DataHeader.hpp

EasyTcpClient.hpp

main.cpp

服务端:继承EasyTcpServer,重写接口函数

DataHeader.hpp

同客户端

CELLTimestamp.hpp

EasyTcpServer.hpp

#ifndef _EasyTcpServer_hpp_
#define _EasyTcpServer_hpp_

#ifdef _WIN32
    #define FD_SETSIZE      2506
    #define WIN32_LEAN_AND_MEAN
    #define _WINSOCK_DEPRECATED_NO_WARNINGS //也可以放到工程属性,预处理中
    #define _CRT_SECURE_NO_WARNINGS
    #include 
    #include 
    #pragma comment(lib, "ws2_32.lib")
#else
    #include
    #include
    #include<string.h>
    #define SOCKET int
    #define INVALID_SOCKET (SOCKET)(~0)
    #define SOCKET_ERROR (-1)
#endif

#include 
#include 
#include 
#include 
#include 
#include "DataHeader.hpp"
#include "CELLTimestamp.hpp"

#define _CELLSERVERS_THREAD_COUNT 4

#define RECV_BUFF_SIZE 10240

//客户端数据类型
class ClientSocket
{
public:
    ClientSocket(SOCKET sockfd = INVALID_SOCKET)
    {
        _sockfd = sockfd;
        //memset(_szMsgBuf, 0, sizeof(_szMsgBuf));
    }

    SOCKET sockfd()
    {
        return _sockfd;
    }

    char* msgBuf()
    {
        return _szMsgBuf;
    }

    int getlastPos()
    {
        return _lastPos;
    }

    void setlastPos(int pos)
    {
        _lastPos = pos;
    }

    //指定socket发送数据
    int SendData(DataHeader* header)
    {
        if (header)
        {
            return send(_sockfd, (const char*)header, header->dataLength, 0);
        }
        return SOCKET_ERROR;
    }

private:
    SOCKET _sockfd = INVALID_SOCKET;
    char _szMsgBuf[RECV_BUFF_SIZE * 5] = {};
    int _lastPos = 0;
};

//网络事件接口
class INetEvent
{
public:
    //客户端加入事件
    virtual void OnNetJoin(ClientSocket* pClient) = 0;
    //客户端离开事件
    virtual void OnNetLeave(ClientSocket* pClient) = 0;
    //客户端消息事件
    virtual void OnNetMsg(ClientSocket* pClient, DataHeader* header) = 0;
};

class CellServer
{
public:
    CellServer(SOCKET sock = INVALID_SOCKET)
    {
        _sock = sock;
        _pNetEvent = nullptr;
    }

    ~CellServer()
    {
        Close();

        _sock = INVALID_SOCKET;
    }

    //关闭socket
    void Close()
    {

#ifdef _WIN32
        // 关闭套接字
        for (size_t n = 0; n<_clients.size>)
        {
            closesocket(_clients[n]->sockfd());
            delete _clients[n];
        }
#else
        for (size_t n = 0; n<_clients.size>)
        {
            close(_clients[n]->sockfd());
            delete _clients[n];
        }
#endif

        _clients.clear();

    }

    //是否工作中
    bool isRun()
    {
        return _sock != INVALID_SOCKET;
    }

    void Start()
    {
        //std::thread t(&CellServer::onRun, this);
        _pThread =  std::thread(std::mem_fn(&CellServer::onRun), this);

    }

    //接收数据 处理粘包 拆分包
    int RecvData(ClientSocket* pClient)
    {

        // 接收客户端数据
        int nLen = (int)recv(pClient->sockfd(), _szRecv, RECV_BUFF_SIZE, 0);
        //printf("nLen=%d\n", nLen);
        if (nLen 0)
        {
            //printf("客户端%d已退出。\n", pClient->sockfd());
            return -1;
        }

        memcpy(pClient->msgBuf() + pClient->getlastPos(), _szRecv, nLen);
        //消息缓冲区的数据尾部位置后移
        pClient->setlastPos(pClient->getlastPos() + nLen);

        //判断消息缓冲区的数据长度大于消息头DataHeader长度
        while (pClient->getlastPos() >= sizeof(DataHeader))
        {
            //这时就可以知道当前消息的长度
            DataHeader* header = (DataHeader*)pClient->msgBuf();
            //判断是否可以获取一个完整消息
            if (pClient->getlastPos() >= header->dataLength)
            {
                //剩余的未处理消息缓冲区数据的长度
                int nSize = pClient->getlastPos() - header->dataLength;
                //处理网络消息
                onNetMsg(pClient, header);
                //将消息缓冲区剩余未处理数据前移
                memcpy(pClient->msgBuf(), pClient->msgBuf() + header->dataLength, nSize);
                //将消息缓冲区的数据尾部位置前移
                pClient->setlastPos(nSize);
            }
            else
            {
                //消息缓冲区剩余数据不够一条完整消息
                break;
            }
        }

        return 0;
    }

    //响应网络消息
    virtual void onNetMsg(ClientSocket* pClient, DataHeader* header)
    {
        _pNetEvent->OnNetMsg(pClient, header);

        switch (header->cmd)
        {
        case CMD_LOGIN:
        {
            Login *pLogin = (Login*)header;
            //printf("客户端%d,cmd logint,user:%s,password:%s\n", _cSock, pLogin->userName, pLogin->passWord);
            //----------------------------
            //----------------------------
            LoginResult longinRet = {};
            pClient->SendData(&longinRet);
        }
        break;
        case CMD_LOGOUT:
        {

            LogOut *pLogOut = (LogOut*)header;
            printf("客户端%d,cmd logout,user:%s\n", pClient->sockfd(), pLogOut->userName);
            //----------------------------
            LogOutResult logOutRet = {};

        }
        break;
        default:
        {
            DataHeader header = {};
            header.cmd = CMD_ERROR;

        }
        break;
        }
    }

    bool onRun()
    {

        while ( isRun() )
        {

            if (_clientsBuff.size() > 0)
            {
                std::lock_guard<:mutex> lock(_mutex);
                for (auto pClient : _clientsBuff)
                {
                    _clients.push_back(pClient);
                }
                _clientsBuff.clear();
            }

            //如果没有需要处理的客户端,就跳过
            if (_clients.empty())
            {
                std::chrono::milliseconds t(1);
                std::this_thread::sleep_for(t);
                continue;
            }

            fd_set fdRead;
            //fd_set fdWrite;
            //fd_set fdExp;

            FD_ZERO(&fdRead);
            //FD_ZERO(&fdWrite);
            //FD_ZERO(&fdExp);

            SOCKET maxSock = _clients[0]->sockfd();
            for (size_t n = 0; n<_clients.size>)
            {
                FD_SET(_clients[n]->sockfd(), &fdRead);
                if (maxSock < _clients[n]->sockfd())
                {
                    maxSock = _clients[n]->sockfd();
                }
            }

            timeval t = { 0, 1};
            //int ret = select(maxSock + 1, &fdRead, &fdWrite, &fdExp, &t);
            int ret = select(maxSock + 1, &fdRead, nullptr, nullptr, nullptr);
            if (ret < 0)
            {
                printf("select任务结束。\n");
                Close();
                return false;
            }

            for (int n = (int)_clients.size() - 1; n >= 0; n--)
            {
                if (FD_ISSET(_clients[n]->sockfd(), &fdRead))
                {
                    if (-1 == RecvData(_clients[n]))
                    {
                        auto iter = _clients.begin() + n;
                        if (iter != _clients.end())
                        {
                            if (_pNetEvent)
                            {
                                _pNetEvent->OnNetLeave(_clients[n]);
                            }
                            delete _clients[n];
                            _clients.erase(iter);
                        }
                    }
                }
            }

        }

        return true;
    }

    void addClient(ClientSocket* pClient)
    {
        _mutex.lock();
        _clientsBuff.push_back(pClient);
        _mutex.unlock();
    }

    size_t getClientCount()
    {
        return _clients.size() + _clientsBuff.size();
    }

    void setEventObj(INetEvent* event)
    {
        _pNetEvent = event;
    }

private:
    SOCKET _sock;
    //正式客户队列
    std::vector _clients;
    //缓冲客户队列
    std::vector _clientsBuff;
    //缓冲客户端队列的锁
    std::mutex _mutex;
    std::thread _pThread;
    char _szRecv[RECV_BUFF_SIZE] = { };
private:
    //网络事件对象
    INetEvent* _pNetEvent;
};

class EasyTcpServer:public INetEvent
{
public:
    EasyTcpServer()
    {
        _sock = INVALID_SOCKET;
        _clientCount = 0;
        _recvCount = 0;
    }

    virtual ~EasyTcpServer()
    {
        Close();
    }

    //初始化socket
    SOCKET InitSocket()
    {
#ifdef _WIN32
        WORD ver = MAKEWORD(2, 2);
        WSADATA dat;
        WSAStartup(ver, &dat);
#endif // _WIN32

        if (_sock != INVALID_SOCKET)
        {
            Close();
        }

        _sock = socket(AF_INET, SOCK_STREAM, 0);
        if (INVALID_SOCKET == _sock)
        {
            printf("错误,建立Socket失败...\n");
        }
        else
        {
            printf("建立Socket成功...\n");
        }

        return _sock;
    }

    //绑定端口号
    int Bind(const char* ip, unsigned short port)
    {
        sockaddr_in _sin = {};
        _sin.sin_family = AF_INET;
        _sin.sin_port = htons(4567);//host to net unsigned short

                                    //使用127.0.0.1可以防止外网访问
                                    //启用本机全部的ip地址可以使用,INADDR_ANY

#ifdef _WIN32
        if (ip)
        {
            _sin.sin_addr.S_un.S_addr = inet_addr(ip);
        }
        else
        {
            _sin.sin_addr.S_un.S_addr = INADDR_ANY;
        }

#else
        if (ip)
        {
            _sin.sin_addr.s_addr = inet_addr(ip);
        }
        else
        {
            _sin.sin_addr.s_addr = INADDR_ANY;
        }

#endif

        int ret = bind(_sock, (sockaddr*)&_sin, sizeof(sockaddr_in));
        if (SOCKET_ERROR == ret)
        {
            printf("错误,绑定网络端口失败...\n");
        }
        else
        {
            printf("绑定网络端口成功...\n");
        }

        return ret;
    }

    //监听端口号
    int Listen(int cnt)
    {
        int ret = listen(_sock, cnt);
        if (SOCKET_ERROR == ret)
        {
            printf("错误,监听网络端口失败...\n");
        }
        else
        {
            printf("监听网络端口成功...\n");
        }

        return ret;
    }

    //接收客户端连接
    int Accept()
    {
        sockaddr_in clientAddr = {};
        int nAddrLen = sizeof(clientAddr);
        SOCKET _cSock = INVALID_SOCKET;
#ifdef _WIN32
        _cSock = accept(_sock, (sockaddr*)&clientAddr, &nAddrLen);
#else
        _cSock = accept(_sock, (sockaddr*)&clientAddr, (socklen_t*)&nAddrLen);
#endif
        if (INVALID_SOCKET == _cSock)
        {
            printf("错误,接受到无效客户端socket...\n");
        }
        else
        {

            //NewUserJoin msg;
            //msg.sock = _cSock;
            //Send2All(&msg);
            //_clients.push_back(new ClientSocket(_cSock));
            //将新客户端分配给客户端数量最少的cellServer
            addClientToCellServer(new ClientSocket(_cSock));

            //printf("new user join in:socket=%d,IP=%s,count=%d\n", _cSock, inet_ntoa(clientAddr.sin_addr), _clients.size());
        }

        return _cSock;
    }

    void Start(int nCellServer)
    {
        for (int n=0; n)
        {
            auto ser = new CellServer(_sock);
            _cellServers.push_back(ser);
            //注册网络事件接收对象
            ser->setEventObj(this);
            //启动消息处理线程
            ser->Start();
        }
    }

    //关闭socket
    void Close()
    {

        if (_sock != INVALID_SOCKET)
        {
#ifdef _WIN32

            closesocket(_sock);
            //清除Windows socket环境
            WSACleanup();
#else
            close(_sock);
#endif

            _sock = INVALID_SOCKET;
        }

    }

    void addClientToCellServer(ClientSocket*  pClient)
    {
        OnNetJoin(pClient);

        //查找客户数量最少的CellServer消息处理对象
        auto pMinServer = _cellServers[0];
        for (auto pCellServer : _cellServers)
        {
            if (pCellServer->getClientCount() < pMinServer->getClientCount())
            {
                pMinServer = pCellServer;
            }

        }

        pMinServer->addClient(pClient);
    }

    //处理网络消息
    bool OnRun()
    {
        if (!isRun())
        {
            return false;
        }

        time4msg();

        fd_set fdRead;
        //fd_set fdWrite;
        //fd_set fdExp;

        FD_ZERO(&fdRead);
        //FD_ZERO(&fdWrite);
        //FD_ZERO(&fdExp);

        FD_SET(_sock, &fdRead);
        //FD_SET(_sock, &fdWrite);
        //FD_SET(_sock, &fdExp);

        SOCKET maxSock = _sock;

        timeval t = {0, 10};
        //int ret = select(maxSock + 1, &fdRead, &fdWrite, &fdExp, &t);
        int ret = select(maxSock + 1, &fdRead, 0, 0, &t);
        if (ret < 0)
        {
            printf("select任务结束。\n");
            Close();
            return false;
        }

        if (FD_ISSET(_sock, &fdRead))
        {
            FD_CLR(_sock, &fdRead);//从集合移除了_sock

            Accept();
        }

        return true;
    }

    //是否工作中
    bool isRun()
    {
        return _sock != INVALID_SOCKET;
    }

    //计算每秒钟收到了多少网络消息
    void time4msg()
    {

        auto t1 =  _tTime.getElapsedTimeInSec();
        if (t1 > 1.0)
        {
            printf("time:%lf,sock:%d,clients:%d,recv count:%d\n", t1, _sock, _clientCount, (int)(_recvCount/t1));
            _recvCount = 0;
            _tTime.update();
        }

    }

    //void Send2All(DataHeader* header)
    //{
    //    if (isRun() && header)
    //    {
    //        for (int n = (int)_clients.size() - 1; n >= 0; n--)
    //        {
    //            SendData(_clients[n]->sockfd(), header);
    //        }
    //    }

    //}

    virtual void OnNetJoin(ClientSocket* pClient)
    {
        _clientCount++;
    }

    virtual void OnNetLeave(ClientSocket* pClient)
    {
        _clientCount--;
    }

    virtual void OnNetMsg(ClientSocket* pClient, DataHeader* header)
    {
        _recvCount++;
    }

private:
    SOCKET _sock;
    //消息处理对象,内部会创建线程
    std::vector _cellServers;
    CELLTimestamp _tTime;

protected:
    std::atomic<int> _clientCount;
    //收到消息计数
    std::atomic<int> _recvCount;
};

#endif*>;>*>*>

main.cpp

Original: https://www.cnblogs.com/zhangxuan/p/14448900.html
Author: 邶风
Title: 29.多线程,分组服务端,1万客户端连接,服务端使用

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/539838/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 7月8日 学习日记

    HashMap 通过扩容可以减少链表长度 当链表长度超过8时会扩容 当数组长度到达64且链表长度超过8个的时候会转化成红黑树 红黑树 左结点都是比结点小 右结点都是比结点大 先比较…

    Java 2023年6月6日
    066
  • JNPF3.4.2系统升级公告

    尊敬的引迈客户: 您好!为了给您提供更优质的服务,引迈信息将对JNPF快速开发平台进行优化,并于近期同步上线,新版本、新升级,全新体验,敬请期待! 本次优化涉及【在线开发】、【流程…

    Java 2023年6月5日
    076
  • 1、什么是注解

    Annotation JDK5.0 始引入的新技术 0 An n otationxe 0 An n otation 的作用 1、不是程序本身, 可以对程序作出解释. ( 这一点和注…

    Java 2023年6月8日
    065
  • 图解BM(Boyer-Moore)字符串匹配算法+代码实现

    简介 本篇文章主要分为两个大的部分,第一部分通过图解的方式讲解BM算法,第二部分则代码实现一个简易的BM算法。 基本概念 bm是一个字符串匹配算法,有实验统计,该算法是著名kmp算…

    Java 2023年6月9日
    082
  • Skywalking-02:如何写一个Skywalking trace插件

    如何写一个Skywalking trace插件 javaagent 原理 美团技术团队-Java 动态调试技术原理及实践 类图 实现 ConsumeMessageConcurren…

    Java 2023年6月5日
    076
  • Web字体简介_TTF_OTF_WOFF_EOT以及SVG

    字体格式有太多选择,不幸的是始终 没有一个能在所有的浏览器上通用 。这意味着,你必须使用多种字体的方案来保持 用户跨平台的一致性体验 。本文内容如题,会依次介绍一下 TTF、OTF…

    Java 2023年6月5日
    058
  • 转摘:Spring、SpringMVC和Springboot的区别

    一、概念 1、Spring Spring是一个开源容器框架,可以接管web层,业务层,dao层,持久层的组件,并且可以配置各种bean,和维护bean与bean之间的关系。其核心就…

    Java 2023年5月30日
    064
  • JVM学习笔记之Java内存区域与OOM【二】

    Java 内存区域与 OOM 虚拟机基本结构图示 一、运行时数据区域 运行时数据区域 图示 标注颜色的两块区域:所有线程共享的数据区域 1.1 程序计数器(progams coun…

    Java 2023年6月5日
    098
  • 利用 Spring Boot 中的 @ConfigurationProperties,优雅绑定配置参数

    使用 @Value(“${property}”) 注释注入配置属性有时会很麻烦,尤其是当你使用多个属性或你的数据是分层的时候。 Spring Boot 引入…

    Java 2023年5月30日
    082
  • cannot resolve symbol ‘springframework‘解决

    解决方式:https://blog.csdn.net/cxd3341/article/details/109005959 Original: https://www.cnblogs…

    Java 2023年5月29日
    099
  • Java类与对象的讨论

    关于现实世界的对象真的很多很多,以下是最近探讨的一些对象: 这样定义是否存在争议呢? 中秋节与教师节碰杯,正值花好与月圆,桃李满天下! 还有月越来越圆,愿事事如愿! 手机:价钱(p…

    Java 2023年6月5日
    068
  • git 版本控制命令笔记

    git 获取与创建项目 你得先有一个git仓库,才能用它进行操作。仓库是Git存放你要保存的快照的地方。 创建仓库的两种方式: init 通过命令行初始化。 clone 如果我们想…

    Java 2023年6月8日
    073
  • 计算机的基本认识

    所有图片均为西部开源所有,仅作为随笔所用。 计算机是什么? 计算机的硬件组成 装机 主要设备 有CPU,内存(硬盘,内存条),主板,电脑就能跑起来了CPU拥有核显的话可以不用再装显…

    Java 2023年6月9日
    072
  • Java单例模式推荐写法-双重检测机制实现单例模式

    本文为joshua317原创文章,转载请注明:转载自joshua317博客 https://www.joshua317.com/article/256 Java单例模式推荐写法&#…

    Java 2023年5月29日
    055
  • String 部分方法使用

    package com.Mxhlin.String; import java.util.Locale; /** * @author Mxhlin * @Email fuhua277…

    Java 2023年6月7日
    071
  • 米哈游大量招募新同学,校招提前批最后一天!

    米哈游大量招募新同学: 1.周末双休,工作日早十晚七,上班不打卡,全凭自觉; 2.团队氛围很不错,有成长空间,拒绝无意义加班和内卷; 3.免费晚餐线上订餐,不限量零食饮料还有咖啡和…

    Java 2023年6月8日
    081
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球