laravel源码分析-队列Queue

队列 (Queue) 是 laravel 中比较常用的一个功能,队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和响应的时间。本文我们就来分析下队列创建和执行的源码。

先通过命令创建一个 Job 类,成功之后会创建如下文件 laravel-src/laravel/app/Jobs/DemoJob.php。

> php artisan make:job DemoJob

> Job created successfully.

下面我们来分析一下 Job 类的具体生成过程。

执行 php artisan make:job DemoJob 后,会触发调用如下方法。

laravel-src/laravel/vendor/laravel/framework/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php

/**
 * Register the command.

 * [A] make:job 时触发的方法
 * @return void
 */
protected function registerJobMakeCommand()
{
    $this->app->singleton('command.job.make', function ($app) {
        return new JobMakeCommand($app['files']);
    });
}

接着我们来看下 JobMakeCommand 这个类,这个类里面没有过多的处理逻辑,处理方法在其父类中。

class JobMakeCommand extends GeneratorCommand

我们直接看父类中的处理方法,GeneratorCommand->handle(),以下是该方法中的主要方法。

public function handle()
{
    // 获取类名
    $name = $this->qualifyClass($this->getNameInput());
    // 获取文件路径
    $path = $this->getPath($name);
    // 创建目录和文件
    $this->makeDirectory($path);
    // buildClass() 通过模板获取新类文件的内容
    $this->files->put($path, $this->buildClass($name));
    // $this->type 在子类中定义好了,例如 JobMakeCommand 中 type = 'Job'
    $this->info($this->type.' created successfully.');
}

方法就是通过目录和文件,创建对应的类文件,至于新文件的内容,都是基于已经设置好的模板来创建的,具体的内容在 buildClass($name) 方法中。

protected function buildClass($name)
{
    // 得到类文件模板,getStub() 在子类中有实现,具体看 JobMakeCommand
    $stub = $this->files->get($this->getStub());
    // 用实际的name来替换模板中的内容,都是关键词替换
    return $this->replaceNamespace($stub, $name)->replaceClass($stub, $name);
}

获取模板文件

protected function getStub()
{
    return $this->option('sync')
                    ? __DIR__.'/stubs/job.stub'
                    : __DIR__.'/stubs/job-queued.stub';
}

job.stub

job-queued.stub

下面看一下前面我们创建的一个Job类,DemoJob.php,就是来源于模板 job-queued.stub。

至此,我们已经大致明白了队列任务类是如何创建的了。下面我们来分析下其是如何生效运行的。

任务类创建后,我们就可以在需要的地方进行任务的分发,常见的方法如下:

DemoJob::dispatch(); // 任务分发
DemoJob::dispatchNow(); // 同步调度,队列任务不会排队,并立即在当前进程中进行

下面先以 dispatch() 为例分析下分发过程。

trait Dispatchable
{
    public static function dispatch()
    {
        return new PendingDispatch(new static(...func_get_args()));
    }
}
class PendingDispatch
{
    protected $job;

    public function __construct($job)
    {   echo '[Max] ' . 'PendingDispatch ' . '__construct' . PHP_EOL;
        $this->job = $job;
    }

    public function __destruct()
    {   echo '[Max] ' . 'PendingDispatch ' . '__destruct' . PHP_EOL;
        app(Dispatcher::class)->dispatch($this->job);
    }
}

重点是 app(Dispatcher::class)->dispatch($this->job) 这部分。

我们先来分析下前部分 app(Dispatcher::class),它是在 laravel 框架中自带的 BusServiceProvider 中向 $app 中注入的。

class BusServiceProvider extends ServiceProvider implements DeferrableProvider
{
    public function register()
    {
        $this->app->singleton(Dispatcher::class, function ($app) {
            return new Dispatcher($app, function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
            });
        });
    }
}

看一下 Dispatcher 的构造方法,至此,我们已经知道前半部分 app(Dispatcher::class) 是如何来的了。

class Dispatcher implements QueueingDispatcher
{
    protected $container;
    protected $pipeline;
    protected $queueResolver;

    public function __construct(Container $container, Closure $queueResolver = null)
    {
        $this->container = $container;
        /**
         * Illuminate/Bus/BusServiceProvider.php->register()中
         * $queueResolver 传入的是一个闭包
         * function ($connection = null) use ($app) {
         *   return $app[QueueFactoryContract::class]->connection($connection);
         * }
         */
        $this->queueResolver = $queueResolver;
        $this->pipeline = new Pipeline($container);
    }

    public function dispatch($command)
    {
        if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
                // 将 $command 存入队列
            return $this->dispatchToQueue($command);
        }
        return $this->dispatchNow($command);
    }
}

BusServiceProvider 中注册了 Dispatcher::class ,然后 app(Dispatcher::class)->dispatch($this->job) 调用的即是 Dispatcher->dispatch()。

public function dispatchToQueue($command)
{
    // 获取任务所属的 connection
    $connection = $command->connection ?? null;
    /*
     * 获取队列实例,根据config/queue.php中的配置
     * 此处我们配置 QUEUE_CONNECTION=redis 为例,则获取的是RedisQueue
     * 至于如何通过 QUEUE_CONNECTION 的配置获取 queue ,此处先跳过,本文后面会具体分析。
     */
    $queue = call_user_func($this->queueResolver, $connection);

    if (! $queue instanceof Queue) {
        throw new RuntimeException('Queue resolver did not return a Queue implementation.');
    }
    // 我们创建的DemoJob无queue方法,则不会调用
    if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    }
    // 将 job 放入队列
    return $this->pushCommandToQueue($queue, $command);
}

protected function pushCommandToQueue($queue, $command)
{
    // 在指定了 queue 或者 delay 时会调用不同的方法,基本大同小异
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    }

    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }

    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }
    // 此处我们先看最简单的无参数时的情况,调用push()
    return $queue->push($command);
}

笔者的配置是 QUEUE_CONNECTION=redis ,估以此来分析,其他类型的原理基本类似。

配置的是 redis 时, $queue 是 RedisQueue 实例,下面我们看下 RedisQueue->push() 的内容。

Illuminate/Queue/RedisQueue.php

public function push($job, $data = '', $queue = null)
{
    /**
     * 获取队列名称
     * var_dump($this->getQueue($queue));
     * 创建统一的 payload,转成 json
     * var_dump($this->createPayload($job, $this->getQueue($queue), $data));
     */
    // 将任务和数据存入队列
    return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}

public function pushRaw($payload, $queue = null, array $options = [])
{
    // 写入redis中
    $this->getConnection()->eval(
        LuaScripts::push(), 2, $this->getQueue($queue),
        $this->getQueue($queue).':notify', $payload
    );
    // 返回id
    return json_decode($payload, true)['id'] ?? null;
}

至此,我们已经分析完了任务是如何被加入到队列中的。

Original: https://www.cnblogs.com/immaxfang/p/15775547.html
Author: immaxfang
Title: laravel源码分析-队列Queue

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/582240/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • linux常用命令

    linux常用目录 /bin :bin是Binary的缩写,这个目录存放着最经常使用的命令。 /ect :这个目录用来存放所有的系统所需要的配置文件和子目录。 /home:用户的主…

    Linux 2023年6月13日
    0108
  • 利用卷积神经网络处理cifar图像分类

    这是一个图像分类的比赛CIFAR( CIFAR-10 – Object Recognition in Images ) 首先我们需要下载数据文件,地址: http://…

    Linux 2023年6月6日
    0121
  • 剑指offer计划23( 数学简单)—java

    1.1、题目1 剑指 Offer 39. 数组中出现次数超过一半的数字 1.2、解法 万能哈希表,数学方法看到我要吐血。。。 1.3、代码 class Solution { pub…

    Linux 2023年6月11日
    0107
  • Postman环境变量的使用

    前言 请注意,Postman新版有ui上的改动,本文使用的Postman 版本8.4.0 for Mac, ui有调整,但是功能无改变。 Postman是一款接口调测的软件,服务端…

    Linux 2023年6月14日
    0106
  • LVS 负载均衡集群

    1.1 LVS介绍 LVS 是 Linux Virtual Server 的简写,即 Linux 虚拟服务器,是一个虚拟的服务器集群系统。此项目在 1998 年 5 月由章文嵩博士…

    Linux 2023年6月6日
    097
  • Springboot集成Redis举例

    依赖包 配置文件(application.properties) 配置文件(RedisConfig.java) import com.fasterxml.jackson.annot…

    Linux 2023年5月28日
    0106
  • C语言实现扫雷游戏(完整版)

    头文件定义、函数声明 下面就是扫雷中使用到的所有函数,为了省事我把所有的代码都放在一个C文件中实现 宏定义中设置了游戏的界面布局,以及设置地雷的个数(这里默认的是10个地雷),界面…

    Linux 2023年6月6日
    0140
  • 记录一次shell脚本环境全局变量在函数内部生效问题

    背景 计划核对内网IP的使用情况,所以写了个小脚本扫描有哪些IP还在使用。执行脚本过程中发现函数中一直获取不到变量的值,排查后将结论记录下来。 问题现象 已经配置了全局变量,但是在…

    Linux 2023年6月14日
    0110
  • ASP.NET Core 2.2 : 二十七. JWT与用户授权(细化到Action)

    上一章分享了如何在ASP.NET Core中应用JWT进行用户认证以及Token的刷新,本章继续进行下一步,用户授权。涉及到的例子也以上一章的为基础。(ASP.NET Core 系…

    Linux 2023年6月7日
    0108
  • 职场最讨厌的人,没有之一

    人物背景: 姓名:春绿,性别:未知,年龄:不详,工龄:菜鸟,人物特点:爱管闲事,管不住自己的嘴,情商约等于0.000001 人物故事: 1、领导给小明安排了一个工作,被春绿听到了,…

    Linux 2023年6月13日
    0106
  • 微信小程序大型系统架构中应用Redis缓存要点

    在大型分布式系统架构中,必须选择适合的缓存技术以应对高并发,实现系统相应的高性能,酷客多小程序经过慎重选型,选择了采用基于腾讯云服务的Redis弹性缓存技术,结合Redis官方推荐…

    Linux 2023年5月28日
    0105
  • 对象缓存服务的思考和实现

    写在前面 目前在很多业务中,存储都大量的依赖了云存储,比如阿里云的 oss、华为云的 obs 等。但是如果有大量的上传/下载任务,云存储上的网络 I/0 就变成了一个很大的瓶颈。 …

    Linux 2023年6月14日
    099
  • 尤娜故事-迷雾-springboot扮酷小技巧

    前情回顾 从前,有一个简单的通道系统叫尤娜…… 尤娜系统的第一次飞行中换引擎的架构垂直拆分改造 四种常用的微服务架构拆分方式 尤娜,我去面试了 正文 我回到…

    Linux 2023年6月14日
    089
  • 搭建redis集群

    这里总结性给出搭建步骤: 1、 至少6个节点,三主三从 2、 编译redis源码 3、放置集群的配置文件redis.conf 创建工作目录: 每个文件夹下新建redis.conf …

    Linux 2023年5月28日
    098
  • 每周一个linux命令(ping)

    基础环境 ping命令介绍 ping命令主要用来…

    Linux 2023年6月8日
    0100
  • ELK时间戳

    ELK时间戳 在我们使用ELK过程中,总会遇到时间戳的问题。首先 logstash如果没有加以处理的话,那么它默认使用的是采集的时间戳,然后存入 ES。那么这样的话时间显示的是错误…

    Linux 2023年6月8日
    0109
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球