灵感来袭,基于Redis的分布式延迟队列

延迟队列

延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费。比如1分钟之后发送短信,发送邮件,检测数据状态等。

Redisson Delayed Queue

如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单。

灵感来袭,基于Redis的分布式延迟队列

基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的 RDelayedQueue Java对象在实现了 RQueue接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。

RQueue distinationQueue = ...

RDelayedQueue delayedQueue = getDelayedQueue(distinationQueue);
// 10秒钟以后将消息发送到指定队列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分钟以后将消息发送到指定队列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。

Java DelayQueue

DelayQueue它本质上是一个队列,而这个队列里也只有存放Delayed的子类才有意义。

灵感来袭,基于Redis的分布式延迟队列

延迟队列demo

public class DelayTask implements Delayed {
    private long startDate;
    public DelayTask(Long delayMillions) {
        this.startDate = System.currentTimeMillis() + delayMillions;
    }

    @Override
    public int compareTo(Delayed o) {
        Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return this.startDate - System.currentTimeMillis();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue queue = new DelayQueue<>();
        DelayTask delayTask = new DelayTask(1000 * 5L);
        queue.put(delayTask);
        while (queue.size()>0){
            queue.take();
        }
    }
}

延迟队列消费原理

灵感来袭,基于Redis的分布式延迟队列

源码中出现了三次await字眼:

  • 第一次是当队列为空时,等待;
  • 第二次等待是因为,发现有任务,没有到执行时间,并且有准备执行的线程(leader),那不好意思,还得接续等待直到下一个可执行的任务。
  • 第三次是真正延时的地方了,available.awaitNanos(delay),此时也没有别的线程要执行,也就是我将要执行,等待剩下的延迟时间即可。

延迟队列生产原理

灵感来袭,基于Redis的分布式延迟队列

为保证消费者正常消费,如果优先队列头元素和当前放入元素相等,则说明当前元素消费的优先级高,重置准备消费的线程(leader)为null,唤醒消费者线程重新执行take方法逻辑。

手写一个Redis延迟队列

Redis延迟队列设计

灵感来袭,基于Redis的分布式延迟队列

延迟消息体设计

灵感来袭,基于Redis的分布式延迟队列

延迟消息体Message实现了Delayed接口,这样Java DelayQueue就知道什么时候取出消息体。

Redis延迟队列实现

灵感来袭,基于Redis的分布式延迟队列

RedisDelayQueue构造函数依赖 redis操作缓存服务对象目标队列名称(redis key)。

offer方法传入member(具体消息),delay(延迟时间),timeUnit(时间单位),然后封装成延迟消息体Message对象,放入Java DelayQueue中。

run方法是一个循环体,不断的从Java DelayQueue对象中获取消息体,然后放入redis对应的目标队列里。

延迟队列测试demo

灵感来袭,基于Redis的分布式延迟队列

控制台打印效果

灵感来袭,基于Redis的分布式延迟队列

思考

这种方案实现比较简单,使用的时候一定要谨慎,应用于延迟小,消息量不大的场景是没问题的,毕竟Java DelayQueue是占用内存的。另外也可以考虑利用Redis的sorted set 结构实现延迟队列【灵感来袭,基于Redis的分布式延迟队列(续)】,使用TimeStamp作为score,比如你的任务是要延迟5分钟,那么就在当前时间上加5分钟作为 score ,轮询任务每秒只轮询 score 小于等于 当前时间的 key即可,如果任务支持有误差,那么当没有扫描到有效数据的时候可以休眠对应时间再继续轮询。

Original: https://www.cnblogs.com/hujunzheng/p/12587572.html
Author: 胡峻峥
Title: 灵感来袭,基于Redis的分布式延迟队列

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/528948/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • zenity,把shell加上图形界面

    有时自己写点小工具自己用,比较喜欢非图形界面的,可是有些应用还是弹出个东西来提醒一下,才能让你注意。 zenity是个很帅的东西,它用法可以参考它的–help ,这里不…

    Linux 2023年5月28日
    085
  • HTS恢复检查脚本

    #!/bin/bash #program:HTS-A数据库和插件检查 #author:sundz #version 20220531 v1 创建脚本 生成sql的表和字段汇总;ab…

    Linux 2023年6月7日
    091
  • redis高级

    1 redis高可用 主从复制存在的问题: 1 主从复制,主节点发生故障,需要做故障转移,可以手动转移:让其中一个slave变成master—>哨兵 2 主从复制,只能主…

    Linux 2023年6月14日
    087
  • 基于灰色模型和Bootstrap理论的大规模定制质量控制方法研究

    基于GM的生产质量预测: 原始质量指标数列为: 是的累加序列为: 经过该处理,可以使粗糙的原始离散数列变为光滑的离散数列。 建立基本的预测模型GM(1,1),其白化方程为 式中,a…

    Linux 2023年6月14日
    069
  • PHP array_count_values()

    array_count_values array_count_values() 函数用于统计数组中所有值出现的次数。 本函数返回一个数组,其元素的键名是原数组的值,键值是该值在原数…

    Linux 2023年6月7日
    090
  • Kubernetes服务发现之Service详解

    一、引子 Kubernetes Pod 是有生命周期的,它们可以被创建,也可以被销毁,然后一旦被销毁生命就永远结束。通过 ReplicationController 能够动态地创建…

    Linux 2023年6月14日
    091
  • Redis多线程原理详解

    从上图中可以看出只有以下3个地方用的是多线程,其他地方都是单线程: 1:接收请求参数 2:解析请求参数 3:请求响应,即将结果返回给client 很明显以上3点各个请求都是互相独立…

    Linux 2023年5月28日
    078
  • [20211215]提示precompute_subquery补充.txt

    [20211215]提示precompute_subquery补充.txt –//前几天测试precompute_subquery,我仔细想一下好像以前看书或者别人的b…

    Linux 2023年6月13日
    068
  • SQL51 查找字符串中逗号出现的次数

    本题链接本题表结构如下所示。 +—-+————–+ | id | string | +—-+————–+ | 1 | 10,A,B | …

    Linux 2023年6月13日
    091
  • Xshell配置ssh免密码登录-密钥公钥(Public key)与私钥(Private Key)登录【已成功实例】

    本文转自https://blog.csdn.net/qjc_501165091/article/details/51278696 ssh登录提供两种认证方式:口令(密码)认证方式和…

    Linux 2023年5月28日
    084
  • ShardingSphere-proxy-5.0.0容量范围分片的实现(五)

    一、修改配置文件config-sharding.yaml,并重启服务 # Licensed to the Apache Software Foundation (ASF) unde…

    Linux 2023年6月14日
    0138
  • 【5】2022年7月

    7月3日 总结3号的一天就是只有一句话,”自己经历了什么只有自己最清楚,不要辜负自己经历的”。 7月3号凌晨2点,收拾好行李和整理房间,在网上购买日常用品到…

    Linux 2023年6月13日
    077
  • linux下man 指令衍生代号

    进入man指令的功能后,你可以按下『空格键』往下翻页,可以按下『q 』按键来离开man的环境。 上表中的1, 5, 8这三个号码特别重要,也请读者要将这三个数字所代表的意义背下来 …

    Linux 2023年6月8日
    0110
  • zabbix用户,角色,权限,模板管理

    zabbix用户,角色,权限,模板管理 用户组 用户角色 用户 使用刚才创建的用户登录 模板组 模板 模板的监控项可以自己创建也可以从其他模板复制 posted @2022-09-…

    Linux 2023年6月13日
    0108
  • Linux文本处理相关命令

    一、文本处理命令 Linux sort命令用于将文本文件内容加以排序。 sort 可针对文本文件的内容,以行为单位来排序。 语法格式如下: sort [参数]…[文件] 相关参…

    Linux 2023年5月27日
    091
  • CANoe的安装和使用

    CANoe的简介 CANoe是德国Vector公司为汽车总线的开发而设计的一款总线开发环境,全称叫CAN open environment。CANoe集合了网络监控、数据获取/记录…

    Linux 2023年6月13日
    0200
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球