⚡ 异步任务

ThinkAdmin 的异步任务处理是其核心特性之一,支持在 WindowsLinux 平台上并行执行多个任务,显著提高了任务处理效率。

📚 基础概念

🤔 基本介绍

异步任务是指不需要立即返回结果的操作,可以在后台慢慢处理。

简单理解:就像餐厅点餐,同步任务是"现做现吃"(用户要等待),异步任务是"下单后先给你号牌,菜好了再通知你"(用户无需等待)。

实际场景

  • 发送邮件:发送 1000 封邮件可能需要 10 分钟,如果同步执行,用户要等 10 分钟
  • 生成报表:处理大量数据生成报表可能需要 5 分钟,如果同步执行,用户要等 5 分钟
  • 图片处理:压缩 100 张图片可能需要 3 分钟,如果同步执行,用户要等 3 分钟

使用异步任务后

  • 用户提交请求后立即返回(1 秒内)
  • 后台慢慢处理任务(可能需要几分钟)
  • 处理完成后通知用户(通过进度条或消息)

🤔 使用优势

问题1:用户体验差

如果使用同步任务,用户需要等待很长时间:

// ❌ 不推荐:同步处理(用户要等待)
public function sendEmails()
{
    $users = User::select();  // 1000 个用户
    foreach ($users as $user) {
        // 发送邮件,每个需要 1 秒
        Mail::send($user->email, '通知');
    }
    // 用户需要等待 1000 秒(约 17 分钟)!
    $this->success('发送完成');
}

使用异步任务后

// ✅ 推荐:异步处理(用户立即返回)
public function sendEmails()
{
    // 创建异步任务,立即返回
    $this->_queue('批量发送邮件', 'email:send', 0, ['user_ids' => $ids]);
    // 用户立即看到"任务已创建"的提示,无需等待
    $this->success('任务已创建,正在后台处理');
}

问题2:系统性能

同步任务会长时间占用 Web 进程,影响其他用户访问:

// ❌ 不推荐:占用 Web 进程
public function processData()
{
    // 处理大量数据,需要 5 分钟
    // 这 5 分钟内,这个 Web 进程被占用,无法处理其他请求
    processLargeData();
}

使用异步任务后

// ✅ 推荐:独立进程处理
public function processData()
{
    // 创建异步任务,Web 进程立即释放
    $this->_queue('处理数据', 'data:process', 0, []);
    // Web 进程可以立即处理其他用户的请求
}

🔄 同步 vs 异步

同步任务流程

用户提交请求

服务器处理(用户等待中...)

处理完成(可能需要几分钟)

返回结果给用户

异步任务流程

用户提交请求

立即返回"任务已创建"

后台独立进程处理(用户无需等待)

处理完成后更新状态

用户可以通过进度条查看进度

⚙️ 工作原理

ThinkAdmin 使用队列机制处理异步任务,整个过程是自动的:

步骤1:创建任务

将任务信息保存到数据库:

// 在控制器中创建任务
$this->_queue('任务标题', 'command:name', 0, ['param' => 'value']);

系统自动执行

// 系统自动将任务保存到数据库
// 表名:system_queue
// 字段:title(标题)、command(命令)、data(数据)、status(状态)等

步骤2:监听进程扫描

后台监听进程每 0.5 秒扫描一次数据库:

# 监听进程自动执行(无需手动操作)
php think xadmin:queue listen

扫描逻辑(系统自动执行):

// 每 0.5 秒执行一次
while (true) {
    // 查询待处理的任务(status = 1)
    $tasks = Queue::where('status', 1)->select();
    
    foreach ($tasks as $task) {
        // 创建独立进程执行任务
        exec("php think {$task->command} > /dev/null 2>&1 &");
    }
    
    sleep(0.5);  // 等待 0.5 秒后继续扫描
}

步骤3:执行任务

系统创建独立的 PHP-CLI 进程执行任务:

# 系统自动创建进程(无需手动操作)
php think command:name --data='{"param":"value"}'

步骤4:更新状态

任务执行完成后,自动更新状态:

// 在任务中调用(系统自动处理)
$this->setQueueSuccess('任务完成');  // 更新状态为成功
// 或
$this->setQueueError('任务失败');    // 更新状态为失败

📖 相关概念

队列(Queue)

  • 待处理任务的列表
  • 存储在数据库 system_queue 表中
  • 每个任务有唯一编号(以 Q 开头)

进程(Process)

  • 执行任务的独立程序
  • 与 Web 进程分离,互不影响
  • Windows 使用 wmic 创建,Linux 使用 & 创建

监听(Listen)

  • 定期检查是否有新任务
  • 默认每 0.5 秒扫描一次
  • 发现任务后自动创建进程执行

命令(Command)

  • 执行任务的具体代码
  • 继承 think\admin\Command
  • 或继承 think\admin\Queue

🎯 使用场景

场景1:批量发送邮件

// 需要发送 1000 封邮件,每封需要 1 秒
// 同步:用户等待 1000 秒(约 17 分钟)
// 异步:用户立即返回,后台慢慢发送
$this->_queue('批量发送邮件', 'email:batch', 0, ['user_ids' => $ids]);

场景2:处理大量数据

// 需要处理 10000 条数据,每条需要 0.1 秒
// 同步:用户等待 1000 秒(约 17 分钟)
// 异步:用户立即返回,后台慢慢处理
$this->_queue('处理数据', 'data:process', 0, ['data' => $data]);

场景3:生成报表

// 需要生成复杂的 Excel 报表,需要 5 分钟
// 同步:用户等待 5 分钟
// 异步:用户立即返回,生成完成后通知下载
$this->_queue('生成报表', 'report:generate', 0, ['params' => $params]);

场景4:图片处理

// 需要压缩 100 张图片,每张需要 2 秒
// 同步:用户等待 200 秒(约 3 分钟)
// 异步:用户立即返回,后台慢慢处理
$this->_queue('压缩图片', 'image:compress', 0, ['images' => $images]);

🚀 主要特性

  • 并行处理: 支持多个任务同时执行
  • 跨平台: 兼容 Windows 和 Linux 系统
  • 自动监控: 自动扫描和执行待处理任务
  • 进程管理: 独立的 PHP-CLI 进程处理任务
  • 异常恢复: 支持自动重启和异常处理

⚙️ 工作原理

ThinkAdmin 的异步任务机制基于队列和进程管理,整个过程是自动化的。

🔍 任务扫描机制

监听进程定期扫描数据库,查找待处理的任务:

扫描频率

  • 默认每 0.5 秒 扫描一次
  • 可以通过配置调整扫描频率
  • 扫描不会影响系统性能(查询很快)

扫描过程(系统自动执行):

// 系统自动执行的逻辑(无需手动编写)
while (true) {
    // 1. 查询待处理的任务(status = 1)
    $tasks = Db::name('system_queue')
        ->where('status', 1)
        ->where('exec_at', '<=', time())
        ->select();
    
    // 2. 为每个任务创建独立进程
    foreach ($tasks as $task) {
        // 更新任务状态为处理中
        $task->status = 2;
        $task->save();
        
        // 创建独立进程执行任务
        if (PHP_OS === 'WINNT') {
            // Windows 系统使用 wmic
            exec("wmic process call create \"php think {$task->command}\"");
        } else {
            // Linux 系统使用 &
            exec("php think {$task->command} > /dev/null 2>&1 &");
        }
    }
    
    // 3. 等待 0.5 秒后继续扫描
    sleep(0.5);
}

多任务并行

  • 系统支持同时执行多个任务
  • 每个任务都是独立的进程
  • 互不影响,可以并行处理

🎮 进程管理

ThinkAdmin 提供了完整的进程管理命令:

LISTEN - 监听进程

启动监听进程,负责扫描和执行任务:

# 启动监听进程(前台运行,可以看到日志)
php think xadmin:queue listen

特点

  • 前台运行,可以看到实时日志
  • 适合开发环境调试
  • 进程异常退出后不会自动重启

START - 启动守护进程

启动守护进程,自动管理监听进程:

# 启动守护进程(推荐方式)
php think xadmin:queue start

特点

  • 后台运行,自动管理监听进程
  • 监听进程异常退出后会自动重启
  • 适合生产环境使用

STOP - 停止进程

停止所有正在执行的任务进程:

# 停止所有任务进程
php think xadmin:queue stop

使用场景

  • 系统维护时需要停止所有任务
  • 任务执行异常需要强制停止
  • 更新代码后需要重启进程

QUERY - 查询进程

查询当前正在执行的任务进程:

# 查询当前运行的任务
php think xadmin:queue query

输出示例

正在执行的任务:
- Q20241110123456: 批量发送邮件 (进度: 50%)
- Q20241110123457: 处理数据 (进度: 30%)

STATUS - 查看状态

查看守护进程的运行状态:

# 查看守护进程状态
php think xadmin:queue status

输出示例

守护进程状态:运行中
监听进程状态:运行中
当前任务数:2

🔄 完整工作流程

流程1:创建任务

// 1. 用户在控制器中创建任务
public function sync()
{
    $this->_queue('同步数据', 'data:sync', 0, []);
}

流程2:任务入库

// 2. 系统自动将任务保存到数据库
// system_queue 表
// - title: '同步数据'
// - command: 'data:sync'
// - status: 1 (待处理)
// - exec_at: 当前时间

流程3:监听扫描

# 3. 监听进程扫描数据库(每 0.5 秒)
# 发现待处理任务,创建独立进程执行

流程4:执行任务

// 4. 独立进程执行任务
class DataSync extends Command
{
    protected function execute()
    {
        // 处理业务逻辑
        $this->setQueueProgress('处理中...', 50);
        // ...
        $this->setQueueSuccess('完成');
    }
}

流程5:更新状态

// 5. 任务完成后,自动更新数据库状态
// status: 3 (成功) 或 4 (失败)

流程6:前端显示

// 6. 前端通过任务编号查询进度
$.loadQueue('Q20241110123456');
// 显示进度条和状态

⚠️ 重要提示

单进程限制: 为避免重复执行任务,请确保只启动一个监听进程 LISTEN

推荐启动方式: 使用 START 指令启动进程,异常时会自动重启 LISTEN 进程。

Linux 环境: 任务主进程会定期执行 ps ax 命令检查进程状态,某些安全工具可能发出警告,可放心忽略。

PHP 函数要求: 需要启用 execproc_openposix_kill 等函数。

🔧 守护进程管理

进程监控

  • 定时检查: 设置定时任务以定期检查监听进程 LISTEN 的状态
  • 自动重启: 如果发现 LISTEN 进程未运行或异常时,则自动执行指令 START 来重新启动它
  • 状态监控: 实时监控进程运行状态和健康度
  • 异常处理: 自动处理进程异常和重启

用户权限

  • 指定用户: 在 Linux 服务器上运行时,建议使用指定用户执行
  • 权限避免: 避免 CLI、CGI 和 FPM 这三种运行方式的缓存文件权限冲突
  • 日志检查: 在排查问题时,请检查相关日志
  • 用户命令: 建议使用 sudo -u www php think xadmin:queue start 命令

环境要求

  • exec 函数: 任务进程的创建是通过 PHP 的 exec 函数实现的
  • 函数启用: 请确保 exec 函数未被禁用
  • 异常处理: 在执行过程中遇到异常时,请根据错误提示调整 PHP 的运行环境设置
  • 权限配置: 确保 PHP 有足够的权限执行系统命令

进程管理相关指令

  • 执行 php think xadmin:queue stop # 停止所有正在执行的异步任务进程
  • 执行 php think xadmin:queue query # 查询当前所有正在执行的任务进程
  • 执行 php think xadmin:queue start # 开启异步任务守护进程(后台进程)
  • 执行 php think xadmin:queue listen # 启动异步任务监听进程(前台进程)
  • 执行 php think xadmin:queue status # 查看当前守护进程的后台运行状态

➕ 创建任务

ThinkAdmin 提供了两种方式创建异步任务:函数方式和控制器方式。

🎯 控制器方式

在控制器中使用 $this->_queue() 方法创建任务,这是最常用的方式:

<?php
declare(strict_types=1);

namespace app\admin\controller;

use think\admin\Controller;

class Member extends Controller
{
    /**
     * 同步会员数据
     */
    public function sync()
    {
        // 创建异步任务
        $this->_queue(
            '同步会员数据',           // 参数1:任务标题
            'member:sync',            // 参数2:执行命令
            0,                        // 参数3:延时时间(秒)
            ['type' => 'all'],        // 参数4:任务数据
            0,                        // 参数5:允许重复创建(0=不允许,1=允许)
            0                         // 参数6:是否循环执行(0=不循环,1=循环)
        );
    }
}

参数说明

参数说明示例
$title任务标题'同步会员数据'
$command执行命令'member:sync''app\store\queue\MemberSync'
$later延时时间(秒)0 表示立即执行,60 表示 60 秒后执行
$data任务数据(数组)['type' => 'all', 'ids' => [1,2,3]]
$rscript允许重复创建0 不允许,1 允许
$loops是否循环执行0 不循环,1 循环

返回值

  • 成功:自动返回 success 响应,data 字段包含任务编号(如 Q20241110123456
  • 失败:自动返回 error 响应

🎯 函数方式

使用 sysqueue() 函数创建任务,可以获取任务编号:

<?php
// 创建任务并获取任务编号
$code = sysqueue(
    '同步会员数据',           // 任务标题
    'member:sync',            // 执行命令
    0,                        // 延时时间
    ['type' => 'all'],        // 任务数据
    1,                        // 允许重复创建
    0                         // 是否循环执行
);

// $code 是任务编号,如 'Q20241110123456'
echo "任务编号:{$code}";

使用场景

  • 需要在代码中获取任务编号
  • 需要在非控制器环境中创建任务
  • 需要手动处理返回值

⏰ 延时执行

可以设置任务延时执行,适用于定时任务场景:

// 立即执行
$this->_queue('任务1', 'command:name', 0, []);

// 60 秒后执行
$this->_queue('任务2', 'command:name', 60, []);

// 1 小时后执行(3600 秒)
$this->_queue('任务3', 'command:name', 3600, []);

实际应用

  • 订单支付后,30 分钟后检查是否发货
  • 用户注册后,5 分钟后发送欢迎邮件
  • 数据备份,每天凌晨 2 点执行

🔄 防止重复创建

某些任务不应该重复创建,可以通过 $rscript 参数控制:

// 不允许重复创建(推荐)
$this->_queue('同步会员数据', 'member:sync', 0, [], 0);
// 如果任务已存在(待处理或处理中),则不会创建新任务

// 允许重复创建
$this->_queue('发送邮件', 'email:send', 0, [], 1);
// 即使任务已存在,也会创建新任务

判断规则

  • 系统根据任务标题判断是否重复
  • 如果标题相同且状态为"待处理"或"处理中",则视为重复
  • 建议在标题中加入唯一标识,如:"同步会员数据-{$userId}"

🔁 循环执行

某些任务需要定期执行,可以设置循环执行:

// 循环执行(每 60 秒执行一次)
$this->_queue('定时检查', 'check:status', 0, [], 0, 1);

注意事项

  • 循环任务会定期自动重新创建
  • 需要确保任务执行时间小于循环间隔
  • 建议在任务中控制循环次数,避免无限循环

📦 任务数据传递

可以通过 $data 参数传递数据给任务:

// 传递数据
$this->_queue('处理订单', 'order:process', 0, [
    'order_id' => 12345,
    'user_id' => 67890,
    'action' => 'refund'
]);

// 在任务中获取数据
class OrderProcess extends Command
{
    protected function execute(Input $input, Output $output)
    {
        // 获取任务数据
        $data = $this->queue->data;
        $orderId = $data['order_id'];
        $userId = $data['user_id'];
        $action = $data['action'];
        
        // 处理业务逻辑
        // ...
    }
}

🎯 创建子任务

如果要在任务中创建新任务,需要创建新的任务对象:

<?php
class ParentTask extends Command
{
    protected function execute(Input $input, Output $output)
    {
        // 处理主任务
        $this->setQueueProgress('处理中...', 50);
        
        // 创建子任务(注意:需要创建新对象)
        $queue = QueueService::instance([], true);  // 创建新任务服务
        $queue->register('子任务', 'child:task', 0, [], 0, 0);
        
        // 继续处理主任务
        $this->setQueueSuccess('完成');
    }
}

为什么需要创建新对象?

  • 避免替换当前任务对象
  • 确保主任务和子任务互不影响
  • 保持任务数据的独立性

📊 获取任务数据

在任务执行过程中,可以获取当前任务的数据:

class MyTask extends Command
{
    protected function execute(Input $input, Output $output)
    {
        // 获取任务编号
        $code = $this->queue->code;
        // 输出:Q20241110123456
        
        // 获取任务参数
        $data = $this->queue->data;
        // 输出:['type' => 'all', 'ids' => [1,2,3]]
        
        // 获取任务记录
        $record = $this->queue->record;
        // 输出:完整的任务记录对象
        
        // 使用数据
        $type = $data['type'];
        $ids = $data['ids'];
    }
}

🎨 两种任务类型

ThinkAdmin 支持两种任务类型:

类型1:Command 指令(推荐)

继承 think\admin\Command 类,使用 ThinkPHP 的命令机制:

class MemberSync extends Command
{
    protected function configure()
    {
        $this->setName('member:sync')
             ->setDescription('同步会员数据');
    }
    
    protected function execute(Input $input, Output $output)
    {
        // 处理逻辑
    }
}

类型2:Queue 类

继承 think\admin\Queue 类,直接实现任务逻辑:

class MemberSyncQueue extends Queue
{
    public function execute(array $data)
    {
        // 处理逻辑
    }
}

创建任务时

// 使用 Command 指令
$this->_queue('同步会员', 'member:sync', 0, []);

// 使用 Queue 类
$this->_queue('同步会员', 'app\store\queue\MemberSyncQueue', 0, []);

📊 任务进度

ThinkAdmin 提供了完整的任务进度跟踪功能,前端可以实时显示任务执行进度。

🎯 任务编号

创建任务后,系统会自动生成任务编号:

// 创建任务
$this->_queue('同步数据', 'data:sync', 0, []);

// 返回的任务编号格式:Q + 时间戳 + 随机数
// 示例:Q20241110123456

任务编号格式

  • 以字母 Q 开头
  • 后面是时间戳和随机数
  • 用于唯一标识任务

📡 前端显示进度

前端可以使用 $.loadQueue() 方法显示任务进度:

<!-- 方式1:使用 data-queue 属性(推荐) -->
<button data-queue="{:url('sync')}" class="layui-btn">
    同步数据
</button>

<!-- 方式2:手动调用 -->
<button onclick="startSync()" class="layui-btn">
    同步数据
</button>

<script>
function startSync() {
    // 1. 调用后端接口创建任务
    $.post('{:url("sync")}', {}, function(result) {
        if (result.code === 1) {
            // 2. 获取任务编号
            var queueCode = result.data;
            
            // 3. 显示进度弹窗
            $.loadQueue(queueCode);
        }
    });
}
</script>

进度弹窗功能

  • 实时显示任务进度(百分比)
  • 显示任务状态信息
  • 任务完成后自动关闭
  • 任务失败时显示错误信息

📈 后端更新进度

在任务执行过程中,可以更新任务进度:

class DataSync extends Command
{
    protected function execute(Input $input, Output $output)
    {
        $total = 1000;
        $count = 0;
        
        // 处理数据
        foreach ($data as $item) {
            $count++;
            
            // 更新进度(百分比)
            $progress = intval($count / $total * 100);
            $this->setQueueProgress(
                "处理中:{$count}/{$total}",  // 提示信息
                $progress                     // 进度百分比(0-100)
            );
            
            // 处理业务逻辑
            processItem($item);
        }
        
        // 设置任务完成
        $this->setQueueSuccess("完成 {$count} 条数据处理");
    }
}

进度更新方法

// 更新进度(继续执行)
$this->setQueueProgress('处理中...', 50);  // 50% 完成

// 设置任务成功(结束执行)
$this->setQueueSuccess('任务完成');

// 设置任务失败(结束执行)
$this->setQueueError('任务失败:错误信息');

参数说明

  • setQueueProgress($message, $progress): 更新进度,任务继续执行
    • $message: 提示信息(字符串)
    • $progress: 进度百分比(0-100 的整数)
  • setQueueSuccess($message): 设置任务成功,任务结束执行
    • $message: 成功提示信息(可选)
  • setQueueError($message): 设置任务失败,任务结束执行
    • $message: 错误提示信息(必填)

🔄 完整示例

前端代码

<!-- 触发任务的按钮 -->
<button data-queue="{:url('member/sync')}" class="layui-btn">
    <i class="layui-icon layui-icon-refresh"></i> 同步会员数据
</button>

后端代码

<?php
namespace app\admin\controller;

use think\admin\Controller;

class Member extends Controller
{
    public function sync()
    {
        // 创建任务,自动返回任务编号
        $this->_queue('同步会员数据', 'member:sync', 0, []);
    }
}

任务代码

<?php
namespace app\store\command;

use think\admin\Command;

class MemberSync extends Command
{
    protected function configure()
    {
        $this->setName('member:sync')
             ->setDescription('同步会员数据');
    }
    
    protected function execute(Input $input, Output $output)
    {
        $total = 1000;
        $count = 0;
        
        // 分批处理
        Db::name('StoreMember')->chunk(100, function($members) use (&$count, $total) {
            foreach ($members as $member) {
                $count++;
                
                // 更新进度
                $progress = intval($count / $total * 100);
                $this->setQueueProgress(
                    "处理会员:{$member['id']} ({$count}/{$total})",
                    $progress
                );
                
                // 处理业务逻辑
                MemberService::sync($member['id']);
            }
        });
        
        // 任务完成
        $this->setQueueSuccess("完成 {$count} 个会员的同步");
    }
}

执行效果

  1. 用户点击按钮
  2. 前端自动调用后端接口
  3. 后端创建任务并返回任务编号
  4. 前端显示进度弹窗
  5. 后端任务执行,实时更新进度
  6. 任务完成后,前端显示完成提示

📝 任务案例

前端代码(属性 data-queue 触发 $.loadQueue(Code) 展示任务进度)

<button data-queue="{:url('sync')}" class='layui-btn layui-btn-sm layui-btn-primary'>
    <i class="layui-icon layui-icon-refresh"></i> 刷新会员数据
</button>

说明: 点击按钮后,会自动调用后端接口创建任务,并显示任务进度弹窗。

后端代码(调用$this->_queue()会返回响应对象,响应对象data默认为任务编号)

<?php
declare(strict_types=1);

namespace app\admin\controller;

use think\admin\Controller;

/**
 * 会员管理
 * @class Member
 * @package app\admin\controller
 */
class Member extends Controller
{
    /**
     * 刷新会员数据
     * @auth true
     */
    public function sync()
    {
        // 创建异步任务
        // 参数1: 任务标题
        // 参数2: 执行命令(指令名称)
        // 参数3: 延时时间(秒)
        // 参数4: 任务数据(数组)
        // 参数5: 允许重复创建(0=不允许,1=允许)
        $this->_queue('重新计算所有会员级别', "xsync:member", 1, [], 0);
    }
}

对应指令(指令需要注册,通过 sys.php 注册或 Service.php配置,执行 php think 可查看是否已生效)

<?php
// app/admin/sys.php 动态注册操作指令
use think\Console;

\think\Console::starting(function (Console $console) {
    $console->addCommand(\app\store\command\MemberRun::class);
});

在任务处理时可以使用 $this->queue 获取到数据参数(实际对象为 QueueService ,具体可以查阅对象代码)

<?php
declare(strict_types=1);

namespace app\store\command;

use think\admin\Command;
use think\admin\Command\Input;
use think\admin\Command\Output;
use think\Collection;

/**
 * 重新计算会员数据
 * @class MemberRun
 * @package app\store\command
 */
class MemberRun extends Command
{
    /**
     * 配置命令
     */
    protected function configure()
    {
        $this->setName('xsync:member')
             ->setDescription('[ 商城 - 不需执行 ] 重新计算所有会员的等级');
    }

    /**
     * 执行命令
     * @param Input $input
     * @param Output $output
     * @throws \think\admin\Exception
     */
    protected function execute(Input $input, Output $output)
    {
        // 获取总数
        $total = $this->app->db->name('StoreMember')->count();
        $count = 0;
        
        // 分批处理数据
        $this->app->db->name('StoreMember')->chunk(100, function (Collection $data) use (&$count, $total) {
            foreach ($data->toArray() as $vo) {
                $count++;
                // 同步会员等级
                $state = MemberService::instance()->syncLevel($vo['id']) ? '成功' : '失败';
                
                // 更新任务进度(百分比)
                $this->setQueueProgress(
                    "调整会员 {$vo['id']} {$vo['phone']} {$vo['nickname']} 级别{$state}", 
                    intval($count / $total * 100)
                );
            }
        });
        
        // 设置任务完成
        $this->setQueueSuccess("重新计算 {$count} 个会员的级别!");
    }
}

📖 Command 类详解

继承要求:

  • 命令类必须继承 think\admin\Command
  • 需要实现 configure() 方法配置命令
  • 需要实现 execute() 方法执行命令逻辑

常用方法:

// 1. 配置命令名称和描述
protected function configure()
{
    $this->setName('xsync:member')
         ->setDescription('重新计算所有会员的等级');
}

// 2. 获取命令参数
$openid = $input->getArgument('openid');
$code = $input->getArgument('autocode');

// 3. 设置任务进度(百分比,0-100)
$this->setQueueProgress(string $message, int $progress = 0);

// 4. 设置任务成功
$this->setQueueSuccess(string $message = '');

// 5. 设置任务失败
$this->setQueueError(string $message = '');

// 6. 获取当前任务数据(在任务中)
$this->queue->code;    // 任务编号
$this->queue->data;    // 任务参数
$this->queue->record;  // 任务记录

完整示例:

<?php
declare(strict_types=1);

namespace app\wechat\command;

use think\admin\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\Output;

class Auto extends Command
{
    private $openid;

    protected function configure()
    {
        $this->setName('xadmin:fansmsg');
        $this->addArgument('openid', Argument::OPTIONAL, 'wechat user openid', '');
        $this->addArgument('autocode', Argument::OPTIONAL, 'wechat auto message', '');
        $this->setDescription('Wechat Users Push AutoMessage for ThinkAdmin');
    }

    protected function execute(Input $input, Output $output)
    {
        $code = $input->getArgument('autocode');
        $this->openid = $input->getArgument('openid');
        
        // 参数验证
        if (empty($code)) {
            $this->setQueueError('Message Code cannot be empty');
            return;
        }
        if (empty($this->openid)) {
            $this->setQueueError('Wechat Openid cannot be empty');
            return;
        }

        // 查询数据
        $map = ['code' => $code, 'status' => 1];
        $data = WechatAuto::mk()->where($map)->find();
        if (empty($data)) {
            $this->setQueueError('Message Data Query failed');
            return;
        }

        // 处理业务逻辑
        $this->buildMessage($data->toArray());
    }

    private function buildMessage(array $data)
    {
        // 处理消息逻辑
        $result = $this->sendMessage('text', ['content' => $data['content']]);
        
        if (empty($result[0])) {
            $this->setQueueError($result[1]);
        } else {
            $this->setQueueSuccess($result[1]);
        }
    }
}

注意事项

  • 命令类需要在 Service 类的 register() 方法中注册
  • 使用 setQueueError()setQueueSuccess() 会结束任务执行
  • 任务进度使用百分比(0-100),用于前端显示
  • 可以通过 $this->queue 访问当前任务数据

注册命令

<?php
// app/admin/Service.php
namespace app\admin;

use think\admin\Plugin;

class Service extends Plugin
{
    public function register(): void
    {
        // 注册命令
        $this->commands([
            \app\store\command\MemberSync::class,
        ]);
    }
}

验证命令是否注册

# 执行以下命令,查看是否包含你的命令
php think

# 输出示例:
# member:sync    同步会员数据

使用 Queue 类处理任务

除了使用 Command 指令,还可以直接继承 Queue 类:

<?php
declare(strict_types=1);

namespace app\store\queue;

use think\admin\Queue;

/**
 * 会员等级同步任务
 * @class MemberSyncQueue
 * @package app\store\queue
 */
class MemberSyncQueue extends Queue
{
    /**
     * 执行任务
     * @param array $data 任务数据
     * @throws \think\admin\Exception
     */
    public function execute(array $data)
    {
        $total = $this->app->db->name('StoreMember')->count();
        $count = 0;
        
        $this->app->db->name('StoreMember')->chunk(100, function ($members) use (&$count, $total) {
            foreach ($members as $member) {
                $count++;
                MemberService::instance()->syncLevel($member['id']);
                
                // 更新进度
                $this->setQueueProgress(
                    "处理会员 {$member['id']}", 
                    intval($count / $total * 100)
                );
            }
        });
        
        $this->setQueueSuccess("完成 {$count} 个会员的等级同步");
    }
}

创建任务时使用类名

// 在控制器中
$this->_queue('同步会员等级', 'app\store\queue\MemberSyncQueue', 0, ['type' => 'level']);

❓ 常见问题

❓ 任务不执行

可能原因

  1. 监听进程未启动
  2. 命令未注册
  3. PHP 函数被禁用

解决方法

# 1. 检查监听进程是否运行
php think xadmin:queue status

# 2. 启动监听进程
php think xadmin:queue start

# 3. 检查命令是否注册
php think
# 查看是否包含你的命令

# 4. 检查 PHP 函数是否启用
php -m | grep exec
# 或
php -r "echo function_exists('exec') ? 'OK' : 'FAIL';"

❓ 任务失败

可能原因

  1. 命令类不存在
  2. 命令类语法错误
  3. 数据库连接失败

解决方法

// 1. 检查命令类是否存在
// 确保类文件路径正确
// 确保命名空间正确

// 2. 检查命令类语法
// 确保继承正确的类
// 确保实现了必要的方法

// 3. 查看任务日志
// 在任务中使用 try-catch 捕获异常
try {
    // 任务逻辑
} catch (\Exception $e) {
    $this->setQueueError('任务失败:' . $e->getMessage());
}

❓ 查看日志

方法1:查看数据库

-- 查看任务列表
SELECT * FROM system_queue ORDER BY create_at DESC LIMIT 10;

-- 查看失败的任务
SELECT * FROM system_queue WHERE status = 4;

方法2:查看系统日志

# 查看 ThinkAdmin 日志
tail -f runtime/log/think.log

# 查看 PHP 错误日志
tail -f /var/log/php_errors.log

❓ 停止任务

方法1:使用命令停止

# 停止所有任务
php think xadmin:queue stop

方法2:手动停止

# 查询正在执行的任务
php think xadmin:queue query

# 手动杀死进程(Linux)
kill -9 <进程ID>

❓ 执行超时

优化建议

  1. 分批处理:将大任务拆分成多个小任务
// ❌ 不推荐:一次性处理所有数据
foreach ($allData as $item) {
    processItem($item);
}

// ✅ 推荐:分批处理
Db::name('Table')->chunk(100, function($data) {
    foreach ($data as $item) {
        processItem($item);
    }
});
  1. 设置超时时间:在任务中设置最大执行时间
$startTime = time();
$maxTime = 300;  // 最大执行 5 分钟

foreach ($data as $item) {
    if (time() - $startTime > $maxTime) {
        $this->setQueueError('任务执行超时');
        return;
    }
    processItem($item);
}
  1. 拆分任务:将大任务拆分成多个小任务
// 创建多个小任务
foreach ($batches as $batch) {
    $this->_queue("处理批次-{$batch['id']}", 'data:process', 0, [
        'batch_id' => $batch['id']
    ]);
}

❓ 任务重试

方法1:在任务中实现重试逻辑

class RetryTask extends Command
{
    protected function execute(Input $input, Output $output)
    {
        $maxRetries = 3;
        $retry = 0;
        
        while ($retry < $maxRetries) {
            try {
                // 执行任务
                $this->doWork();
                $this->setQueueSuccess('完成');
                return;
            } catch (\Exception $e) {
                $retry++;
                if ($retry >= $maxRetries) {
                    $this->setQueueError('重试失败:' . $e->getMessage());
                    return;
                }
                sleep(5);  // 等待 5 秒后重试
            }
        }
    }
}

方法2:创建新任务重试

class TaskWithRetry extends Command
{
    protected function execute(Input $input, Output $output)
    {
        try {
            // 执行任务
            $this->doWork();
            $this->setQueueSuccess('完成');
        } catch (\Exception $e) {
            // 失败后创建新任务重试
            $retryCount = $this->queue->data['retry'] ?? 0;
            if ($retryCount < 3) {
                $queue = QueueService::instance([], true);
                $queue->register(
                    '重试任务',
                    'task:retry',
                    60,  // 60 秒后重试
                    ['retry' => $retryCount + 1],
                    0,
                    0
                );
            } else {
                $this->setQueueError('重试次数超限');
            }
        }
    }
}
最近更新:
Contributors: 邹景立, Anyon