玩转redis-延时消息队列

上一篇基于 redis的list实现了一个简单的消息队列:玩转redis-简单消息队列

源码地址 使用demo

产品经理经常说的一句话,我们不光要有 X功能,还要 Y功能,这样客户才能更满意。同样的,只有简单消息队列是不够的,还要有 延时消息队列才能算是一个完整的消息队列。

看看 redis的命令,放眼望去,的有序集合(sorted set)就是一个很好用的命令,完全可以用他做一个 延时消息队列

玩转redis-延时消息队列

redis有序集合(sorted set)

redis有序集合,每个元素都会关联一个 double类型的分数。 redis正是通过分数来为集合中的成员进行从小到大的排序。
有序集合的成员是唯一的,但分数(score)却可以重复。

简单操作

添加数据

127.0.0.1:6379> ZADD testSet1 5 a
(integer) 1
127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d
(integer) 3

读取

127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3
1) "b"
127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5
1) "b"
2) "a"

也可以把 score打出来

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES
1) "b"
2) "1"
3) "a"
4) "5"

查出所有的数据

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf
1) "b"
2) "a"
3) "d"
4) "c"

删除数据

ZREMRANGEBYSCORE testSet1 0 2

延时队列的实现思路

总体的思路很简单,就是每一个 valuescore保存的是时间,也就是说,在添加一个元素时他的 score是当前时间+延时的时间。轮循获取数据时,查找小于或等于当前时间的数据项,就是具体的延时消息。

还有一个问题,就是 ZRANGEBYSCORElistpop不同, pop是取出元素并且会把元素在 list中删除。 ZRANGEBYSCORE只会取出数据不会把数据从 sorted set中删除。解决方法1,利用 redis事务,先 ZRANGEBYSCORE取出数据,然后再用 ZREMRANGEBYSCORE 把数据删除。

玩转redis-延时消息队列

具体实现-code

添加延时消息,参数 delay就是我们要延时多久:

func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error {
    if delay <= 0 { return errors.new("delay need great than zero") } tm :="time.Now().Add(delay)" msg body) msg.delaytime="tm.Unix()" senddata, _ p.rediscmd.zadd(topicname+zsetsuffix, redis.z{score: float64(tm.unix()), member: string(senddata)}).err() < code></=>

使用,比如我们想过1秒再处理

producer.PublishDelayMsg(topicName, body, time.Second)

读取消息并处理
这就比较简单了,就是在一个 ticker里循环读取小于或等于当前时间的数据:

func (s *consumer) startGetDelayMessage() {
    go func() {
        ticker := time.NewTicker(s.options.RateLimitPeriod)
        defer func() {
            log.Println("stop get delay message.")
            ticker.Stop()
        }()
        topicName := s.topicName + zsetSuffix
        for {
            currentTime := time.Now().Unix()
            select {
            case <-s.ctx.done(): log.printf("context done msg: %#v \n", s.ctx.err()) return case <-ticker.c: var valuescmd *redis.zslicecmd _, err :="s.redisCmd.TxPipelined(func(pip" redis.pipeliner) error { 0, currenttime) pip.zremrangebyscore(topicname, "0", strconv.formatint(currenttime, 10)) nil }) if !="nil" log.printf("zset pip error: err) continue } rev for revbody msg json.unmarshal([]byte(revbody.member.(string)), msg) s.handler s.handler.handlemessage(msg) }() < code></-s.ctx.done():>

Original: https://www.cnblogs.com/li-peng/p/12697110.html
Author: li-peng
Title: 玩转redis-延时消息队列

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/529058/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Visual Studio远程调试

    在工作中有时候需要在服务器的环境下远程调试,例如调试微信相关服务的时候。 1. 下载远程调试工具 网址:https://visualstudio.microsoft.com/zh-…

    Linux 2023年6月7日
    0116
  • jenkins使用shell脚本执行nohup java -jar包失败

    一、问题 通过jenkins执行shell脚本时,脚本中是通过nohup java -jar &的方式启动,显示执行成功,但是服务却没启动,脚本如下: #! /bin/ba…

    Linux 2023年5月28日
    0179
  • Hadoop 调优

    Hadoop 调优 HDFS 调优 hdfs-site.xml 1. hadoop 文件块大小,通常为 128MB 或 256MB dfs.block.size 134217728…

    Linux 2023年6月8日
    0109
  • PyTorch介绍-使用 TORCH.AUTOGRAD 自动微分

    训练神经网络时,最常用的算法就是 反向传播。在该算法中,参数(模型权重)会根据损失函数关于对应参数的梯度进行调整。 为了计算这些梯度,PyTorch内置了名为 torch.auto…

    Linux 2023年6月14日
    0125
  • USB转RS232串口应用

    RS232串口是用于数据串行通信传输的标准之一,该标准定义了信号的电气特性和时序、信号的含义以及连接器的物理尺寸和引脚排列。RS232协议规范定义的是DB25接口,实际上大多数RS…

    Linux 2023年6月7日
    0125
  • MySQL SUBSTRING_INDEX截取字符串

    一、SUBSTRING_INDEX 二、示例 Original: https://www.cnblogs.com/woods1815/p/16368248.htmlAuthor: …

    Linux 2023年6月13日
    084
  • 手把手教你在Linux系统下安装MySQL

    在CentOS中默认安装有MariaDB,这个是MySQL的分支,但为了需要,还是要在系统中安装MySQL,而且安装完成之后可以直接覆盖掉MariaDB。 1. 下载并安装MySQ…

    Linux 2023年6月14日
    0110
  • WPF 开源二维绘画小工具 GeometryToolDemo 项目

    这是一个演示 WPF 进行二维绘画的小工具 Demo 项目,基于 MIT 协议在 GitHub 上完全开源 这是一个演示 WPF 进行二维绘画的小工具 Demo 项目,基于 MIT…

    Linux 2023年6月6日
    0116
  • 接口测试

    :配置windows中特定应用的抓包(默认抓取不到) :添加备注信息 :重新发起指定请求 :清空指定会话内容 :断点放行 :模式切换 :相应数据解码 :抓取指定进程发出的请求 :关…

    Linux 2023年6月7日
    082
  • 实验

    编写程序实现以下功能 编写程序,打印99乘法表 将一面额为10元倍数的整钱( 输入一行字符,统计其中单词的个数。各单词之间用空格分隔,空格数可以是多个。 输入输出示例 Input …

    Linux 2023年6月7日
    0102
  • Java基础封装类型的缓存

    Java笔试常见题型 类型 缓存范围 Byte -128-127 Short -128-127 Integer -128-127 Long -128-127 Character 0…

    Linux 2023年6月7日
    0117
  • linux配置yum源的三种方法

    镜像下载、域名解析、时间同步请点击阿里云开源镜像站 linux配置yum源的三种方法: 1.配置网络yum源 2.通过上传镜像文件配置本地yum源 3.通过连接存储或本地镜像文件配…

    Linux 2023年5月27日
    095
  • Linux关于防火墙的命令

    Linux关于防火墙的命令 一、red hat/CentOs7关闭防火墙的命令 查看防火墙状态 systemctl status firewalld service iptable…

    Linux 2023年6月11日
    0106
  • urandom和random区别

    linux中提供了 /dev/urandom 和 /dev/random 两个特殊设备来提供随机数。那么这两个文件有什么区别呢?要回答这个问题,先需要了解熵这个概念。 熵linux…

    Linux 2023年6月7日
    091
  • histogram的类型详解

    采样点 每隔指定的时间会采集并上报一次数据,称为采样点。 请注意这里采集的是当前瞬间的数据 count 对采样点的 次数累计和(count) bucket 对采样点的 次数进行统计…

    Linux 2023年6月13日
    088
  • 生成随机数的若干种方法

    背景: 创建账户时我们需要配置初始随机密码,使用手机号注册时需要随机验证码,抽奖活动需要随机点名,俄罗斯方块游戏需要随机出形状。这些案例都在说明一个问题,随机数据很重要!而在 Sh…

    Linux 2023年6月6日
    099
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球