• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

php+redis实现延迟队列

php 搞代码 4年前 (2022-01-22) 11次浏览 已收录 0个评论

这篇文章主要介绍了关于php+redis实现延迟队列,有着一定的参考价值,现在分享给大家,有需要的朋友可以参考一下

基于redis有序集实现延迟任务执行,比如某个时间给某个用户发短信,订单过期处理,等等
我是在tp5框架上写的,实现起来很简单,对于一些不是很复杂的应用足够了,目前在公司项目中使用,后台进程并没有实现多进程,
不多说,贴代码,不回排版,见谅

1、命令行脚本 执行方法:php think delay-queue queuename(这是有序集的key)

namespace app\command;use app\common\lib\delayqueue\DelayQueue;use think\console\Command;use think\console\Input;use think\console\Output;use think\Db;class DelayQueueWorker extends Command{    const COMMAND_ARGV_1 = 'queue';    protected function configure()    {        $this->setName('delay-queue')->setDescription('延迟队列任务进程');        $this->addArgument(self::COMMAND_ARGV_1);    }    protected function execute(Input $input, Output $output)    {        $queue = $input->getArgument(self::COMMAND_ARGV_1);        //参数1 延迟队列表名,对应与redis的有序集key名        while (true) {            DelayQueue::getInstance($queue)->perform();            usleep(300000);        }    }}

库类目录结构

config.php 里是redis连接参数配置

RedisHandler.php只实现有序集的操作,重连机制还没有实现

namespace app\common\lib\delayqueue;class RedisHandler{    public $provider;    private static $_instance = null;    private function __construct() {        $this->provider = new \Redis();        //host port        $config = require_once 'config.php';        $this->provider->connect($config['redis_host'], $config['redis_port']);    }    final private function __clone() {}    public static function getInstance() {        if(!self::$_instance) {            self::$_instance = new RedisHandler();        }        return self::$_instance;    }    /**     * @param string $key 有序集key     * @param number $score 排序值     * @param string $value 格式化的数据     * @return int     */    public function zAdd($key, $score, $value)    {        return $this->provider->zAdd($key, $score, $value);    }    /**     * 获取有序集数据     * @param $key     * @param $start     * @param $end     * @param null $withscores     * @return array     */    public function zRange($key, $start, $end, $withscores = null)    {        return $this->provider->zRange($key, $start, $end, $withscores);    }    /**     * 删除有序集数据     * @param $key     * @param $member     * @return int     */    public function zRem($key,$member)    {        return $this->provider->zRem($key,$member);    }}

延迟队列类

namespace app\common\lib\delayqueue;class DelayQueue{    private $prefix = 'delay_queue:';    private $queue;    private static $_instance = null;    private function __construct($queue) {        $this->queue = $queue;    }    final private function __clone() {}    public static function getInstance($queue = '') {        if(!self::$_instance) {            self::$_instance = new DelayQueue($queue);        }        return self::$_instance;    }    /**     * 添加任务信息到队列     *     * demo DelayQueue::getInstance('test')->addTask(     *    'app\common\lib\delayqueue\job\Test',     *    strtotime('2018-05-02 20:55:20'),     *    ['abc'=>111]     * );     *     * @param $jobClass     * @param int $runTime 执行时间     * @param array $args     */    public function addTask($jobClass, $runTime, $args = null)    {        $key = $this->prefix.$this->queue;        $params = [            'class' => $jobClass,            'args'  => $args,            'runtime' => $runTime,        ];        RedisHandler::getInstance()->zAdd(            $key,            $runTime,            serialize($params)        );    }    /**     * 执行job     * @return bool     */    public function perform()    {        $key = $this->prefix.$this->queue;        //取出有序集第一个元素        $result = RedisHandler::getInstance()->zRange($key, 0 ,0);        if (!$result) {            return false;        }        $jobInfo = unserialize($result[0]);        print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);        $jobClass = $jobInfo['class'];        if(!@class_exists($jobClass)) {            print_r($jobClass.' undefined'. PHP_EOL);            RedisHandler::getInstance()->zRem($key, $result[0]);            return false;        }        // 到时间执行        if (time() >= $jobInfo['runtime']) {            $job = new $jobClass;            $job->setPayload($jobInfo['args']);            $jobResult = $job->preform();            if ($jobResult) {                // 将任务移除                RedisHandler::getInstance()->zRem($key, $result[0]);                return true;            }        }        return false;    }}

异步任务基类:

namespace app\common\lib\delayqueue;class DelayJob{    protected $payload;    public function preform ()    {        // todo        return true;    }    public function setPayload($args = null)    {        $this->payload = $args;    }}

所有异步执行的任务都卸载job目录下,且要继承DelayJob,你可以实现任何你想延迟执行的任务

如:

namespace app\common\lib\delayqueue\job;use app\common\lib\delayqueue\DelayJob;class Test extends DelayJob{    public function preform()    {        // payload 里应该有处理任务所需的参数,通过DelayQueue的addTask传入        print_r('test job'.PHP_EOL)<em>8本文来源gao.dai.ma.com搞@代*码(网$</em><pre>搞代gaodaima码

; return true; }}

使用方法:

假设用户创建了一个订单,订单在10分钟后失效,那么在订单创建后加入:

DelayQueue::getInstance('close_order')->addTask(     'app\common\lib\delayqueue\job\CloseOrder', // 自己实现的job     strtotime('2018-05-02 20:55:20'), // 订单失效时间     ['order_id'=>123456] // 传递给job的参数 );

close_order 是有序集的key

命令行启动进程

php think delay-queue close_order

相关推荐:

PHP+redis实现session共享

以上就是php+redis实现延迟队列的详细内容,更多请关注搞代码gaodaima其它相关文章!


搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:php+redis实现延迟队列

喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址