ThinkPHP 使用 think-queue 实现 redis 消息队列

简单介绍:
消息队列中间件是大型系统中的重要组件,已经逐渐成为企业系统内部通信的核心手段。它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。

大白话:
消息队列有两个角色和一个容器,角色分别为生产者(负责发布任务)和消费者(负责执行任务),容器这是用来存放/堆积生产者发布的任务,将发布和执行两个步骤分开且互不影响。

消息队列的大致流程为:
生产者发布任务存放/堆积在消息队列中,由消费者主动去消息队列中取出任务并执行,先发布的先执行(队列:先进先出),在没有消费者的情况下任务会堆积在队列中等待被取出执行。

优点:
消息队列适用于大并发或者处理时间长并需要批量操作的第三方接口,可用于但不仅限于短信发送、邮件发送、APP推送等,支持跨系统,即本系统发布的消息队列可以由自己或者给其他系统执行任务,同理本系统也可以作为消费者执行自己或者其他系统发布的消息队列任务。

接下来主要介绍一下 think-queue 的使用
ThinkPHP的Queue内置了 Redis、Database、Topthink、Sync四种驱动,这里使用的是 Redis,也推荐使用 Redis

think-queue 队列消息可以进行任务的发布、获取、执行、删除、重新发布、延迟发布、超时控制等操作

消息队列基本配置
在 extra 目录,有些版本composer安装(在config)下创建 queue.php 配置文件

php
return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => '127.0.0.1',
            'port'       => 6379,
            'password'   => '',
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

生产者

创建一个测试类,写入生产者方法

php

namespace app\api\controller;

use think\Controller;
use think\Queue;

class Test extends Controller
{
    // 生产者,添加消息队列
    public function addQueue()
    {
        // 参数
        $data = [
            'id' => rand(0, 99),
            'userName' => '一起摸鱼'
        ];

        // 消息队列名
        $queueName = 'testQueue';

        // 推入消息队列,注意这里的 ::class 是PHP5.5才有的写法
        $isPushed = Queue::push(TestQueue::class, $data, $queueName);
        // PHP5.5以下的可以直接写命名空间
        // $isPushed = Queue::push('app\common\queue\TestQueue', $data, $queueName);

        if ($isPushed !== false) {
            // 成功之后的业务
            echo '队列加入成功';
        } else {
            // 失败之后的业务
            echo '队列加入失败';
        }
    }
}

消费者

创建一个 TestQueue 类,用做消费者,执行消息队列中的任务

php

namespace app\common\queue;

use think\Log;
use think\queue\Job;

class TestQueue
{
    // 消费者执行入口
    public function fire(Job $job, $data)
    {
        // 具体执行业务
        $isJobDone = $this->doJob($data);

        if ($isJobDone) {
            // 消息队列执行成功,删除队列,否则会一直执行
            $job->delete();
        } else {
            // 消息队列执行失败
            // 获取消息队列已经重试了几遍
            $attempts = $job->attempts();
            if ($attempts == 0 || $attempts == 1) {
                // 重新发布,参数 delay 是延时发布的时间
                $job->release(2);
            }
        }
    }

    // 消息队列执行失败后会自动执行该方法
    public function failed($data)
    {
        Log::error('消息队列达到最大重复执行次数后失败:' . json_encode($data));
    }

    // 消息队列执行方法
    public function doJob($data)
    {
        // 具体执行业务

        $data = json_encode($data);
        echo '消息队列:' . $data;
        // 这里的判断条件以具体业务是否执行成功进行判断
        if ($data) {
            echo "执行成功";
            return true;
        } else {
            echo "执行失败";
            return false;
        }
    }
}

运行结果

请求接口,生产者发布任务

ThinkPHP 使用 think-queue 实现 redis 消息队列

redis 队列存放任务

ThinkPHP 使用 think-queue 实现 redis 消息队列

接下来就是启用队列的监听模式了,因为不可能每次一有任务加进来就去手动执行一次队列。队列的监听模式有两种,配置参数如下:

ThinkPHP 使用 think-queue 实现 redis 消息队列

项目根目录执行:

php think queue:work --queue 队列名

开启消费者,执行任务

ThinkPHP 使用 think-queue 实现 redis 消息队列

redis 队列中的任务执行后也被删除 (redis桌面软件免费:Another Redis Desktop Manager)

ThinkPHP 使用 think-queue 实现 redis 消息队列

至于让队列后台运行,或添加守护进程,大家可以搜索相关知识

至此,整个消息队列流程就结束了。

转: https://blog.csdn.net/qq_44916915/article/details/124617854

Original: https://www.cnblogs.com/fps2tao/p/16399503.html
Author: 与f
Title: ThinkPHP 使用 think-queue 实现 redis 消息队列

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/528820/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 《分布式系统原理介绍》读书笔记

    1、在大型集群中每日宕机发生的概率为千分之一左右;在实践中,一台宕机的机器恢复时间通常认为是 24 小时。 2、由于网络数据丢失的异常存在,直接决定了分布式系统的协议必须能处理网络…

    Linux 2023年6月16日
    099
  • docker安装redis

    首先考虑需要安装的redis版本,我这里是安装的redis 6.0.16,如果宿主机没有,那么就docker pull redis:6.0.16 一、指定redis配置文件 我的宿…

    Linux 2023年5月28日
    091
  • Windows 常用 Dos PowerShell 命令

    Dos #^.^ dir /a/s/p PowerShell #批量重命名文件 dir -Recurse *.png | foreach {Rename-Item $_ -NewN…

    Linux 2023年5月28日
    076
  • VR一体机如何退出FFBM

    Fast Factory Boot Mode(FFBM)是一种半开机的模式,它的主要目的是方便工厂测试,提高生产效率。正常情况下终端用户是不会碰到的。但售后的同学最近连续收到几台客…

    Linux 2023年6月7日
    0225
  • 模糊测试基本概念FuzzTest

    1. what is FUZZ TESTing? Fuzz Testing is an automated software testing technology, origina…

    Linux 2023年6月7日
    065
  • 线程

    一、线程概念的引入 进程 之前我们已经了解了操作系统中进程的概念,程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区…

    Linux 2023年6月14日
    085
  • PyTorch介绍-优化模型参数

    既然已经有模型和数据了,是时候在数据上优化模型参数来训练、验证和测试它了。模型训练是一个迭代过程;在每一次迭代( epoch),模型会作出一个预测,计算其预测误差( loss),收…

    Linux 2023年6月14日
    095
  • 这 BUG,绝了

    上周只上了三天班,但我也丝毫不敢懈怠,BUG 更是一个也没少写。 看着满屏幕的 ERROR,我陷入沉思。为什么我写的代如此烂,无法像大牛们写的那般优雅? 越想越自卑,越想越抑郁。我…

    Linux 2023年6月7日
    089
  • ERROR: Exception when publishing, exception message [Failed to connect and initialize SSH connection

    jenkins 在构建时连接其他部署节点的服务器时报错,ERROR: Exception when publishing, exception message [Failed to…

    Linux 2023年6月14日
    077
  • 学习c语言时对一些疑问的测试

    include int main(){/ 1 /// int a[3][3]={6,1,2,3,4,5};// int p=&a[0][0];// printf(&#822…

    Linux 2023年6月6日
    078
  • 内部类

    内部类:将一个类的定义放在另一个类的定义内部。内部类机制可以把逻辑相关的类组织在一起,并控制位于内部的类的可视性。 内部类与组合是完全不同的概念。 内部类不仅是一种代码隐藏机制(将…

    Linux 2023年6月8日
    089
  • docker 容器大小查看及清理docker磁盘空间

    这篇文章最初是由博主创作的。请注明转载的来源: [En] This article is originally created by the blogger. Please ind…

    Linux 2023年5月27日
    092
  • 关于多个 Cookie 的分隔符这件事

    对于 Cookie 的处理上,我最近遇到一个问题,那就是如何分割 Cookie 的内容。有人说是使用逗号分割,有人说是使用分号分割,究竟用哪个才是对的?其实这个答案是需要分为两个过…

    Linux 2023年6月6日
    086
  • Redis实现延迟队列方法介绍

    延迟队列,顾名思义它是一种带有延迟功能的消息队列。那么,是在什么场景下我才需要这样的队列呢? 背景 我们先看看以下业务场景: 当订单一直处于未支付状态时,如何及时的关闭订单 如何定…

    Linux 2023年5月28日
    064
  • 如何在shell脚本中传变量的值传给curl

    随着即时通讯的发展,大量的报警媒介已经从以往的邮件转为钉钉,企业微信等聊天工具。当我使用shell脚本来监控 Keepalived的时候,在给curl传递变量的时候无法生效,经过查…

    Linux 2023年6月8日
    083
  • redis 使用lua 生成流水号

    在实际的业务场景中,我们会用到流水号。之前的流水号做法是,使用redis的全局锁。然后对数据库进行更新,数据库更新 这个也会有一些问题,比如对于同一个流水号,多个线程去更新,由于事…

    Linux 2023年5月28日
    082
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球