系统异步任务

ThinkAdmin 的核心特性之一是其异步任务处理。它支持在 WindowsLinux 平台上并行执行多个任务,显著提高了任务处理效率。

一旦监听进程 LISTEN 启动,它将以每 0.5 秒的频率扫描任务数据表。一旦发现有需要执行的任务,它将独立创建一个 PHP-CLI 进程来处理。

重要提示:为了避免重复执行任务,请确保只启动一个监听进程 LISTEN。推荐使用 START 指令来启动此进程,可使用定时任务执行 START 指令,异常时会自动重启 LISTEN 进程,有利于维护 LISTEN 进程运行状态。

Linux 环境中,任务主进程会定期执行 ps ax 命令来检查指定进程是否在运行。某些服务器安全工具可能会因此发出警告,但您可以放心忽略这些警告。

新版本的任务执行依赖于 symfony/process 组件。为了正常运行,您的运行环境必须启用 execproc_openposix_kill 等函数。

守护进程管理

  • 设置定时任务以定期检查监听进程 LISTEN 的状态。如果发现 LISTEN 进程未运行或异常时,则自动执行指令 START 来重新启动它。
  • Linux 服务器上运行时,建议使用指定用户执行,以避免 CLICGIFPM 这三种运行方式的缓存文件权限冲突。在排查问题时,请检查相关日志。
  • 为了简化操作,建议使用以下命令以指定用户身份运行:sudo -u www php think xadmin:queue start。您可以直接在系统后台的任务管理中复制此执行指令,并将其配置到定时任务中。
  • 任务进程的创建是通过 PHPexec 函数实现的,因此请确保 exec 函数未被禁用。在执行过程中遇到异常时,请根据错误提示调整 PHP 的运行环境设置。

进程管理相关指令

  • 执行 php think xadmin:queue stop # 停止所有正在执行的异步任务进程
  • 执行 php think xadmin:queue query # 查询当前所有正在执行的任务进程
  • 执行 php think xadmin:queue start # 开启异步任务守护进程(后台进程)
  • 执行 php think xadmin:queue listen # 启动异步任务监听进程(前台进程)
  • 执行 php think xadmin:queue status # 查看当前守护进程的后台运行状态

创建异步任务

可以使用 sysqueue() 函数或在控制器中使用 $this->_queue() 创建任务队列。

// 直接使用函数创建任务,获取到任务编号
$code = sysqueue($title, $command, $later = 0, $data = [], $rscript = 1, $loops = 0);

// 在控制器中创建任务,成功或失败直接返回 success|error 结果,可以使用 try 捕获
$this->_queue($title, $command, $later = 0, $data = [], $rscript = 0, $loops = 0); 

// 如果要在任务中创建新任务,需要注意创建新的任务对象哦,不然会导致原任务异常
$queue = QueueService::instance([],true); // 创建新任务服务,不替换原任务对象
$queue->register(...$args);// 注册新任务内容

// 在任务中可以使用 $this->queue 获取当前任务的数据(在任务继承了 Queue 或 QueueService 的情况下))
$this->queue->code; // 获取当前任务编号
$this->queue->data; // 获取当前任务参数
$this->queue->record; // 获取当前任务记录

// 设置当前任务进度数据及成功状态或失败状态(在任务继承了 Queue 或 QueueService 的情况下)
$this->setQueueError(); // 设置失败的消息并结束执行
$this->setQueueSuccess(); // 设置成功的消息并结束执行
$this->setQueueProgress(); // 设置运行进度并继续执行

以上两种方式都可以指定 任务标题执行任务内容执行延时时间任务绑定数据允许重复创建是否循环执行 等。

有些任务,在待处理和处理中是不需要再创建重复任务的,$rscript 就需要设置为 0,这是根据标题来识别的,所以标题也可以适当加上个性名称。

自建的异步处理是多进程任务处理,其中 windows 是基于 wmic 指令创建后台进程实现的,而 linux 则是通过 & 符实现。

因为是异步并列执行,建议自行控制下任务数据,免得过多消耗系统资源而影响项目正常使用。

在部署时,通常只需要创建定时器去执行 php think xadmin:queue start 就可以守护异步任务监听进程。

目前 ThinkAmdin V6 自定义异步任务机制支持两种规则机制:

  • 自定义单独的处理类,需要继承 think\admin\Queue 抽象类,实现 execute 方法,参数为任务绑定的参数 $data
  • 自定义 ThinkPHP 指令,默认使用 Console::call() 去尝试执行传入的指令,指令可以继承 think\admin\Command

异步任务进度

ThinkAdmin 支持任务完成状态跟进。

  • 后端创建任务之后可以获得任务编号,任务编号以字母 Q 开头
  • 前端可以使用 $.loadQueue(CODE)展示任务完成进度及数据
  • 后端任务可以使用 $this->setQueueProgress() 任务完成进度
  • 后端任务可以使用 $this->setQueueError() 设置任务已经失败
  • 后端任务可以使用 $this->setQueueSuccess() 设置任务已经完成
  • 具体参数可以查看方法源代码,进度数值为百分比,如 50 表示完成了 50%
  • 基于控制器实现,前端 data-queue=URL ,后端调用 $this->_queue() 方法创建任务

任务案例解析

前端代码(属性 data-queue 触发 $.loadQueue(Code) 展示任务进度)

<button data-queue="{:url('sync')}" class='layui-btn layui-btn-sm layui-btn-primary'>刷新会员数据</button>

后端代码(调用$this->_queue()会返回响应对象,响应对象data默认为任务编号)

/**
 * 刷新会员数据
 * @auth true
 */
public function sync()
{
    $this->_queue('重新计算所有会员级别', "xsync:member", 1, [], 0);
}

对应指令(指令需要注册,通过 sys.php 注册或 Service.php配置,执行 php think 可查看是否已生效)

// sys.php 动态注册操作指令
\think\Console::starting(function (Console $console) {
    $console->addCommand(\app\store\command\MemberRun::class);
});

在任务处理时可以使用 $this->queue 获取到数据参数(实际对象为 QueueService ,具体可以查阅对象代码)

/**
 * 重新计算会员数据
 * Class MemberRun
 * @package app\store\command
 */
class MemberRun extends \think\admin\Command
{

    protected function configure()
    {
        $this->setName('xsync:member')->setDescription('[ 商城 - 不需执行 ] 重新计算所有会员的等级');
    }

    /**
     * @param Input $input
     * @param Output $output
     * @throws \think\admin\Exception
     */
    protected function execute(Input $input, Output $output)
    {
        list($count, $total) = [0, $this->app->db->name('StoreMember')->count()];
        $this->app->db->name('StoreMember')->chunk(100, function (Collection $data) use (&$count, $total) {
            foreach ($data->toArray() as $vo) {
                $count++;
                $state = MemberService::instance()->syncLevel($vo['id']) ? '成功' : '失败';
                $this->setQueueProgress("调整会员 {$vo['id']} {$vo['phone']} {$vo['nickname']} 级别{$state}", $count / $total * 100);
            }
        });
        $this->setQueueSuccess("重新计算 {$count} 个会员的级别!");
    }
} 
Last Updated:
Contributors: 邹景立