玩转redis-延时消息队列

上一篇基于 redis的list实现了一个简单的消息队列:玩转redis-简单消息队列

源码地址 使用demo

产品经理经常说的一句话,我们不光要有 X功能,还要 Y功能,这样客户才能更满意。同样的,只有简单消息队列是不够的,还要有 延时消息队列才能算是一个完整的消息队列。

看看 redis的命令,放眼望去,的有序集合(sorted set)就是一个很好用的命令,完全可以用他做一个 延时消息队列

玩转redis-延时消息队列

redis有序集合(sorted set)

redis有序集合,每个元素都会关联一个 double类型的分数。 redis正是通过分数来为集合中的成员进行从小到大的排序。
有序集合的成员是唯一的,但分数(score)却可以重复。

简单操作

添加数据

127.0.0.1:6379> ZADD testSet1 5 a
(integer) 1
127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d
(integer) 3

读取

127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3
1) "b"
127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5
1) "b"
2) "a"

也可以把 score打出来

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES
1) "b"
2) "1"
3) "a"
4) "5"

查出所有的数据

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf
1) "b"
2) "a"
3) "d"
4) "c"

删除数据

ZREMRANGEBYSCORE testSet1 0 2

延时队列的实现思路

总体的思路很简单,就是每一个 valuescore保存的是时间,也就是说,在添加一个元素时他的 score是当前时间+延时的时间。轮循获取数据时,查找小于或等于当前时间的数据项,就是具体的延时消息。

还有一个问题,就是 ZRANGEBYSCORElistpop不同, pop是取出元素并且会把元素在 list中删除。 ZRANGEBYSCORE只会取出数据不会把数据从 sorted set中删除。解决方法1,利用 redis事务,先 ZRANGEBYSCORE取出数据,然后再用 ZREMRANGEBYSCORE 把数据删除。

玩转redis-延时消息队列

具体实现-code

添加延时消息,参数 delay就是我们要延时多久:

func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error {
    if delay <= 0 { return errors.new("delay need great than zero") } tm :="time.Now().Add(delay)" msg body) msg.delaytime="tm.Unix()" senddata, _ p.rediscmd.zadd(topicname+zsetsuffix, redis.z{score: float64(tm.unix()), member: string(senddata)}).err() < code></=>

使用,比如我们想过1秒再处理

producer.PublishDelayMsg(topicName, body, time.Second)

读取消息并处理
这就比较简单了,就是在一个 ticker里循环读取小于或等于当前时间的数据:

func (s *consumer) startGetDelayMessage() {
    go func() {
        ticker := time.NewTicker(s.options.RateLimitPeriod)
        defer func() {
            log.Println("stop get delay message.")
            ticker.Stop()
        }()
        topicName := s.topicName + zsetSuffix
        for {
            currentTime := time.Now().Unix()
            select {
            case <-s.ctx.done(): log.printf("context done msg: %#v \n", s.ctx.err()) return case <-ticker.c: var valuescmd *redis.zslicecmd _, err :="s.redisCmd.TxPipelined(func(pip" redis.pipeliner) error { 0, currenttime) pip.zremrangebyscore(topicname, "0", strconv.formatint(currenttime, 10)) nil }) if !="nil" log.printf("zset pip error: err) continue } rev for revbody msg json.unmarshal([]byte(revbody.member.(string)), msg) s.handler s.handler.handlemessage(msg) }() < code></-s.ctx.done():>

Original: https://www.cnblogs.com/li-peng/p/12697110.html
Author: li-peng
Title: 玩转redis-延时消息队列

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/529058/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • shell之磁盘容量检查,配合crontab可以定时清理磁盘

    我的做法: !/bin/bashAvailable=df -k | sed -n 2p | awk ‘{print $4}’if [ $Available -eq 0 ];then…

    Linux 2023年5月28日
    079
  • RISC-V汇编指南

    The RISC-V Assembly Programmer’s Manual is I think it’s probably better to bee…

    Linux 2023年6月6日
    069
  • Linux系统调用接口

    Linux系统调用接口 进程控制 系统调用 描述 fork 创建一个新进程 clone 按指定条件创建子进程 execve 运行可执行文件 exit 终止进程 _exit 立即终止…

    Linux 2023年6月13日
    0100
  • 脚本安装lamp

    脚本安装lamp [root@localhost ~]# mkdir lamp [root@localhost ~]# cd lamp/ [root@localhost lamp]…

    Linux 2023年6月6日
    0121
  • mycat数据库集群系列之mysql主从同步设置

    最近在梳理数据库集群的相关操作,现在花点时间整理一下关于mysql数据库集群的操作总结,恰好你又在看这一块,供一份参考。本次系列终结大概包括以下内容:多数据库安装、mycat部署安…

    Linux 2023年6月14日
    093
  • deepin安装Redis步骤以及简单配置

    一、安装Redis 安装完成之后,Redis服务器会自动启动 二、检查Redis服务器系统进程(非必要) 三、查看Redis端口状态(非必要) 四、输入redis-cli进入命令模…

    Linux 2023年5月28日
    0102
  • pwm驱动

    1.1、参考博客 参考的教程如下: 1.2、什么是PWM 脉冲宽度调制(PWM),是英文”Pulse Width Modulation”的缩写,简称脉宽调制…

    Linux 2023年6月13日
    0107
  • 万字干货|Java基础面试题(2022版)

    作者:小牛呼噜噜 | https://xiaoniuhululu.com计算机内功、JAVA底层、面试相关资料等更多精彩文章在公众号「小牛呼噜噜 」 概念常识 Java 语言有哪些…

    Linux 2023年6月6日
    0192
  • 安装完Ubuntu启动时自动进入grub命令行模式的解决办法

    1.先使用ls命令,找到Ubuntu的安装在哪个分区: grub>ls 会罗列所有的磁盘分区信息,比方说: (hd0,1),(hd0,5),(hd0,3),(hd0,2) 2…

    Linux 2023年6月13日
    089
  • Spring cloud gateway 如何在路由时进行负载均衡

    本文为博主原创,转载请注明出处: 1.spring cloud gateway 配置路由 在网关模块的配置文件中配置路由: 其中lb表示采用了负载均衡,user-server表示服…

    Linux 2023年6月14日
    086
  • BLACKTOAD 的模板 未完

    博客园 :当前访问的博文已被密码保护 请输入阅读密码: Original: https://www.cnblogs.com/Grharris/p/10876375.htmlAuth…

    Linux 2023年6月6日
    085
  • MongoDB安装使用教程

    MongoDB安装使用教程 介绍 MongoDB是一个基于分布式文件存储的数据库,是一个文档数据库,支持的数据结构非常松散,是类似json的bson格式,可以存储比较复杂的数据类型…

    Linux 2023年6月6日
    097
  • ToneGenerator Init failed Crash 崩溃

    需求需要在扫码时产生一个短促的提示音, 搜了下像这样实现。测试时发现多次扫码后,会触发程序崩溃问题。 异常如下 代码如下: 一番搜索, 以下为最佳答案, 加上以后,循环测试, 不再…

    Linux 2023年6月13日
    094
  • Centos8.x yum 源配置 解决 yum 不可用

    [root@iZ2ze1e3u7m7oe426pyndaa ~]# cd /etc/yum.repos.d/ [root@iZ2ze1e3u7m7oe426pyndaa yum.r…

    Linux 2023年6月7日
    0118
  • 三少玩Linux之LinuxMint, win7共存安装与简单配置

    先安装win7, 这里就不说了;再安装Mint, 这个是视频:https://www.bilibili.com/video/BV1AE411P7Cz; 这里关键就是LinuxMin…

    Linux 2023年6月14日
    0107
  • ASP.NET Core 2.2 : 二十. Action的多种数据返回格式处理机制

    上一章讲了系统如何将客户端提交的请求数据格式化处理成我们想要的格式并绑定到对应的参数,本章讲一下它的”逆过程”,如何将请求结果按照客户端想要的格式返回去。 …

    Linux 2023年6月7日
    0128
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球