websocket:二.Golang实现Websocket消息通知

我们在设计产品的时候通常都会遇到消息通知的时候,比如用户下单支付成功,比如用户有站内信来可以实时通知。而http是单向的,客户端请求,服务端返回,这次请求就已经结束。而websocket可以保持连接实现长连接,在遇到通知时往往使用websocket来达到服务端主动向客户端发送消息的目的。

我们的目标是实现服务端主动向某个用户发消息。所以要做到一下四步。

我们这里使用到 github.com/gorilla/websocket 包。

首先是定义一个客户端连接的结构,先有了连接的结构才能保存连接,ID是一个客户端连接的id,而Socket是真正的客户端连接

// 客户端连接信息
type Client struct {
    ID            string          // 连接ID
    AccountId     string          // 账号id, 一个账号可能有多个连接
    Socket        *websocket.Conn // 连接
    HeartbeatTime int64           // 前一次心跳时间
}

然后定义一个客户端管理,来管理所有的客户端连接,并且实例化为一个全局的变量。

// 消息类型
const (
    MessageTypeHeartbeat = "heartbeat" // 心跳
    MessageTypeRegister  = "register"  // 注册

    HeartbeatCheckTime = 9  // 心跳检测几秒检测一次
    HeartbeatTime      = 20 // 心跳距离上一次的最大时间

    ChanBufferRegister = 100 // 注册chan缓冲
    ChanBufferUnregister = 100 // 注销chan大小
)

// 客户端管理
type ClientManager struct {
    Clients  map[string]*Client  // 保存连接
    Accounts map[string][]string // 账号和连接关系,map的key是账号id即:AccountId,这里主要考虑到一个账号多个连接
    mu       *sync.Mutex
}

// 定义一个管理Manager
var Manager = ClientManager{
    Clients:  make(map[string]*Client),  // 参与连接的用户,出于性能的考虑,需要设置最大连接数
    Accounts: make(map[string][]string), // 账号和连接关系
    mu:       new(sync.Mutex),
}

var (
    RegisterChan   = make(chan *Client, ChanBufferRegister) // 注册
    unregisterChan = make(chan *Client, ChanBufferUnregister) // 注销
)

这里还要封装一下服务器给客户端发消息的格式,以便客户端连接成功后服务端给客户端回复消息

// 封装回复消息
type ServiceMessage struct {
    Type    string                json:"type" // 类型
    Content ServiceMessageContent json:"content"
}
type ServiceMessageContent struct {
    Body     string json:"body"      // 主要数据
    MetaData string json:"meta_data" // 扩展数据
}

func CreateReplyMsg(t string, content ServiceMessageContent) []byte {
    replyMsg := ServiceMessage{
        Type:    t,
        Content: content,
    }
    msg, _ := json.Marshal(replyMsg)
    return msg
}

管理连接

连接保持在Manager里的Clients,和Accounts。Clients用于保存每个与客户端通信的连接。而Account保持者连接id与连接分类(Category)的绑定关系。

// 注册注销
func register() {
    for {
        select {
        case conn := <-registerchan: 新注册,新连接 加入连接,进行管理 accountbind(conn) 回复消息 content :="CreateReplyMsg(MessageTypeRegister," servicemessagecontent{}) _="conn.Socket.WriteMessage(websocket.TextMessage," content) case conn 注销,或者没有心跳 关闭连接 删除client unaccountbind(conn) } 绑定账号 func accountbind(c *client) { manager.mu.lock() defer manager.mu.unlock() 加入到连接 manager.clients[c.id]="c" 加入到绑定 if _, ok 该账号已经有绑定,就追加一个绑定 manager.accounts[c.accountid]="append(Manager.Accounts[c.AccountId]," c.id) else 没有就新增一个账号的绑定切片 解绑账号 unaccountbind(c 取消连接 delete(manager.clients, 取消绑定 len(manager.accounts[c.accountid])> 0 {
        for k, clientId := range Manager.Accounts[c.AccountId] {
            if clientId == c.ID { // &#x627E;&#x5230;&#x7ED1;&#x5B9A;&#x5BA2;&#x6237;&#x7AEF;Id
                Manager.Accounts[c.AccountId] = append(Manager.Accounts[c.AccountId][:k], Manager.Accounts[c.AccountId][k+1:]...)
            }
        }
    }
}

</-registerchan:>

每隔一段时间,就检测一次心跳,如果上次心跳时间超过了HeartbeatTime时间视为已经断开连接。

// &#x7EF4;&#x6301;&#x5FC3;&#x8DF3;
func heartbeat() {
    for {
        // &#x83B7;&#x53D6;&#x6240;&#x6709;&#x7684;Clients
        Manager.mu.Lock()
        clients := make([]*Client, len(Manager.Clients))
        for _, c := range Manager.Clients {
            clients = append(clients, c)
        }
        Manager.mu.Unlock()

        for _, c := range clients {
            if time.Now().Unix()-c.HeartbeatTime > HeartbeatTime {
                unAccountBind(c)
            }
        }

        time.Sleep(time.Second * HeartbeatCheckTime)
    }
}

// &#x7BA1;&#x7406;&#x8FDE;&#x63A5;
func Start() {
    // &#x68C0;&#x67E5;&#x5FC3;&#x8DF3;
    go func() {
        defer func() {
            if r := recover(); r != nil {
                log.Println(r)
            }
        }()
        heartbeat()
    }()

    // &#x6CE8;&#x518C;&#x6CE8;&#x9500;
    go func() {
        defer func() {
            if r := recover(); r != nil {
                log.Println(r)
            }
        }()
        register()
    }()
}

收发消息

// &#x6839;&#x636E;&#x8D26;&#x53F7;&#x83B7;&#x53D6;&#x8FDE;&#x63A5;
func GetClient (accountId string) []*Client{
    clients := make([]*Client,0)

    Manager.mu.Lock()
    defer Manager.mu.Unlock()

    if len(Manager.Accounts[accountId]) > 0 {
        for _,clientId := range Manager.Accounts[accountId] {
            if c,ok := Manager.Clients[clientId]; ok {
                clients = append(clients,c)
            }
        }
    }

    return clients
}

我们这是只是心跳用到了,所以只要判断客户端是心跳消息,然后回复即可。

// &#x8BFB;&#x53D6;&#x4FE1;&#x606F;&#xFF0C;&#x5373;&#x6536;&#x5230;&#x6D88;&#x606F;
func (c *Client) Read() {
    defer func() {
        _ = c.Socket.Close()
    }()
   for {
       // &#x8BFB;&#x53D6;&#x6D88;&#x606F;
       _, body, err := c.Socket.ReadMessage()
       if err != nil {
           break
       }

       var msg struct {
           Type string json:"type"
       }
       err = json.Unmarshal(body, &msg)
       if err != nil {
           log.Println(err)
           continue
       }

       if msg.Type == MessageTypeHeartbeat { // &#x7EF4;&#x6301;&#x5FC3;&#x8DF3;&#x6D88;&#x606F;
           // &#x5237;&#x65B0;&#x8FDE;&#x63A5;&#x65F6;&#x95F4;
           c.HeartbeatTime = time.Now().Unix()

           // &#x56DE;&#x590D;&#x5FC3;&#x8DF3;
           replyMsg := CreateReplyMsg(MessageTypeHeartbeat, ServiceMessageContent{})
           err = c.Socket.WriteMessage(websocket.TextMessage, replyMsg)
           if err != nil {
               log.Println(err)
           }
           continue
       }
   }
}

只要找到连接,对连接发送消息即可。

// &#x53D1;&#x9001;&#x6D88;&#x606F;
func Send(accounts []string,message ServiceMessage) error{
    msg,err := json.Marshal(message)
    if err != nil {
        return err
    }

    for _,accountId := range accounts{
        // &#x83B7;&#x53D6;&#x8FDE;&#x63A5;id
        clients := GetClient(accountId)

        // &#x53D1;&#x9001;&#x6D88;&#x606F;
        for _,c := range clients {
            _ = c.Socket.WriteMessage(websocket.TextMessage, msg)
        }
    }

    return nil
}

这里对http请求升级为websocket,然后单独建立一个goroutine去维持连接。下面类似这样调用,但是鉴权,日志等很多细节不完善,只是提供一个思路。

package wesocket

import (
    websocket2 "demo/websocket"
    "fmt"
    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
    "github.com/rs/xid"
    "log"
    "net/http"
    "time"
)

type MessageNotifyRequest struct {
    UserId    string form:"user_id"
}

func MessageNotify(ctx *gin.Context) {
    // 获取参数
    var params MessageNotifyRequest
    if err := ctx.ShouldBindQuery(¶ms); err != nil {
        log.Println(err)
        return
    }
    // TODO: 鉴权

    // 将http升级为websocket
    conn, err := (&websocket.Upgrader{
        // 1. 解决跨域问题
        CheckOrigin: func(r *http.Request) bool {
            return true
        },
    }).Upgrade(ctx.Writer, ctx.Request, nil) // 升级
    if err != nil {
        log.Println(err)
        http.NotFound(ctx.Writer, ctx.Request)
        return
    }

    // 创建一个实例连接
    ConnId := xid.New().String()
    client := &websocket2.Client{
        ID:            ConnId, // 连接id
        AccountId:      fmt.Sprintf("%s", params.UserId),
        HeartbeatTime: time.Now().Unix(),
        Socket:        conn,
    }

    // 用户注册到用户连接管理
    websocket2.RegisterChan

用websocket做消息通知,对于后端来说,主要是绑定连接和管理连接,绑定连接就是用户id和websocket连接建立一个绑定关系,而管理连接就是存储连接,删除连接,维护连接的健康(心跳检测),其次就是定义服务端接收和发送数据的格式。总体大概就是这样一个思路。

Original: https://www.cnblogs.com/ourongxin/p/15925620.html
Author: EthanWell
Title: websocket:二.Golang实现Websocket消息通知

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/516434/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 基于LSM的Key-Value数据库实现WAL篇

    上篇文章简单的实现了基于LSM数据库的初步版本,在该版本中如数据写入到内存表后但还未持久化到SSTable排序字符串表,此时正好程序崩溃,内存表中暂未持久化的数据将会丢失。 引入W…

    Go语言 2023年5月25日
    058
  • Golang通脉之指针

    指针的概念 指针是存储另一个变量的内存地址的变量。 变量是一种使用方便的占位符,用于引用计算机内存地址。 一个指针变量可以指向任何一个值的内存地址。 在上面的图中,变量b的值为15…

    Go语言 2023年5月25日
    062
  • Golang使用swaggo自动生成Restful API文档

    相信很多程序猿和我一样不喜欢写API文档。写代码多舒服,写文档不仅要花费大量的时间,有时候还不能做到面面具全。但API文档是必不可少的,相信其重要性就不用我说了,一份含糊的文档甚至…

    Go语言 2023年5月25日
    055
  • golang 标准库template的代码生成

    最近,随着 antd Pro v5 的升级,将项目进行了升级,现在生成的都是 ts 代码。这个项目的自动生成代码都是基于 golang 的标准库 template 的,所以这篇博客…

    Go语言 2023年5月25日
    042
  • Go语言之反射

    一、反射的基本概念 (一)什么是反射 反射可以再运行时动态获取变量的各种信息,比如变量的类型、值等 如果时结构体变量,还可以获取到结构体本身的各种信息,比如结构体的字段、方法 通过…

    Go语言 2023年5月29日
    073
  • beego下让swagger按照更新了controllers的接口信息自动更新commentsRouter_controllers.go

    beego下让swagger按照更新了controllers的接口信息自动更新commentsRouter_controllers.go (1)在beego环境中,当更新了cont…

    Go语言 2023年5月25日
    038
  • Go语言之接口

    接口就是一系列方法的集合(规范行为) 在面向对象的领域里,接口一般这样定义:接口定义一个对象的行为,规范子类对象的行为。 在 Go 语言中的接口是非侵入式接口(接口没了,不影响代码…

    Go语言 2023年5月25日
    057
  • 【CGO】C源码编译为动态库供go程序调用(linux环境、arm架构运行平台)

    动态库编译 1.安装并配置交叉编译工具链网上有详细教程 2.go env环境配置 go env -w CGO_ENABLED=1 go env -w GOOS=linux go e…

    Go语言 2023年5月25日
    052
  • golang tcp keepalive研究记录(基于websocket)

    服务器和客户端建立tcp连接以后,客户端/服务器如何知道对方是否挂掉了? 这时候TCP协议提出一个办法,当客户端端等待超过一定时间后自动给服务端发送一个空的报文,如果对方回复了这个…

    Go语言 2023年5月25日
    046
  • Go语言之Goroutine与信道、异常处理

    一、Goroutine Go 协程可以看做成一个轻量级的线程,Go 协程相比于线程的优势: Goroutine 的成本更低大小只有 2 kb 左右,线程有几个兆。 Goroutin…

    Go语言 2023年5月25日
    057
  • Go编译过程

    一、 Go编译流程 二、过程说明 词法解析 读取Go源文件,将字符序列转换为符号(token)序列,比如将”:=”转换为_Define 代码中的标识符、关键…

    Go语言 2023年5月25日
    059
  • Golang的GC回收机制

    GC触发的条件 v1.3版本 标记清除法 第一步,找出不可达的对象,做上标记。 第二部,回收没有被标记的对象。 缺点:在标记的时候会进行STW(Stop the world) St…

    Go语言 2023年5月25日
    049
  • Sentinel-Go 源码系列(三)滑动时间窗口算法的工程实现

    要说现在工程师最重要的能力,我觉得工程能力要排第一。 就算现在大厂面试经常要手撕算法,也是更偏向考查代码工程实现的能力,之前在群里看到这样的图片,就觉得很离谱。 算法与工程实现 在…

    Go语言 2023年5月25日
    051
  • Go – 如何编写 ProtoBuf 插件(二)?

    上篇文章《Go – 如何编写 ProtoBuf 插件 (一) 》,分享了使用 proto3 的 &#x81EA;&#x5B9A;&#x4E49;…

    Go语言 2023年5月25日
    054
  • golang低级编程:一.unsafe包

    go语言在设计上确保了一些安全的属性,限制了程序可能出错的途径。例如严格的类型转换规则。但也使得很多实现的细节无法通过go程序来访问,例如对于聚合类型(如结构体)的内存布局,或者一…

    Go语言 2023年5月25日
    043
  • 为什么要避免在 Go 中使用 ioutil.ReadAll?

    原文链接: 为什么要避免在 Go 中使用 ioutil.ReadAll? ioutil.ReadAll 主要的作用是从一个 io.Reader 中读取所有数据,直到结尾。 在 Gi…

    Go语言 2023年5月25日
    049
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球