websocket:二.Golang实现Websocket消息通知

我们在设计产品的时候通常都会遇到消息通知的时候,比如用户下单支付成功,比如用户有站内信来可以实时通知。而http是单向的,客户端请求,服务端返回,这次请求就已经结束。而websocket可以保持连接实现长连接,在遇到通知时往往使用websocket来达到服务端主动向客户端发送消息的目的。

我们的目标是实现服务端主动向某个用户发消息。所以要做到一下四步。

我们这里使用到 github.com/gorilla/websocket 包。

首先是定义一个客户端连接的结构,先有了连接的结构才能保存连接,ID是一个客户端连接的id,而Socket是真正的客户端连接

// 客户端连接信息
type Client struct {
    ID            string          // 连接ID
    AccountId     string          // 账号id, 一个账号可能有多个连接
    Socket        *websocket.Conn // 连接
    HeartbeatTime int64           // 前一次心跳时间
}

然后定义一个客户端管理,来管理所有的客户端连接,并且实例化为一个全局的变量。

// 消息类型
const (
    MessageTypeHeartbeat = "heartbeat" // 心跳
    MessageTypeRegister  = "register"  // 注册

    HeartbeatCheckTime = 9  // 心跳检测几秒检测一次
    HeartbeatTime      = 20 // 心跳距离上一次的最大时间

    ChanBufferRegister = 100 // 注册chan缓冲
    ChanBufferUnregister = 100 // 注销chan大小
)

// 客户端管理
type ClientManager struct {
    Clients  map[string]*Client  // 保存连接
    Accounts map[string][]string // 账号和连接关系,map的key是账号id即:AccountId,这里主要考虑到一个账号多个连接
    mu       *sync.Mutex
}

// 定义一个管理Manager
var Manager = ClientManager{
    Clients:  make(map[string]*Client),  // 参与连接的用户,出于性能的考虑,需要设置最大连接数
    Accounts: make(map[string][]string), // 账号和连接关系
    mu:       new(sync.Mutex),
}

var (
    RegisterChan   = make(chan *Client, ChanBufferRegister) // 注册
    unregisterChan = make(chan *Client, ChanBufferUnregister) // 注销
)

这里还要封装一下服务器给客户端发消息的格式,以便客户端连接成功后服务端给客户端回复消息

// 封装回复消息
type ServiceMessage struct {
    Type    string                json:"type" // 类型
    Content ServiceMessageContent json:"content"
}
type ServiceMessageContent struct {
    Body     string json:"body"      // 主要数据
    MetaData string json:"meta_data" // 扩展数据
}

func CreateReplyMsg(t string, content ServiceMessageContent) []byte {
    replyMsg := ServiceMessage{
        Type:    t,
        Content: content,
    }
    msg, _ := json.Marshal(replyMsg)
    return msg
}

管理连接

连接保持在Manager里的Clients,和Accounts。Clients用于保存每个与客户端通信的连接。而Account保持者连接id与连接分类(Category)的绑定关系。

// 注册注销
func register() {
    for {
        select {
        case conn := <-registerchan: 新注册,新连接 加入连接,进行管理 accountbind(conn) 回复消息 content :="CreateReplyMsg(MessageTypeRegister," servicemessagecontent{}) _="conn.Socket.WriteMessage(websocket.TextMessage," content) case conn 注销,或者没有心跳 关闭连接 删除client unaccountbind(conn) } 绑定账号 func accountbind(c *client) { manager.mu.lock() defer manager.mu.unlock() 加入到连接 manager.clients[c.id]="c" 加入到绑定 if _, ok 该账号已经有绑定,就追加一个绑定 manager.accounts[c.accountid]="append(Manager.Accounts[c.AccountId]," c.id) else 没有就新增一个账号的绑定切片 解绑账号 unaccountbind(c 取消连接 delete(manager.clients, 取消绑定 len(manager.accounts[c.accountid])> 0 {
        for k, clientId := range Manager.Accounts[c.AccountId] {
            if clientId == c.ID { // &#x627E;&#x5230;&#x7ED1;&#x5B9A;&#x5BA2;&#x6237;&#x7AEF;Id
                Manager.Accounts[c.AccountId] = append(Manager.Accounts[c.AccountId][:k], Manager.Accounts[c.AccountId][k+1:]...)
            }
        }
    }
}

</-registerchan:>

每隔一段时间,就检测一次心跳,如果上次心跳时间超过了HeartbeatTime时间视为已经断开连接。

// &#x7EF4;&#x6301;&#x5FC3;&#x8DF3;
func heartbeat() {
    for {
        // &#x83B7;&#x53D6;&#x6240;&#x6709;&#x7684;Clients
        Manager.mu.Lock()
        clients := make([]*Client, len(Manager.Clients))
        for _, c := range Manager.Clients {
            clients = append(clients, c)
        }
        Manager.mu.Unlock()

        for _, c := range clients {
            if time.Now().Unix()-c.HeartbeatTime > HeartbeatTime {
                unAccountBind(c)
            }
        }

        time.Sleep(time.Second * HeartbeatCheckTime)
    }
}

// &#x7BA1;&#x7406;&#x8FDE;&#x63A5;
func Start() {
    // &#x68C0;&#x67E5;&#x5FC3;&#x8DF3;
    go func() {
        defer func() {
            if r := recover(); r != nil {
                log.Println(r)
            }
        }()
        heartbeat()
    }()

    // &#x6CE8;&#x518C;&#x6CE8;&#x9500;
    go func() {
        defer func() {
            if r := recover(); r != nil {
                log.Println(r)
            }
        }()
        register()
    }()
}

收发消息

// &#x6839;&#x636E;&#x8D26;&#x53F7;&#x83B7;&#x53D6;&#x8FDE;&#x63A5;
func GetClient (accountId string) []*Client{
    clients := make([]*Client,0)

    Manager.mu.Lock()
    defer Manager.mu.Unlock()

    if len(Manager.Accounts[accountId]) > 0 {
        for _,clientId := range Manager.Accounts[accountId] {
            if c,ok := Manager.Clients[clientId]; ok {
                clients = append(clients,c)
            }
        }
    }

    return clients
}

我们这是只是心跳用到了,所以只要判断客户端是心跳消息,然后回复即可。

// &#x8BFB;&#x53D6;&#x4FE1;&#x606F;&#xFF0C;&#x5373;&#x6536;&#x5230;&#x6D88;&#x606F;
func (c *Client) Read() {
    defer func() {
        _ = c.Socket.Close()
    }()
   for {
       // &#x8BFB;&#x53D6;&#x6D88;&#x606F;
       _, body, err := c.Socket.ReadMessage()
       if err != nil {
           break
       }

       var msg struct {
           Type string json:"type"
       }
       err = json.Unmarshal(body, &msg)
       if err != nil {
           log.Println(err)
           continue
       }

       if msg.Type == MessageTypeHeartbeat { // &#x7EF4;&#x6301;&#x5FC3;&#x8DF3;&#x6D88;&#x606F;
           // &#x5237;&#x65B0;&#x8FDE;&#x63A5;&#x65F6;&#x95F4;
           c.HeartbeatTime = time.Now().Unix()

           // &#x56DE;&#x590D;&#x5FC3;&#x8DF3;
           replyMsg := CreateReplyMsg(MessageTypeHeartbeat, ServiceMessageContent{})
           err = c.Socket.WriteMessage(websocket.TextMessage, replyMsg)
           if err != nil {
               log.Println(err)
           }
           continue
       }
   }
}

只要找到连接,对连接发送消息即可。

// &#x53D1;&#x9001;&#x6D88;&#x606F;
func Send(accounts []string,message ServiceMessage) error{
    msg,err := json.Marshal(message)
    if err != nil {
        return err
    }

    for _,accountId := range accounts{
        // &#x83B7;&#x53D6;&#x8FDE;&#x63A5;id
        clients := GetClient(accountId)

        // &#x53D1;&#x9001;&#x6D88;&#x606F;
        for _,c := range clients {
            _ = c.Socket.WriteMessage(websocket.TextMessage, msg)
        }
    }

    return nil
}

这里对http请求升级为websocket,然后单独建立一个goroutine去维持连接。下面类似这样调用,但是鉴权,日志等很多细节不完善,只是提供一个思路。

package wesocket

import (
    websocket2 "demo/websocket"
    "fmt"
    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
    "github.com/rs/xid"
    "log"
    "net/http"
    "time"
)

type MessageNotifyRequest struct {
    UserId    string form:"user_id"
}

func MessageNotify(ctx *gin.Context) {
    // 获取参数
    var params MessageNotifyRequest
    if err := ctx.ShouldBindQuery(¶ms); err != nil {
        log.Println(err)
        return
    }
    // TODO: 鉴权

    // 将http升级为websocket
    conn, err := (&websocket.Upgrader{
        // 1. 解决跨域问题
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }).Upgrade(ctx.Writer, ctx.Request, nil) // 升级
    if err != nil {
        log.Println(err)
        http.NotFound(ctx.Writer, ctx.Request)
        return
    }

    // 创建一个实例连接
    ConnId := xid.New().String()
    client := &websocket2.Client{
        ID:            ConnId, // 连接id
        AccountId:      fmt.Sprintf("%s", params.UserId),
        HeartbeatTime: time.Now().Unix(),
        Socket:        conn,
    }

    // 用户注册到用户连接管理
    websocket2.RegisterChan

用websocket做消息通知,对于后端来说,主要是绑定连接和管理连接,绑定连接就是用户id和websocket连接建立一个绑定关系,而管理连接就是存储连接,删除连接,维护连接的健康(心跳检测),其次就是定义服务端接收和发送数据的格式。总体大概就是这样一个思路。

Original: https://www.cnblogs.com/ourongxin/p/15925620.html
Author: EthanWell
Title: websocket:二.Golang实现Websocket消息通知

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/516434/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Go基础知识梳理(二)

    Go基础知识梳理(二) 简单函数的定义 //&#x6709;&#x53C2;&#x6570;&#x6709;&#x8FD4;&#x5…

    Go语言 2023年5月25日
    043
  • Golang笔记

    本文主要为go的学习过程笔记。 一、基本介绍 1、开发环境安装-windows安装 打开Golang官网,选择对应版本,进行安装。 2、环境变量配置 1)步骤 (1)首先在环境变量…

    Go语言 2023年5月25日
    046
  • Go语言 context包源码学习

    你必须非常努力,才能看起来毫不费力!微信搜索公众号[ 漫漫Coding路 ],一起From Zero To Hero ! 日常 Go 开发中,Context 包是用的最多的一个了,…

    Go语言 2023年5月25日
    059
  • Go 学习路线(2022)

    原文链接: Go 学习路线(2022) Go 语言的发展越来越好了,很多大厂使用 Go 作为主要开发语言,也有很多人开始学习 Go,准备转 Go 开发。 那么,怎么学呢? 我发现,…

    Go语言 2023年5月25日
    045
  • Go基础知识梳理(四)

    Go基础知识梳理(四) GO的哲学是”不要通过共享内存来通信,而是通过通信来共享内存”,通道是GO通过通信来共享内存的载体。 rumtime包常用方法 ru…

    Go语言 2023年5月25日
    035
  • 读 Go 源码,可以试试这个工具

    原文链接: 读 Go 源码,可以试试这个工具 编程发展至今,从面向过程到面向对象,再到现在的面向框架。写代码变成了一件越来越容易的事情。 学习基础语法,看看框架文档,几天时间搞出一…

    Go语言 2023年5月25日
    040
  • 这不会又是一个Go的BUG吧?

    hello,大家好呀,我是小楼。 最近我又双叒叕写了个BUG,一个线上服务死锁了,不过幸亏是个新服务,没有什么大影响。 出问题的是Go的读写锁,如果你是写Java的,不必划走,更要…

    Go语言 2023年5月25日
    062
  • 关于Golang的学习路线

    基础 安装golang环境Golang基础,流程控制,函数,方法,面向对象网络编程(自己做一个简单的tcp的聊天室,websocket,http,命令行工具)并发(可以看一下并发爬…

    Go语言 2023年5月25日
    050
  • 举个栗子之gorpc – 消息的编码和解码

    2022年的第一个rpc,比以往来的更早一些… 留杭过年…写点东西 初始化项目gorpc 借助go module我们可以轻易创建一个新的项目 mkdir g…

    Go语言 2023年5月25日
    050
  • Go语言程序记录日志

    许多软件系统运行中需要日志文件。Go语言程序中,输出日志需要使用包”log”,编写程序十分简单。 像Java语言程序,输出日志时,往往需要使用开源的软件包来…

    Go语言 2023年5月29日
    032
  • Go语言实现大数开方程序

    Go语言的big包实现大数运算,但是有关大整数运算,似乎没有相应的开方程序。 这里给出的程序,实现了大整数的开方运算函数。该程序是基于大整数开方运算的算法实现的。 Go语言程序: …

    Go语言 2023年5月29日
    052
  • RSA.js_公钥加密_NoPadding_golang实现_n_e_pem格式互转

    转载注明来源:本文链接 来自osnosn的博客,写于 2021-09-13. 参考 PKCS1【Golang 实现RSA加密解密】 PKCS1,密钥格式转换(需第三方包)【RSA非…

    Go语言 2023年5月25日
    054
  • 一次不规范HTTP请求引发的nginx响应400问题分析与解决

    背景 最近分析数据偶然发现nginx log中有一批用户所有的HTTP POST log上报请求均返回400,没有任何200成功记录,由于只占整体请求的不到0.5%,所以之前也一直…

    Go语言 2023年5月25日
    076
  • go实用编程-算法篇 -归并排序

    /**** // i: the begin index of old sub-array, j: the begin index of even sub-array | array…

    Go语言 2023年5月25日
    060
  • Go – 使用 sync.WaitGroup 来实现并发操作

    如果你有一个任务可以分解成多个子任务进行处理,同时每个子任务没有先后执行顺序的限制,等到全部子任务执行完毕后,再进行下一步处理。这时每个子任务的执行可以并发处理,这种情景下适合使用…

    Go语言 2023年5月25日
    057
  • [go-websocket 搭建一对一,一对多的聊天室] 第二篇:websocket间的通信

    源码地址https://gitee.com/bin-0821/chat-room-demo-go-websocket 关于websocket,上一篇文章讲述了如何通过websock…

    Go语言 2023年5月25日
    063
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球