ThinkPHP 使用 think-queue 实现 redis 消息队列

简单介绍:
消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。

大白话:
消息队列有两个角色和一个容器,角色分别为生产者(负责发布任务)和消费者(负责执行任务),容器这是用来存放/堆积生产者发布的任务,将发布和执行两个步骤分开且互不影响。

消息队列的大致流程为:
生产者发布任务存放/堆积在消息队列中,由消费者主动去消息队列中取出任务并执行,先发布的先执行(队列:先进先出),在没有消费者的情况下任务会堆积在队列中等待被取出执行。

优点:
消息队列适用于大并发或者处理时间长并需要批量操作的第三方接口,可用于但不仅限于短信发送、邮件发送、APP推送等,支持跨系统,即本系统发布的消息队列可以由自己或者给其他系统执行任务,同理本系统也可以作为消费者执行自己或者其他系统发布的消息队列任务。

接下来主要介绍一下 think-queue 的使用
ThinkPHP的Queue内置了 Redis、Database、Topthink、Sync四种驱动,这里使用的是 Redis,也推荐使用 Redis

think-queue 队列消息可以进行任务的发布、获取、执行、删除、重新发布、延迟发布、超时控制等操作

消息队列基本配置
在 extra 目录,有些版本composer安装(在config)下创建 queue.php 配置文件

php
return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

生产者

创建一个测试类,写入生产者方法

php

namespace app\api\controller;

use think\Controller;
use think\Queue;

class Test extends Controller
{
    // 生产者,添加消息队列
    public function addQueue()
    {
        // 参数
        $data = [
            'id' => rand(0, 99),
            'userName' => '一起摸鱼'
        ];

        // 消息队列名
        $queueName = 'testQueue';

        // 推入消息队列,注意这里的 ::class 是PHP5.5才有的写法
        $isPushed = Queue::push(TestQueue::class, $data, $queueName);
        // PHP5.5以下的可以直接写命名空间
        // $isPushed = Queue::push('app\common\queue\TestQueue', $data, $queueName);

        if ($isPushed !== false) {
            // 成功之后的业务
            echo '队列加入成功';
        } else {
            // 失败之后的业务
            echo '队列加入失败';
        }
    }
}

消费者

创建一个 TestQueue 类,用做消费者,执行消息队列中的任务

php

namespace app\common\queue;

use think\Log;
use think\queue\Job;

class TestQueue
{
    // 消费者执行入口
    public function fire(Job $job, $data)
    {
        // 具体执行业务
        $isJobDone = $this->doJob($data);

        if ($isJobDone) {
            // 消息队列执行成功,删除队列,否则会一直执行
            $job->delete();
        } else {
            // 消息队列执行失败
            // 获取消息队列已经重试了几遍
            $attempts = $job->attempts();
            if ($attempts == 0 || $attempts == 1) {
                // 重新发布,参数 delay 是延时发布的时间
                $job->release(2);
            }
        }
    }

    // 消息队列执行失败后会自动执行该方法
    public function failed($data)
    {
        Log::error('消息队列达到最大重复执行次数后失败:' . json_encode($data));
    }

    // 消息队列执行方法
    public function doJob($data)
    {
        // 具体执行业务

        $data = json_encode($data);
        echo '消息队列:' . $data;
        // 这里的判断条件以具体业务是否执行成功进行判断
        if ($data) {
            echo "执行成功";
            return true;
        } else {
            echo "执行失败";
            return false;
        }
    }
}

运行结果

请求接口,生产者发布任务

ThinkPHP 使用 think-queue 实现 redis 消息队列

redis 队列存放任务

ThinkPHP 使用 think-queue 实现 redis 消息队列

接下来就是启用队列的监听模式了,因为不可能每次一有任务加进来就去手动执行一次队列。队列的监听模式有两种,配置参数如下:

ThinkPHP 使用 think-queue 实现 redis 消息队列

项目根目录执行:

php think queue:work --queue 队列名

开启消费者,执行任务

ThinkPHP 使用 think-queue 实现 redis 消息队列

redis 队列中的任务执行后也被删除 (redis桌面软件免费:Another Redis Desktop Manager)

ThinkPHP 使用 think-queue 实现 redis 消息队列

至于让队列后台运行,或添加守护进程,大家可以搜索相关知识

至此,整个消息队列流程就结束了。

转: https://blog.csdn.net/qq_44916915/article/details/124617854

Original: https://www.cnblogs.com/fps2tao/p/16399503.html
Author: 与f
Title: ThinkPHP 使用 think-queue 实现 redis 消息队列

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/528820/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 使用docker 部署mysql,突然连接不上!

    WARNING: IPv4 forwarding is disabled. Networking will not work. 大概意思就是说,网络不能用,也就意味着不能连网络,所…

    Linux 2023年6月7日
    081
  • Redis阻塞操作实现原理(转)

    原文:https://www.jianshu.com/p/xsMzfn 作者:Haiger 最近一位朋友问到:既然Redis是单线程的工作模式,那像 _BLPOP_这样的阻塞操作又…

    Linux 2023年5月28日
    0101
  • Teleport&Suspense

    vue3 新添加了一个默认的组件就叫 Teleport,我们可以拿过来直接使用,它上面有一个 to 的属性,它接受一个css query selector 作为参数,这就是代表要把…

    Linux 2023年6月13日
    0110
  • 022.常见硬盘检测方式

    硬盘监测概述 硬盘异常损坏日常相对概率较高,同时不同的文件系统(xfs,reiserfs,ext3)其检测方式不同。建议使用dmesag查看有没有硬件I/O故障的日志,也可使用用f…

    Linux 2023年6月7日
    090
  • Android进阶技术之——一文吃透Android的消息机制

    前言 为什么要老药换新汤 作为Android中 至关重要 的机制之一,十多年来,分析它的文章不断,大量的内容已经被挖掘过了。所以: 已经对这一机制熟稔于心的读者,在这篇文章中,看不…

    Linux 2023年6月13日
    0100
  • 聊聊消息中心的设计与实现逻辑

    厌烦被消息打扰,又怕突然间的安静; 一、业务背景 微服务的架构体系中,会存在很多基础服务,提供一些大部分服务都可能需要的能力,比如文件管理、MQ队列、缓存机制、消息中心等等,这些服…

    Linux 2023年6月14日
    091
  • SpringBoot 多环境配置文件切换

    背景 很多时候,我们项目在开发环境和生成环境的环境配置是不一样的,例如,数据库配置,在开发的时候,我们一般用测试数据库,而在生产环境的时候,我们是用正式的数据,这时候,我们可以利用…

    Linux 2023年6月14日
    0142
  • JuiceFS V1.0 RC1 发布,大幅优化 dump/load 命令性能, 深度用户不容错过

    各位社区的伙伴, JuiceFS v1.0 RC1 今天正式发布了! 这个版本中,最值得关注的是对元数据迁移备份工具 dump/load 的优化。 这个优化需求来自于某个社区重度用…

    Linux 2023年6月14日
    082
  • Redis16个常见使用场景

    目录 缓存 数据共享分布式 分布式锁 全局ID 计数器 限流 位统计 购物车 用户消息时间线timeline 消息队列 抽奖 点赞、签到、打卡 商品标签 商品筛选 用户关注、推荐模…

    Linux 2023年5月28日
    0111
  • Redis多线程原理详解

    从上图中可以看出只有以下3个地方用的是多线程,其他地方都是单线程: 1:接收请求参数 2:解析请求参数 3:请求响应,即将结果返回给client 很明显以上3点各个请求都是互相独立…

    Linux 2023年5月28日
    083
  • 华为ensp网络拓扑图使用MSTP、OSPF、DHCP、端口聚合以及PPP(CHAP认证)

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    Linux 2023年6月7日
    0102
  • zabbix部署

    zabbix zabbix zabbix介绍 zabbix特点 zabbix部署 zabbix介绍 zabbix是一个基于WEB界面的提供分布式系统监视以及网络监视功能的企业级的开…

    Linux 2023年6月13日
    0133
  • Shell脚本生成密码

    利用 /dev/urando 生成密码 密码以字母、数字、开头 特殊符号多 for _ in {1..30};do tr -dc ‘~`!@#$%^&*()_+-={}:&…

    Linux 2023年6月6日
    0111
  • 【实测】Python 和 C++ 下字符串查找的速度对比

    最近在备战一场算法竞赛,语言误选了 Python ,无奈只能着手对常见场景进行语言迁移。而字符串查找的场景在算法竞赛中时有出现。本文即对此场景在 Python 和竞赛常用语言 C+…

    Linux 2023年6月13日
    0116
  • RAID磁盘阵列技术

    RAID磁盘阵列技术 1、RAID概述 RAID(Redundant Array of Independent Disk),从字面意思讲的是基于独立磁盘的具有冗余的磁盘阵列,其核心…

    Linux 2023年6月7日
    0100
  • Netty源码解读(四)-读写数据

    读写Channel(READ)的创建和注册 在NioEventLoop#run中提到,当有IO事件时,会调用 processSelectedKeys方法来处理。 当客户端连接服务端…

    Linux 2023年6月7日
    089
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球