我们在设计产品的时候通常都会遇到消息通知的时候,比如用户下单支付成功,比如用户有站内信来可以实时通知。而http是单向的,客户端请求,服务端返回,这次请求就已经结束。而websocket可以保持连接实现长连接,在遇到通知时往往使用websocket来达到服务端主动向客户端发送消息的目的。
我们的目标是实现服务端主动向某个用户发消息。所以要做到一下四步。
我们这里使用到 github.com/gorilla/websocket 包。
首先是定义一个客户端连接的结构,先有了连接的结构才能保存连接,ID是一个客户端连接的id,而Socket是真正的客户端连接
// 客户端连接信息
type Client struct {
ID string // 连接ID
AccountId string // 账号id, 一个账号可能有多个连接
Socket *websocket.Conn // 连接
HeartbeatTime int64 // 前一次心跳时间
}
然后定义一个客户端管理,来管理所有的客户端连接,并且实例化为一个全局的变量。
// 消息类型
const (
MessageTypeHeartbeat = "heartbeat" // 心跳
MessageTypeRegister = "register" // 注册
HeartbeatCheckTime = 9 // 心跳检测几秒检测一次
HeartbeatTime = 20 // 心跳距离上一次的最大时间
ChanBufferRegister = 100 // 注册chan缓冲
ChanBufferUnregister = 100 // 注销chan大小
)
// 客户端管理
type ClientManager struct {
Clients map[string]*Client // 保存连接
Accounts map[string][]string // 账号和连接关系,map的key是账号id即:AccountId,这里主要考虑到一个账号多个连接
mu *sync.Mutex
}
// 定义一个管理Manager
var Manager = ClientManager{
Clients: make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数
Accounts: make(map[string][]string), // 账号和连接关系
mu: new(sync.Mutex),
}
var (
RegisterChan = make(chan *Client, ChanBufferRegister) // 注册
unregisterChan = make(chan *Client, ChanBufferUnregister) // 注销
)
这里还要封装一下服务器给客户端发消息的格式,以便客户端连接成功后服务端给客户端回复消息
// 封装回复消息
type ServiceMessage struct {
Type string json:"type"
// 类型
Content ServiceMessageContent json:"content"
}
type ServiceMessageContent struct {
Body string json:"body"
// 主要数据
MetaData string json:"meta_data"
// 扩展数据
}
func CreateReplyMsg(t string, content ServiceMessageContent) []byte {
replyMsg := ServiceMessage{
Type: t,
Content: content,
}
msg, _ := json.Marshal(replyMsg)
return msg
}
管理连接
连接保持在Manager里的Clients,和Accounts。Clients用于保存每个与客户端通信的连接。而Account保持者连接id与连接分类(Category)的绑定关系。
// 注册注销
func register() {
for {
select {
case conn := <-registerchan: 新注册,新连接 加入连接,进行管理 accountbind(conn) 回复消息 content :="CreateReplyMsg(MessageTypeRegister," servicemessagecontent{}) _="conn.Socket.WriteMessage(websocket.TextMessage," content) case conn 注销,或者没有心跳 关闭连接 删除client unaccountbind(conn) } 绑定账号 func accountbind(c *client) { manager.mu.lock() defer manager.mu.unlock() 加入到连接 manager.clients[c.id]="c" 加入到绑定 if _, ok 该账号已经有绑定,就追加一个绑定 manager.accounts[c.accountid]="append(Manager.Accounts[c.AccountId]," c.id) else 没有就新增一个账号的绑定切片 解绑账号 unaccountbind(c 取消连接 delete(manager.clients, 取消绑定 len(manager.accounts[c.accountid])> 0 {
for k, clientId := range Manager.Accounts[c.AccountId] {
if clientId == c.ID { // 找到绑定客户端Id
Manager.Accounts[c.AccountId] = append(Manager.Accounts[c.AccountId][:k], Manager.Accounts[c.AccountId][k+1:]...)
}
}
}
}
</-registerchan:>
每隔一段时间,就检测一次心跳,如果上次心跳时间超过了HeartbeatTime时间视为已经断开连接。
// 维持心跳
func heartbeat() {
for {
// 获取所有的Clients
Manager.mu.Lock()
clients := make([]*Client, len(Manager.Clients))
for _, c := range Manager.Clients {
clients = append(clients, c)
}
Manager.mu.Unlock()
for _, c := range clients {
if time.Now().Unix()-c.HeartbeatTime > HeartbeatTime {
unAccountBind(c)
}
}
time.Sleep(time.Second * HeartbeatCheckTime)
}
}
// 管理连接
func Start() {
// 检查心跳
go func() {
defer func() {
if r := recover(); r != nil {
log.Println(r)
}
}()
heartbeat()
}()
// 注册注销
go func() {
defer func() {
if r := recover(); r != nil {
log.Println(r)
}
}()
register()
}()
}
收发消息
// 根据账号获取连接
func GetClient (accountId string) []*Client{
clients := make([]*Client,0)
Manager.mu.Lock()
defer Manager.mu.Unlock()
if len(Manager.Accounts[accountId]) > 0 {
for _,clientId := range Manager.Accounts[accountId] {
if c,ok := Manager.Clients[clientId]; ok {
clients = append(clients,c)
}
}
}
return clients
}
我们这是只是心跳用到了,所以只要判断客户端是心跳消息,然后回复即可。
// 读取信息,即收到消息
func (c *Client) Read() {
defer func() {
_ = c.Socket.Close()
}()
for {
// 读取消息
_, body, err := c.Socket.ReadMessage()
if err != nil {
break
}
var msg struct {
Type string json:"type"
}
err = json.Unmarshal(body, &msg)
if err != nil {
log.Println(err)
continue
}
if msg.Type == MessageTypeHeartbeat { // 维持心跳消息
// 刷新连接时间
c.HeartbeatTime = time.Now().Unix()
// 回复心跳
replyMsg := CreateReplyMsg(MessageTypeHeartbeat, ServiceMessageContent{})
err = c.Socket.WriteMessage(websocket.TextMessage, replyMsg)
if err != nil {
log.Println(err)
}
continue
}
}
}
只要找到连接,对连接发送消息即可。
// 发送消息
func Send(accounts []string,message ServiceMessage) error{
msg,err := json.Marshal(message)
if err != nil {
return err
}
for _,accountId := range accounts{
// 获取连接id
clients := GetClient(accountId)
// 发送消息
for _,c := range clients {
_ = c.Socket.WriteMessage(websocket.TextMessage, msg)
}
}
return nil
}
这里对http请求升级为websocket,然后单独建立一个goroutine去维持连接。下面类似这样调用,但是鉴权,日志等很多细节不完善,只是提供一个思路。
package wesocket
import (
websocket2 "demo/websocket"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/rs/xid"
"log"
"net/http"
"time"
)
type MessageNotifyRequest struct {
UserId string form:"user_id"
}
func MessageNotify(ctx *gin.Context) {
// 获取参数
var params MessageNotifyRequest
if err := ctx.ShouldBindQuery(¶ms); err != nil {
log.Println(err)
return
}
// TODO: 鉴权
// 将http升级为websocket
conn, err := (&websocket.Upgrader{
// 1. 解决跨域问题
CheckOrigin: func(r *http.Request) bool {
return true
},
}).Upgrade(ctx.Writer, ctx.Request, nil) // 升级
if err != nil {
log.Println(err)
http.NotFound(ctx.Writer, ctx.Request)
return
}
// 创建一个实例连接
ConnId := xid.New().String()
client := &websocket2.Client{
ID: ConnId, // 连接id
AccountId: fmt.Sprintf("%s", params.UserId),
HeartbeatTime: time.Now().Unix(),
Socket: conn,
}
// 用户注册到用户连接管理
websocket2.RegisterChan
用websocket做消息通知,对于后端来说,主要是绑定连接和管理连接,绑定连接就是用户id和websocket连接建立一个绑定关系,而管理连接就是存储连接,删除连接,维护连接的健康(心跳检测),其次就是定义服务端接收和发送数据的格式。总体大概就是这样一个思路。
Original: https://www.cnblogs.com/ourongxin/p/15925620.html
Author: EthanWell
Title: websocket:二.Golang实现Websocket消息通知
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/516434/
转载文章受原作者版权保护。转载请注明原作者出处!