THINKPHP6 队列

标签: queue  thinphp

1.安装think-queue

composer require topthink/think-queue

2.配置消息队列,将config/queue.php将’default’ => ‘sync’改为’default’ => ‘redis’,使用Redis驱动

如选择database,需创建表

CREATE TABLE `prefix_jobs` ( `id` int(11) NOT NULL AUTO_INCREMENT, `queue` varchar(255) NOT NULL, `payload` longtext NOT NULL, `attempts` tinyint(3) unsigned NOT NULL, `reserve_time` int(10) unsigned DEFAULT NULL, `available_time` int(10) unsigned NOT NULL, `create_time` int(10) unsigned NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

3.创建生产者

class Index extends BaseController
{
    
    /**
     * 单任务
     */
    public function singleTask()
    {
        //当前任务将由哪个类来负责处理
        $jobHandlerClassName = 'app\job\Job1';
        //业务数据 对象需要手动转序列化
        $jobData = ['ts' => time()];
        //队列名称
        $jobQueueName = "createOrderJob";
        //入队列,later延时发送,单位秒。push立即发送
        $isPushed = Queue::later(2, $jobHandlerClassName, $jobData,$jobQueueName);
        //$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
        // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
        if( $isPushed !== false ){
            echo '执行成功';
        }else{
            echo '执行失败';
        }
        //php think queue:listen --queue createOrderJob  执行队列
        //nohup php think queue:listen --queue createOrderJob &  不以守护进程执行
    }

    /**
     * 多任务
     */
    public function multiTask(){
        $taskType = $_GET['taskType'];
        switch ($taskType) {
            case 'taskA':
                $jobHandlerClassName  = 'app\job\[email protected]';
                $jobDataArr = ['a'   => '1'];
                $jobQueueName = "multiTaskJobQueue";
                break;
            case 'taskB':
                $jobHandlerClassName  = 'app\job\[email protected]';
                $jobDataArr = ['b'   => '2'];
                $jobQueueName = "multiTaskJobQueue";
                break;
            default:
                break;
        }

        $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
        if ($isPushed !== false) {
            echo("the $taskType of MultiTask Job has been Pushed to ".$jobQueueName ."<br>");
        }else{
            echo "push a new $taskType of MultiTask Job Failed!";
        }
    }

}

4.创建消费者

单任务

class Job1
{
    public function fire(Job $job, $data)
    {
        //业务处理代码,具体不贴出来了
        $isJobDone = $this->jobDone($data);
        //执行成功删除
        if($isJobDone){
            $job->delete();
            print("任务已经被执行成功并且删除");
        }else{
            $job->release(3); //$delay为延迟时间 表示该任务延迟3秒后再执行
            print("任务3s后再次被执行");
        }
        //通过这个方法可以检查任务重试了几次
        if ($job->attempts() > 3) {
            print("Job has been retried more than 3 times!");
            $job->delete();
        }
    }

    public function failed($data)
    {
        // ...任务达到最大重试次数后,失败了
    }

    private function jobDone($data){
        Log::write('这是数据 ' . json_encode($data));
        return true;
    }

多任务

class MultiTask{
    public function taskA(Job $job,$data){

        $isJobDone = $this->_doTaskA($data);

        if ($isJobDone) {
            $job->delete();
            print("Info: TaskA of Job MultiTask has been done and deleted"."\n");
        }else{
            $job->release(3);
            print("任务3s后再次被执行");
        }
        if ($job->attempts() > 3) {
            print("Job has been retried more than 3 times!");
            $job->delete();
        }
    }

    public function taskB(Job $job,$data){

        $isJobDone = $this->_doTaskB($data);

        if ($isJobDone) {
            $job->delete();
            print("Info: TaskB of Job MultiTask has been done and deleted"."\n");
        }else{
            $job->release(3);
            print("任务3s后再次被执行");
        }
        if ($job->attempts() > 3) {
            print("Job has been retried more than 3 times!");
            $job->delete();
        }
    }

    private function _doTaskA($data) {
        print("Info: doing TaskA of Job MultiTask "."\n");
        return true;
    }

    private function _doTaskB($data) {
        print("Info: doing TaskB of Job MultiTask "."\n");
        return true;
    }
}

5.执行

php think queue:listen --queue createOrderJob

我的文件目录

在database 模式下,2.7.1 和 2.7.2 中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3 中的重发时机是直接更新原任务。

而在redis 模式下,3种重发都是先删除再插入。

不管是哪种重发方式,重发之后,任务的已尝试次数会在原来的基础上 +1 。

此外,消费者类中需要注意,如果 fire() 方法中可能抛出异常,那么如果不需要自动重发的话, 请在抛出异常之前将任务删除 $job->delete() ,以免产生bug。 如果需要自动重发的话,请直接抛出异常,不要在 fire() 方法中又手动使用 $job->release() , 这样会导致该任务被重发两次,产生两个一样的新任务。

版权声明:本文为MyDream229原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/MyDream229/article/details/108863094

智能推荐

经典进程同步和互斥问题

经典进程同步与互斥问题 前言 一、生产者-消费者问题 1.问题描述 2.问题分析 3.代码 二、读者-写者问题 1.问题描述&&分析 2.代码 三、哲学家进餐问题 1.问题描述&&分析 2.代码 四、理发师问题 1.问题描述&&分析 2.代码 前言 在多道程序设计环境中,进程同步是一个非常重要的问题,下面讨论几个经典的进程同步问题。 一、生产者-消费...

java设计模式——ThreadLocal线程单例

1、定义一个ThreadLocal线程单例,代码如下: 2、定义一个多线程类,代码如下: 3、定义一个测试类,代码如下: 4、输出结果,如下图:...

【tensorflow】线性模型实战

线性模型:y = 1.477 * x + 0.089   1. 采样数据 采样噪声eps在均值0,方差0.01的高斯分布中,而后在均匀分布U(0,1)中,区间[-10,10]进行n=100次随机采样:   2. 计算误差 循环计算每个点的预测值与真是值之间差的平方并累加,从而获得训练集上的均芳误差损失值。   3. 计算梯度   4. 梯度更新 对权重w和偏...

常见损失函数和评价指标总结(附公式&代码)

网上看到一篇很实用的帖子关于常见损失函数和评价指标,收藏下来 本文转载于https://zhuanlan.zhihu.com/p/91511706 ------------------------------------------------------------------------------------------------------------------------------...

为什么 4G/5G 的直播延时依然很高

通信技术的发展促进了视频点播和直播业务的兴起,4G 和 5G 网络技术的进步也使得流媒体技术变得越来越重要,但是网络技术并不能解决流媒体直播的高延迟问题。 本文不会介绍网络对直播业务的影响,而是会分析直播中常见的现象 — 主播和观众之间能够感觉到的明显网络延迟。除了业务上要求的延迟直播之外,有哪些因素会导致视频直播的延迟这么高呢? live-streaming  图 1 - ...

猜你喜欢

springboot 过滤器Filter vs 拦截器Interceptor 详解

1 前言       最近接触到了过滤器和拦截器,网上查了查资料,这里记录一下,这篇文章就来仔细剖析下过滤器和拦截器的区别与联系。 2 拦截器与过滤器之间的区别 从上面对拦截器与过滤器的描述来看,它俩是非常相似的,都能对客户端发来的请求进行处理,它们的区别如下: 作用域不同 过滤器依赖于servlet容器,只能在 servlet容器,web环境下使用 拦截器依赖于sp...

IDEA环境--JavaWeb项目【分页功能实现】

参考链接:https://www.jianshu.com/p/d108d0cd9acf 1、前言 最近在写一些项目,遇到要使用分页功能的地方,就简单的学习了一下,在此总结一下具体实现的过程以及遇到的问题。 分页功能:当我们写一下web项目时会遇到一个页面要显示很多数据,一下子都显示出来效率会很低,也不美观。这就要用到分页,其作用也就是将数据分割成多个页面来进行显示。 2、项目介绍 这只是一个简单的...

517【毕设课设】基于单片机仓库家庭防火防盗报警系统

【资源下载】下载地址如下: https://docs.qq.com/doc/DTlRSd01BZXNpRUxl 功能简要说明: 1.51单片机+1602液晶+按键+烟雾检测传感器MQ+红外检测+蜂鸣器+DHT11温湿度传感器; 2.按键设置烟雾报警浓度值,温度报警值; 3.当达到报警条件,蜂鸣器响; 5.电路板为PCB腐蚀所做,图文件为altiumdesigner工程文件。 6.程序采用C语言编写...

Windows端口被占用 优雅的解除占用

1. 点击搜索输入cmd,右键以管理员身份打开命令行 2. 查看占用端口的进程id 3. 杀死进程,解除端口占用...

JVM 内存管理与垃圾收集算法

本文整理自网络和书籍。 自动内存管理   Java内存分配与管理是Java的核心技术之一,一般来说,Java在内存分配会涉及到以下区域: 区域 说明 寄存器 我们在程序中无法控制。 栈 存放基本类型的数据和对象的引用,但对象本身不存放在栈中,而是存放在堆中。 堆 存放new产生的数据。 静态域 存放在对象中用static定义的静态成员。 常量池 存放常量。 非RAM存储 硬盘等永久存储空间。 Ja...