灵感来袭,基于Redis的分布式延迟队列



延迟队列

延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费。比如1分钟之后发送短信,发送邮件,检测数据状态等。

Redisson Delayed Queue

如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单。

灵感来袭,基于Redis的分布式延迟队列

基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的 RDelayedQueue Java对象在实现了 RQueue接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。

RQueue distinationQueue = ...

RDelayedQueue delayedQueue = getDelayedQueue(distinationQueue);
// 10秒钟以后将消息发送到指定队列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分钟以后将消息发送到指定队列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。

Java DelayQueue

DelayQueue它本质上是一个队列,而这个队列里也只有存放Delayed的子类才有意义。

灵感来袭,基于Redis的分布式延迟队列

延迟队列demo

public class DelayTask implements Delayed {
    private long startDate;
    public DelayTask(Long delayMillions) {
        this.startDate = System.currentTimeMillis() + delayMillions;
    }

    @Override
    public int compareTo(Delayed o) {
        Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return this.startDate - System.currentTimeMillis();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue queue = new DelayQueue<>();
        DelayTask delayTask = new DelayTask(1000 * 5L);
        queue.put(delayTask);
        while (queue.size()>0){
            queue.take();
        }
    }
}

延迟队列消费原理

灵感来袭,基于Redis的分布式延迟队列

源码中出现了三次await字眼:

  • 第一次是当队列为空时,等待;
  • 第二次等待是因为,发现有任务,没有到执行时间,并且有准备执行的线程(leader),那不好意思,还得接续等待直到下一个可执行的任务。
  • 第三次是真正延时的地方了,available.awaitNanos(delay),此时也没有别的线程要执行,也就是我将要执行,等待剩下的延迟时间即可。

延迟队列生产原理

灵感来袭,基于Redis的分布式延迟队列

为保证消费者正常消费,如果优先队列头元素和当前放入元素相等,则说明当前元素消费的优先级高,重置准备消费的线程(leader)为null,唤醒消费者线程重新执行take方法逻辑。

手写一个Redis延迟队列

Redis延迟队列设计

灵感来袭,基于Redis的分布式延迟队列

延迟消息体设计

灵感来袭,基于Redis的分布式延迟队列

延迟消息体Message实现了Delayed接口,这样Java DelayQueue就知道什么时候取出消息体。

Redis延迟队列实现

灵感来袭,基于Redis的分布式延迟队列

RedisDelayQueue构造函数依赖 redis操作缓存服务对象目标队列名称(redis key)。

offer方法传入member(具体消息),delay(延迟时间),timeUnit(时间单位),然后封装成延迟消息体Message对象,放入Java DelayQueue中。

run方法是一个循环体,不断的从Java DelayQueue对象中获取消息体,然后放入redis对应的目标队列里。

延迟队列测试demo

灵感来袭,基于Redis的分布式延迟队列

控制台打印效果

灵感来袭,基于Redis的分布式延迟队列

思考

这种方案实现比较简单,使用的时候一定要谨慎,应用于延迟小,消息量不大的场景是没问题的,毕竟Java DelayQueue是占用内存的。另外也可以考虑利用Redis的sorted set 结构实现延迟队列【灵感来袭,基于Redis的分布式延迟队列(续)】,使用TimeStamp作为score,比如你的任务是要延迟5分钟,那么就在当前时间上加5分钟作为 score ,轮询任务每秒只轮询 score 小于等于 当前时间的 key即可,如果任务支持有误差,那么当没有扫描到有效数据的时候可以休眠对应时间再继续轮询。

Original: https://www.cnblogs.com/hujunzheng/p/12587572.html
Author: 胡峻峥
Title: 灵感来袭,基于Redis的分布式延迟队列

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/528948/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Linux内核–异常和中断的区别

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年8月24日
    0270
  • lambda跨账号调用elasticache redis调查结果

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年9月14日
    0176
  • pyQt中的信号

    1. 说明 在调用 exec_()方法时,应用会进入主循环,而主循环会监听、处理事件 import sys from PyQt5.QtCore import Qt from PyQ…

    Linux 2023年6月7日
    020
  • bochs(2.6.11)配置安装

    下载:https://bochs.sourceforge.io/ 建议下载2.6.11,下文一开始安装的2.7,但运行时有无法解决的错误。但是大致安装过程一致。 linux 提前安…

    Linux 2023年5月27日
    048
  • c++ 11 是如何简化你的数据库访问接口的

    之前写过一篇文章专门分析了 c++ 模板编译过程中报的一个错误:《fatal error C1045: 编译器限制 : 链接规范嵌套太深 》,其中涉及到了 qtl —— 一个使用 …

    Linux 2023年6月6日
    027
  • docker:alpine使用logrotate切割日志

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年11月7日
    0133
  • 武装你的WEBAPI-OData常见问题

    本文属于OData系列 Intro 非常喜欢OData,在各种新项目中都使用了这个技术。对于.NET 5.0, OData推出了8.0preview,于是就试用了一下。发现坑还是非…

    Linux 2023年6月6日
    020
  • WPF 多线程下跨线程处理 ObservableCollection 数据

    本文告诉大家几个不同的方法在 WPF 里,使用多线程修改或创建 ObservableCollection 列表的数据 需要明确的是 WPF 框架下,非 UI 线程直接或间接访问 U…

    Linux 2023年6月6日
    024
  • Redis6.0.5版本配置文件说明(转载)

    Redis版本此文章中Redis版本为6.0.5。 redis-server –versionRedis server v=6.0.5 sha=00000000:0 m…

    Linux 2023年5月28日
    023
  • 最简单的,在win,linux中,用powershell,自动获取Let’s Encrypt证书方法

    powershell传教士原创 2020-04-12,2022-05更新 Let’s Encrypt证书有效期3个月,支持泛域名【*.你的网站.net】。支持n天内(一…

    Linux 2023年6月14日
    025
  • ubuntu下sublime中文无法输入的问题

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年8月26日
    0221
  • 部署apache

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年11月5日
    0170
  • golang之闭包

    注入产生的原理: 数据库设置为GBK编码: 宽字节注入源于程序员设置MySQL连接时错误配置为:set character_set_client=gbk,这样配置会引发编码转换从而…

    Linux 2022年8月11日
    0220
  • 【微信篇】电脑版微信的照片视频文件位置变化

    新版的微信视频图片更新了位置,感觉有好有坏吧,好的方面就是以后查找视频、图片、文档等可能更方便;不好就是越更新占用体积越大,还多很多数据,不懂是否流氓?!—【蘇小沐】 …

    Linux 2023年6月13日
    031
  • 拓扑排序

    拓扑排序 简介 拓扑排序是将偏序的数据线性化的一种排序方法。复习下偏序和全序的概念: 全序关系是偏序关系的一个子集。 全序是集合内任何一对元素都是可比较的,比如数轴上的点都具有一个…

    Linux 2023年6月13日
    034
  • shell中的段落注释

    摘自: 感叹号可以用任意的串和字符替代,比如 注意!和BLOCK之前不能有空格,但是可以用Tab Original: https://www.cnblogs.com/LiuYanY…

    Linux 2023年5月28日
    024
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球