玩转redis-延时消息队列

上一篇基于 redis的list实现了一个简单的消息队列:玩转redis-简单消息队列

源码地址 使用demo

产品经理经常说的一句话,我们不光要有 X功能,还要 Y功能,这样客户才能更满意。同样的,只有简单消息队列是不够的,还要有 延时消息队列才能算是一个完整的消息队列。

看看 redis的命令,放眼望去,的有序集合(sorted set)就是一个很好用的命令,完全可以用他做一个 延时消息队列

玩转redis-延时消息队列

redis有序集合(sorted set)

redis有序集合,每个元素都会关联一个 double类型的分数。 redis正是通过分数来为集合中的成员进行从小到大的排序。
有序集合的成员是唯一的,但分数(score)却可以重复。

简单操作

添加数据

127.0.0.1:6379> ZADD testSet1 5 a
(integer) 1
127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d
(integer) 3

读取

127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3
1) "b"
127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5
1) "b"
2) "a"

也可以把 score打出来

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES
1) "b"
2) "1"
3) "a"
4) "5"

查出所有的数据

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf
1) "b"
2) "a"
3) "d"
4) "c"

删除数据

ZREMRANGEBYSCORE testSet1 0 2

延时队列的实现思路

总体的思路很简单,就是每一个 valuescore保存的是时间,也就是说,在添加一个元素时他的 score是当前时间+延时的时间。轮循获取数据时,查找小于或等于当前时间的数据项,就是具体的延时消息。

还有一个问题,就是 ZRANGEBYSCORElistpop不同, pop是取出元素并且会把元素在 list中删除。 ZRANGEBYSCORE只会取出数据不会把数据从 sorted set中删除。解决方法1,利用 redis事务,先 ZRANGEBYSCORE取出数据,然后再用 ZREMRANGEBYSCORE 把数据删除。

玩转redis-延时消息队列

具体实现-code

添加延时消息,参数 delay就是我们要延时多久:

func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error {
    if delay <= 0 { return errors.new("delay need great than zero") } tm :="time.Now().Add(delay)" msg body) msg.delaytime="tm.Unix()" senddata, _ p.rediscmd.zadd(topicname+zsetsuffix, redis.z{score: float64(tm.unix()), member: string(senddata)}).err() < code></=>

使用,比如我们想过1秒再处理

producer.PublishDelayMsg(topicName, body, time.Second)

读取消息并处理
这就比较简单了,就是在一个 ticker里循环读取小于或等于当前时间的数据:

func (s *consumer) startGetDelayMessage() {
    go func() {
        ticker := time.NewTicker(s.options.RateLimitPeriod)
        defer func() {
            log.Println("stop get delay message.")
            ticker.Stop()
        }()
        topicName := s.topicName + zsetSuffix
        for {
            currentTime := time.Now().Unix()
            select {
            case <-s.ctx.done(): log.printf("context done msg: %#v \n", s.ctx.err()) return case <-ticker.c: var valuescmd *redis.zslicecmd _, err :="s.redisCmd.TxPipelined(func(pip" redis.pipeliner) error { 0, currenttime) pip.zremrangebyscore(topicname, "0", strconv.formatint(currenttime, 10)) nil }) if !="nil" log.printf("zset pip error: err) continue } rev for revbody msg json.unmarshal([]byte(revbody.member.(string)), msg) s.handler s.handler.handlemessage(msg) }() < code></-s.ctx.done():>

Original: https://www.cnblogs.com/li-peng/p/12697110.html
Author: li-peng
Title: 玩转redis-延时消息队列

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/529058/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • podman的基本用法

    podman的基本设置和使用 运行示例容器 列出正在运行的容器 检查正在运行的容器 测试 httpd 服务器 查看容器的日志 查看容器的 pid 检查点容器 恢复容器 迁移容器 停…

    Linux 2023年6月13日
    088
  • 电脑中图标变白色教你怎么修复

    复制一下代码到文本文档中 另存为 .bat 然后点击好的配置文件右键以管理员身份运行 就会解决桌面变白的问题 @echo off taskkill /f /im explorer….

    Linux 2023年6月7日
    0104
  • 真正在大厂干了几年,我学会了反内卷[转]

    内卷这个概念的内涵很丰富,与我们的生活息息相关。为了普及和传播知识,我参考了相关的信息,把我个人的粗浅理解奉献给朋友们。 什么是内卷? 内卷 involution,与之对应的是 e…

    Linux 2023年6月8日
    0125
  • JVM学习 类加载子系统

    JVM 哔哩哔哩 尚硅谷视频 宋红康老师 Java代码执行流程 简图 详细图 1、类加载子系统 类加载器子系统的作用 类加载器子系统负责从文件系统或者网络中加载Class文件,cl…

    Linux 2023年6月7日
    0113
  • JavaScript this

    本博客所有文章仅用于学习、研究和交流目的,欢迎非商业性质转载。 博主的文章没有高度、深度和广度,只是凑字数。由于博主的水平不高,不足和错误之处在所难免,希望大家能够批评指出。 博主…

    Linux 2023年6月13日
    0111
  • Django基础学习笔记

    创建一个django项目:命令: django-admin startproject 项目名 进入到项目并创建一个应用:命令: python manage.py startapp …

    Linux 2023年6月6日
    0104
  • Abp vNext中集成Redis

    在Abp vNext中默认集成了缓存,可以使用.net Core自带的缓存,也可以使用Redis,但官网的Redis集成中少掉了一个依赖项的介绍。 首先,正常使用Abp vNext…

    Linux 2023年5月28日
    096
  • Java 的JAR包、EAR包、WAR包区别

    WAR(Web Archive file) 网络应用程序文件,是与平台无关的文件格式,它允许将许多文件组合成一个压缩文件。WAR专用于Web方面。大部分的JAVA WEB工程,都是…

    Linux 2023年6月14日
    0112
  • Windows Server 新增磁盘处于脱机状态解决办法

    解决方案: Cmd命令行操作如下: 1,进入diskpart模式 2、列出磁盘情况 3、选择脱机的磁盘 4、联机磁盘 5、清除磁盘属性 6、进入磁盘管理,提示初始化 每天记录一点,…

    Linux 2023年6月8日
    0116
  • 消费税

    1994年税制改革时,我国才设置了独立的消费税,与实行普遍征收的增值税配套,对特定消费品进行特殊调节。 消费税的特点: (一)征税范围具有选择性 有选择地确定若干个征税项目,在税法…

    Linux 2023年6月14日
    0117
  • 在海思芯片上使用GDB远程调试

    使用海思平台上(编译工具链:arm-himix200-linux)交叉编译 GDB 工具(使用版本8.2,之前用过10.2的版本,在编译 gdbserver 遇到编译出错的问题,因…

    Linux 2023年6月7日
    0153
  • shell 获取变量是什么数据类型

    bash;gutter:true; function check(){ local a="$1" printf "%d" "$a&…

    Linux 2023年5月28日
    094
  • linux学习记录

    查看所有系统服务 systemctl list-unit-files –type service -all 查看服务状态 sudo systemctl status servic…

    Linux 2023年6月7日
    086
  • 【C++基础】内存分区模型

    内存分区模型 C++程序在执行时,将内存大致划分为 4个区域 代码区:存放函数体的二进制代码,由操作系统进行管理 全局区:存放 全局变量和 静态变量以及 常量 栈区:由 编译器自动…

    Linux 2023年6月13日
    090
  • 大数据之Hadoop的HDFS存储优化—异构存储(冷热数据分离)

    异构存储主要解决,不同的数据,储存在不同类型的硬盘中,达到最佳性能的问题 1)存储类型 RAM_DISK:内存镜像文件系统 SSD:SSD固态硬盘 DISK:普通磁盘,在HDFS中…

    Linux 2023年6月8日
    096
  • Python subprocess的使用

    前言 部门内部存在大量代码使用Python去调用Shell或者JS脚本,因此重度依赖subprocess(使用Google的subprocess32),在使用subprocess的…

    Linux 2023年6月7日
    090
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球