基于Swoole和Redis实现的并发队列处理系统

由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务系统。

大家知道,一个消息队列处理系统主要分为两大部分:消费者和生产者。

在我们的系统中,主系统作为生产者,任务系统作为消费者。

具体的工作流程如下:
1、主系统将需要需要处理的任务名称+任务参数push到队列中。
2、任务系统实时的对任务队列进行pop,pop出来一个任务就fork一个子进程,由子进程完成具体的任务逻辑。

具体代码如下:

/**
 * 启动守护进程
 */
public function runAction() {
    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
    while (true) {
        $this->fork_process();
    }
    exit;
}

/**
 * 创建子进程
 */
private function fork_process() {
    $ppid = getmypid();
    $pid = pcntl_fork();
    if ($pid == 0) {//子进程
        $pid = posix_getpid();
        //echo "* Process {$pid} was created \n\n";
        $this->mq_process();
        exit;
    } else {//主进程
        $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态
        if (pcntl_wifexited($status)) {
            //echo "\n\n* Sub process: {$pid} exited with {$status}";
            //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
        } else {
            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
        }
    }
}

/**
 * 业务任务队列处理
 */
private function mq_process() {
    $data_pop = $this->masterRedis->rPop($this->redis_list_key);
    $data = json_decode($data_pop, 1);
    if (!$data) {
        return FALSE;
    }
    $worker = '_task_' . $data['worker'];
    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
    $params = $data['params'];
    $class = new $class_name();
    $class->$worker($params);
    return TRUE;
}

这是一个简单的任务处理系统。

通过这个任务系统帮助我们实现了异步,到目前为止已经稳定运行了将近一年。

但很可惜,它是一个单进程的系统。它是一直在不断的fork,如果有任务就处理,没有任务就跳过。

这样很稳定。

但问题有两个:一是不断地fork、pop会浪费服务器资源,二是不支持并发!

第一个问题还好,但第二个问题就很严重。

当主系统 同时 抛过来大量的任务时,任务的处理时间就会无限的拉长。

为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。

因为在PHP7之前不支持多线程,所以我们采用多进程。

从网上找了不少资料,大多所谓的多进程都是N个进程同时在后台运行。

显然这是不合适的。

我的预想是: 每pop出一个任务就fork一个任务,任务执行完成后子进程结束。

这个问题很简单,那就是每fork一个子进程就自增一次。而当子进程执行完成就自减一次。

自增没有问题,我们就在主进程中操作就完了。那么该如何自减呢?

可能你会说,当然是在子进程中啊。但这里你需要注意:当fork的时候是从主进程复制了一份资源给子进程,这就意味着你无法在子进程中操作主进程中的计数器!

所以,这里就需要了解一个知识点:信号。

具体的可以自行Google,这里直接看代码。

// install signal handler for dead kids
pcntl_signal(SIGCHLD, array($this, "sig_handler"));

这就安装了一个信号处理器。当然还缺少一点。

declare(ticks = 1);

declare是一个控制结构语句,具体的用法也请去Google。

这句代码的意思就是每执行一条低级语句就调用一次信号处理器。

这样,每当子进程结束的时候就会调用信号处理器,我们就可以在信号处理器中进行自减。

在多进程开发中,如果处理不当就会导致进程残留。

为了解决进程残留,必须得将子进程回收。

那么如何对子进程进行回收就是一个技术点了。

在pcntl的demo中,包括很多博文中都是说在主进程中回收子进程。

但我们是基于Redis的brpop的,而brpop是阻塞的。

这就导致一个问题:当执行N个任务之后,任务系统空闲的时候主进程是阻塞的,而在发生阻塞的时候子进程还在执行,所以就无法完成最后几个子进程的进程回收。。。

这里本来一直很纠结,但当我将信号处理器搞定之后就也很简单了。

进程回收也放到信号处理器中去。

pcntl是一个进程处理的扩展,但很可惜它对多进程的支持非常乏力。

所以这里采用Swoole扩展中的Process。

具体代码如下:

declare(ticks = 1);
class JobDaemonController extends Yaf_Controller_Abstract{

    use Trait_Redis;

    private $maxProcesses = 800;
    private $child;
    private $masterRedis;
    private $redis_task_wing = 'task:wing'; //待处理队列

    public function init(){
        // install signal handler for dead kids
        pcntl_signal(SIGCHLD, array($this, "sig_handler"));
        set_time_limit(0);
        ini_set('default_socket_timeout', -1); //队列处理不超时,解决redis报错:read error on connection
    }

    private function redis_client(){
        $rds = new Redis();
        $rds->connect('redis.master.host',6379);
        return $rds;
    }

    public function process(swoole_process $worker){// 第一个处理
        $GLOBALS['worker'] = $worker;
        swoole_event_add($worker->pipe, function($pipe) {
            $worker = $GLOBALS['worker'];
            $recv = $worker->read();            //send data to master

            sleep(rand(1, 3));
            echo "From Master: $recv\n";
            $worker->exit(0);
        });
        exit;
    }

    public function testAction(){
        for ($i = 0; $i < 10000; $i++){
            $data = [
                'abc' => $i,
                'timestamp' => time().rand(100,999)
            ];
            $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));
        }
        exit;
    }

    public function runAction(){
        while (1){
//            echo "\t now we de have $this->child child processes\n";
            if ($this->child < $this->maxProcesses){
                $rds = $this->redis_client();
                $data_pop = $rds->brpop($this->redis_task_wing, 3);//&#x65E0;&#x4EFB;&#x52A1;&#x65F6;,&#x963B;&#x585E;&#x7B49;&#x5F85;
                if (!$data_pop){
                    continue;
                }
                echo "\t Starting new child | now we de have $this->child child processes\n";
                $this->child++;
                $process = new swoole_process([$this, 'process']);
                $process->write(json_encode($data_pop));
                $pid = $process->start();
            }
        }
    }

    private function sig_handler($signo) {
//        echo "Recive: $signo \r\n";
        switch ($signo) {
            case SIGCHLD:
                while($ret = swoole_process::wait(false)) {
//                    echo "PID={$ret['pid']}\n";
                    $this->child--;
                }
        }
    }
}

ps:欢迎各位大神与我交流,不知能否做到更好~

* 我是闫大伯,一只奋战了两个周末的野生程序猿 *

原文地址:

Original: https://www.cnblogs.com/qixidi/p/10414732.html
Author: 栖息地
Title: 基于Swoole和Redis实现的并发队列处理系统

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/529366/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • vm-tools安装

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    Linux 2023年6月6日
    079
  • RabbitMQ超详细安装教程(Linux)

    镜像下载、域名解析、时间同步请点击阿里云开源镜像站 1、简介 官网:https://www.rabbitmq.com/ RabbitMQ是一个开源的遵循AMQP协议实现的基于Erl…

    Linux 2023年5月27日
    0177
  • 如何在shell脚本中传变量的值传给curl

    随着即时通讯的发展,大量的报警媒介已经从以往的邮件转为钉钉,企业微信等聊天工具。当我使用shell脚本来监控 Keepalived的时候,在给curl传递变量的时候无法生效,经过查…

    Linux 2023年6月8日
    0100
  • python 练习题:计算的BMI指数,并根据BMI指数条件选择

    小明身高1.75,体重80.5kg。请根据BMI公式(体重除以身高的平方)帮小明计算他的BMI指数,并根据BMI指数:低于18.5:过轻18.5-25:正常25-28:过重28-3…

    Linux 2023年6月8日
    0137
  • linux编译安装nginx

    本文升级过程,适用于大部分nginx编译版本 常用编译选项说明nginx大部分常用模块,编译时./configure –help以–without开头的都默认安装。 –prefix…

    Linux 2023年6月14日
    094
  • JQ 实现对比两个文本的差异并高亮显示差异部分

    利用jq对比两段文本的差异,差异的内容用不同颜色表示出来。 在线参考demo:http://incaseofstairs.com/jsdiff/ 项目地址:https://gith…

    Linux 2023年6月7日
    0114
  • 嵌入式软件架构设计-消息交互

    1、前言 在熟悉任务调度、程序分层和模块化编程关于软件架构、分层和模块设计后,除了函数调用设计中出现的情况外,还会遇到同层模块之前如何进行消息交互,通常是应用层之间。 比如一个设备…

    Linux 2023年6月7日
    0112
  • 让Mac界面和Windows界面在Laxcus集群操作系统上合体

    如果一套操作系统,同时拥有苹果Mac风格的图形桌面,和微软Windows风格的图形桌面,你会是一种什么样的体验? 最近,我们公司的GUI研发团队完成了这项技术突破,在持续不断地技术…

    Linux 2023年6月6日
    0113
  • 异步、邮件、定时任务

    异步、邮件、定时任务 14.1 异步任务 编写一个业务测试类 文件路径:com–dzj–service–AsynService.java @Se…

    Linux 2023年6月14日
    0106
  • PYTORCH: 60分钟 | 神经网络

    神经网络可以使用 torch.nn包构建。 现在你已经对autograd有所了解, nn依赖 autograd 定义模型并对其求微分。 nn.Module 包括层,和一个返回 ou…

    Linux 2023年6月16日
    0168
  • 剑指offer计划21( 位运算简单)—java

    1.1、题目1 剑指 Offer 15. 二进制中1的个数 1.2、解法 通过判断每一位的与来识别1的数量。 1.3、代码 public class Solution { // y…

    Linux 2023年6月11日
    0141
  • Servlet版本冲突导致页面404

    先准备好了Tomcat环境以及用Idea打了一个Servlet war包想看看效果,结果发现页面跳转一直报404错误,检查了跳转url,项目结构等情况后,问题依旧没有解决。最后偶然…

    Linux 2023年6月7日
    092
  • oracle 触发器trigger(主键自增长)

    触发器我们也可以认为是存储过程,是一种特殊的存储过程。 存储过程:有输入参数和输出参数,定义之后需要调用 触发器:没有输入参数和输出参数,定义之后无需调用,在 适当的时候会自动执行…

    Linux 2023年6月7日
    096
  • 【Leetcode】120. 三角形最小路径和

    给定一个三角形 triangle ,找出自顶向下的最小路径和。 每一步只能移动到下一行中相邻的结点上。 &#x76F8;&#x90BB;&#x7684;&a…

    Linux 2023年6月6日
    0109
  • Markdown 常用语法精讲

    标题 (# 跟标题名称一定要留空格) 一级标题 二级标题 三级标题 四级标题 五级标题 六级标题 缩进 (使用) 这是缩进四个空格文本 (源码: 这是缩进四个空格文本) 强调/加粗…

    Linux 2023年6月7日
    0132
  • 关于面试的那些事

    这周做了三次的笔试,周日的XXX,周三的XXX,周五的XXX。 首先周日的XXX,考了四道算法题,两个小时,说实话题目都没怎么读懂,算法基本没接触过,结果一道也没有做出来,只想说,…

    Linux 2023年6月7日
    0122
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球