⚡ 异步任务
ThinkAdmin 的异步任务处理是其核心特性之一,支持在 Windows 和 Linux 平台上并行执行多个任务,显著提高了任务处理效率。
📚 基础概念
🤔 基本介绍
异步任务是指不需要立即返回结果的操作,可以在后台慢慢处理。
简单理解:就像餐厅点餐,同步任务是"现做现吃"(用户要等待),异步任务是"下单后先给你号牌,菜好了再通知你"(用户无需等待)。
实际场景:
- 发送邮件:发送 1000 封邮件可能需要 10 分钟,如果同步执行,用户要等 10 分钟
- 生成报表:处理大量数据生成报表可能需要 5 分钟,如果同步执行,用户要等 5 分钟
- 图片处理:压缩 100 张图片可能需要 3 分钟,如果同步执行,用户要等 3 分钟
使用异步任务后:
- 用户提交请求后立即返回(1 秒内)
- 后台慢慢处理任务(可能需要几分钟)
- 处理完成后通知用户(通过进度条或消息)
🤔 使用优势
问题1:用户体验差
如果使用同步任务,用户需要等待很长时间:
// ❌ 不推荐:同步处理(用户要等待)
public function sendEmails()
{
$users = User::select(); // 1000 个用户
foreach ($users as $user) {
// 发送邮件,每个需要 1 秒
Mail::send($user->email, '通知');
}
// 用户需要等待 1000 秒(约 17 分钟)!
$this->success('发送完成');
}使用异步任务后:
// ✅ 推荐:异步处理(用户立即返回)
public function sendEmails()
{
// 创建异步任务,立即返回
$this->_queue('批量发送邮件', 'email:send', 0, ['user_ids' => $ids]);
// 用户立即看到"任务已创建"的提示,无需等待
$this->success('任务已创建,正在后台处理');
}问题2:系统性能
同步任务会长时间占用 Web 进程,影响其他用户访问:
// ❌ 不推荐:占用 Web 进程
public function processData()
{
// 处理大量数据,需要 5 分钟
// 这 5 分钟内,这个 Web 进程被占用,无法处理其他请求
processLargeData();
}使用异步任务后:
// ✅ 推荐:独立进程处理
public function processData()
{
// 创建异步任务,Web 进程立即释放
$this->_queue('处理数据', 'data:process', 0, []);
// Web 进程可以立即处理其他用户的请求
}🔄 同步 vs 异步
同步任务流程:
用户提交请求
↓
服务器处理(用户等待中...)
↓
处理完成(可能需要几分钟)
↓
返回结果给用户异步任务流程:
用户提交请求
↓
立即返回"任务已创建"
↓
后台独立进程处理(用户无需等待)
↓
处理完成后更新状态
↓
用户可以通过进度条查看进度⚙️ 工作原理
ThinkAdmin 使用队列机制处理异步任务,整个过程是自动的:
步骤1:创建任务
将任务信息保存到数据库:
// 在控制器中创建任务
$this->_queue('任务标题', 'command:name', 0, ['param' => 'value']);系统自动执行:
// 系统自动将任务保存到数据库
// 表名:system_queue
// 字段:title(标题)、command(命令)、data(数据)、status(状态)等步骤2:监听进程扫描
后台监听进程每 0.5 秒扫描一次数据库:
# 监听进程自动执行(无需手动操作)
php think xadmin:queue listen扫描逻辑(系统自动执行):
// 每 0.5 秒执行一次
while (true) {
// 查询待处理的任务(status = 1)
$tasks = Queue::where('status', 1)->select();
foreach ($tasks as $task) {
// 创建独立进程执行任务
exec("php think {$task->command} > /dev/null 2>&1 &");
}
sleep(0.5); // 等待 0.5 秒后继续扫描
}步骤3:执行任务
系统创建独立的 PHP-CLI 进程执行任务:
# 系统自动创建进程(无需手动操作)
php think command:name --data='{"param":"value"}'步骤4:更新状态
任务执行完成后,自动更新状态:
// 在任务中调用(系统自动处理)
$this->setQueueSuccess('任务完成'); // 更新状态为成功
// 或
$this->setQueueError('任务失败'); // 更新状态为失败📖 相关概念
队列(Queue)
- 待处理任务的列表
- 存储在数据库
system_queue表中 - 每个任务有唯一编号(以
Q开头)
进程(Process)
- 执行任务的独立程序
- 与 Web 进程分离,互不影响
- Windows 使用
wmic创建,Linux 使用&创建
监听(Listen)
- 定期检查是否有新任务
- 默认每 0.5 秒扫描一次
- 发现任务后自动创建进程执行
命令(Command)
- 执行任务的具体代码
- 继承
think\admin\Command类 - 或继承
think\admin\Queue类
🎯 使用场景
场景1:批量发送邮件
// 需要发送 1000 封邮件,每封需要 1 秒
// 同步:用户等待 1000 秒(约 17 分钟)
// 异步:用户立即返回,后台慢慢发送
$this->_queue('批量发送邮件', 'email:batch', 0, ['user_ids' => $ids]);场景2:处理大量数据
// 需要处理 10000 条数据,每条需要 0.1 秒
// 同步:用户等待 1000 秒(约 17 分钟)
// 异步:用户立即返回,后台慢慢处理
$this->_queue('处理数据', 'data:process', 0, ['data' => $data]);场景3:生成报表
// 需要生成复杂的 Excel 报表,需要 5 分钟
// 同步:用户等待 5 分钟
// 异步:用户立即返回,生成完成后通知下载
$this->_queue('生成报表', 'report:generate', 0, ['params' => $params]);场景4:图片处理
// 需要压缩 100 张图片,每张需要 2 秒
// 同步:用户等待 200 秒(约 3 分钟)
// 异步:用户立即返回,后台慢慢处理
$this->_queue('压缩图片', 'image:compress', 0, ['images' => $images]);🚀 主要特性
- 并行处理: 支持多个任务同时执行
- 跨平台: 兼容 Windows 和 Linux 系统
- 自动监控: 自动扫描和执行待处理任务
- 进程管理: 独立的 PHP-CLI 进程处理任务
- 异常恢复: 支持自动重启和异常处理
⚙️ 工作原理
ThinkAdmin 的异步任务机制基于队列和进程管理,整个过程是自动化的。
🔍 任务扫描机制
监听进程定期扫描数据库,查找待处理的任务:
扫描频率:
- 默认每 0.5 秒 扫描一次
- 可以通过配置调整扫描频率
- 扫描不会影响系统性能(查询很快)
扫描过程(系统自动执行):
// 系统自动执行的逻辑(无需手动编写)
while (true) {
// 1. 查询待处理的任务(status = 1)
$tasks = Db::name('system_queue')
->where('status', 1)
->where('exec_at', '<=', time())
->select();
// 2. 为每个任务创建独立进程
foreach ($tasks as $task) {
// 更新任务状态为处理中
$task->status = 2;
$task->save();
// 创建独立进程执行任务
if (PHP_OS === 'WINNT') {
// Windows 系统使用 wmic
exec("wmic process call create \"php think {$task->command}\"");
} else {
// Linux 系统使用 &
exec("php think {$task->command} > /dev/null 2>&1 &");
}
}
// 3. 等待 0.5 秒后继续扫描
sleep(0.5);
}多任务并行:
- 系统支持同时执行多个任务
- 每个任务都是独立的进程
- 互不影响,可以并行处理
🎮 进程管理
ThinkAdmin 提供了完整的进程管理命令:
LISTEN - 监听进程
启动监听进程,负责扫描和执行任务:
# 启动监听进程(前台运行,可以看到日志)
php think xadmin:queue listen特点:
- 前台运行,可以看到实时日志
- 适合开发环境调试
- 进程异常退出后不会自动重启
START - 启动守护进程
启动守护进程,自动管理监听进程:
# 启动守护进程(推荐方式)
php think xadmin:queue start特点:
- 后台运行,自动管理监听进程
- 监听进程异常退出后会自动重启
- 适合生产环境使用
STOP - 停止进程
停止所有正在执行的任务进程:
# 停止所有任务进程
php think xadmin:queue stop使用场景:
- 系统维护时需要停止所有任务
- 任务执行异常需要强制停止
- 更新代码后需要重启进程
QUERY - 查询进程
查询当前正在执行的任务进程:
# 查询当前运行的任务
php think xadmin:queue query输出示例:
正在执行的任务:
- Q20241110123456: 批量发送邮件 (进度: 50%)
- Q20241110123457: 处理数据 (进度: 30%)STATUS - 查看状态
查看守护进程的运行状态:
# 查看守护进程状态
php think xadmin:queue status输出示例:
守护进程状态:运行中
监听进程状态:运行中
当前任务数:2🔄 完整工作流程
流程1:创建任务
// 1. 用户在控制器中创建任务
public function sync()
{
$this->_queue('同步数据', 'data:sync', 0, []);
}流程2:任务入库
// 2. 系统自动将任务保存到数据库
// system_queue 表
// - title: '同步数据'
// - command: 'data:sync'
// - status: 1 (待处理)
// - exec_at: 当前时间流程3:监听扫描
# 3. 监听进程扫描数据库(每 0.5 秒)
# 发现待处理任务,创建独立进程执行流程4:执行任务
// 4. 独立进程执行任务
class DataSync extends Command
{
protected function execute()
{
// 处理业务逻辑
$this->setQueueProgress('处理中...', 50);
// ...
$this->setQueueSuccess('完成');
}
}流程5:更新状态
// 5. 任务完成后,自动更新数据库状态
// status: 3 (成功) 或 4 (失败)流程6:前端显示
// 6. 前端通过任务编号查询进度
$.loadQueue('Q20241110123456');
// 显示进度条和状态⚠️ 重要提示
单进程限制: 为避免重复执行任务,请确保只启动一个监听进程 LISTEN。
推荐启动方式: 使用 START 指令启动进程,异常时会自动重启 LISTEN 进程。
Linux 环境: 任务主进程会定期执行
ps ax命令检查进程状态,某些安全工具可能发出警告,可放心忽略。
PHP 函数要求: 需要启用
exec、proc_open和posix_kill等函数。
🔧 守护进程管理
进程监控
- 定时检查: 设置定时任务以定期检查监听进程 LISTEN 的状态
- 自动重启: 如果发现 LISTEN 进程未运行或异常时,则自动执行指令 START 来重新启动它
- 状态监控: 实时监控进程运行状态和健康度
- 异常处理: 自动处理进程异常和重启
用户权限
- 指定用户: 在 Linux 服务器上运行时,建议使用指定用户执行
- 权限避免: 避免 CLI、CGI 和 FPM 这三种运行方式的缓存文件权限冲突
- 日志检查: 在排查问题时,请检查相关日志
- 用户命令: 建议使用
sudo -u www php think xadmin:queue start命令
环境要求
- exec 函数: 任务进程的创建是通过 PHP 的 exec 函数实现的
- 函数启用: 请确保 exec 函数未被禁用
- 异常处理: 在执行过程中遇到异常时,请根据错误提示调整 PHP 的运行环境设置
- 权限配置: 确保 PHP 有足够的权限执行系统命令
进程管理相关指令
- 执行
php think xadmin:queue stop# 停止所有正在执行的异步任务进程 - 执行
php think xadmin:queue query# 查询当前所有正在执行的任务进程 - 执行
php think xadmin:queue start# 开启异步任务守护进程(后台进程) - 执行
php think xadmin:queue listen# 启动异步任务监听进程(前台进程) - 执行
php think xadmin:queue status# 查看当前守护进程的后台运行状态
➕ 创建任务
ThinkAdmin 提供了两种方式创建异步任务:函数方式和控制器方式。
🎯 控制器方式
在控制器中使用 $this->_queue() 方法创建任务,这是最常用的方式:
<?php
declare(strict_types=1);
namespace app\admin\controller;
use think\admin\Controller;
class Member extends Controller
{
/**
* 同步会员数据
*/
public function sync()
{
// 创建异步任务
$this->_queue(
'同步会员数据', // 参数1:任务标题
'member:sync', // 参数2:执行命令
0, // 参数3:延时时间(秒)
['type' => 'all'], // 参数4:任务数据
0, // 参数5:允许重复创建(0=不允许,1=允许)
0 // 参数6:是否循环执行(0=不循环,1=循环)
);
}
}参数说明:
| 参数 | 说明 | 示例 |
|---|---|---|
$title | 任务标题 | '同步会员数据' |
$command | 执行命令 | 'member:sync' 或 'app\store\queue\MemberSync' |
$later | 延时时间(秒) | 0 表示立即执行,60 表示 60 秒后执行 |
$data | 任务数据(数组) | ['type' => 'all', 'ids' => [1,2,3]] |
$rscript | 允许重复创建 | 0 不允许,1 允许 |
$loops | 是否循环执行 | 0 不循环,1 循环 |
返回值:
- 成功:自动返回
success响应,data字段包含任务编号(如Q20241110123456) - 失败:自动返回
error响应
🎯 函数方式
使用 sysqueue() 函数创建任务,可以获取任务编号:
<?php
// 创建任务并获取任务编号
$code = sysqueue(
'同步会员数据', // 任务标题
'member:sync', // 执行命令
0, // 延时时间
['type' => 'all'], // 任务数据
1, // 允许重复创建
0 // 是否循环执行
);
// $code 是任务编号,如 'Q20241110123456'
echo "任务编号:{$code}";使用场景:
- 需要在代码中获取任务编号
- 需要在非控制器环境中创建任务
- 需要手动处理返回值
⏰ 延时执行
可以设置任务延时执行,适用于定时任务场景:
// 立即执行
$this->_queue('任务1', 'command:name', 0, []);
// 60 秒后执行
$this->_queue('任务2', 'command:name', 60, []);
// 1 小时后执行(3600 秒)
$this->_queue('任务3', 'command:name', 3600, []);实际应用:
- 订单支付后,30 分钟后检查是否发货
- 用户注册后,5 分钟后发送欢迎邮件
- 数据备份,每天凌晨 2 点执行
🔄 防止重复创建
某些任务不应该重复创建,可以通过 $rscript 参数控制:
// 不允许重复创建(推荐)
$this->_queue('同步会员数据', 'member:sync', 0, [], 0);
// 如果任务已存在(待处理或处理中),则不会创建新任务
// 允许重复创建
$this->_queue('发送邮件', 'email:send', 0, [], 1);
// 即使任务已存在,也会创建新任务判断规则:
- 系统根据任务标题判断是否重复
- 如果标题相同且状态为"待处理"或"处理中",则视为重复
- 建议在标题中加入唯一标识,如:
"同步会员数据-{$userId}"
🔁 循环执行
某些任务需要定期执行,可以设置循环执行:
// 循环执行(每 60 秒执行一次)
$this->_queue('定时检查', 'check:status', 0, [], 0, 1);注意事项:
- 循环任务会定期自动重新创建
- 需要确保任务执行时间小于循环间隔
- 建议在任务中控制循环次数,避免无限循环
📦 任务数据传递
可以通过 $data 参数传递数据给任务:
// 传递数据
$this->_queue('处理订单', 'order:process', 0, [
'order_id' => 12345,
'user_id' => 67890,
'action' => 'refund'
]);
// 在任务中获取数据
class OrderProcess extends Command
{
protected function execute(Input $input, Output $output)
{
// 获取任务数据
$data = $this->queue->data;
$orderId = $data['order_id'];
$userId = $data['user_id'];
$action = $data['action'];
// 处理业务逻辑
// ...
}
}🎯 创建子任务
如果要在任务中创建新任务,需要创建新的任务对象:
<?php
class ParentTask extends Command
{
protected function execute(Input $input, Output $output)
{
// 处理主任务
$this->setQueueProgress('处理中...', 50);
// 创建子任务(注意:需要创建新对象)
$queue = QueueService::instance([], true); // 创建新任务服务
$queue->register('子任务', 'child:task', 0, [], 0, 0);
// 继续处理主任务
$this->setQueueSuccess('完成');
}
}为什么需要创建新对象?
- 避免替换当前任务对象
- 确保主任务和子任务互不影响
- 保持任务数据的独立性
📊 获取任务数据
在任务执行过程中,可以获取当前任务的数据:
class MyTask extends Command
{
protected function execute(Input $input, Output $output)
{
// 获取任务编号
$code = $this->queue->code;
// 输出:Q20241110123456
// 获取任务参数
$data = $this->queue->data;
// 输出:['type' => 'all', 'ids' => [1,2,3]]
// 获取任务记录
$record = $this->queue->record;
// 输出:完整的任务记录对象
// 使用数据
$type = $data['type'];
$ids = $data['ids'];
}
}🎨 两种任务类型
ThinkAdmin 支持两种任务类型:
类型1:Command 指令(推荐)
继承 think\admin\Command 类,使用 ThinkPHP 的命令机制:
class MemberSync extends Command
{
protected function configure()
{
$this->setName('member:sync')
->setDescription('同步会员数据');
}
protected function execute(Input $input, Output $output)
{
// 处理逻辑
}
}类型2:Queue 类
继承 think\admin\Queue 类,直接实现任务逻辑:
class MemberSyncQueue extends Queue
{
public function execute(array $data)
{
// 处理逻辑
}
}创建任务时:
// 使用 Command 指令
$this->_queue('同步会员', 'member:sync', 0, []);
// 使用 Queue 类
$this->_queue('同步会员', 'app\store\queue\MemberSyncQueue', 0, []);📊 任务进度
ThinkAdmin 提供了完整的任务进度跟踪功能,前端可以实时显示任务执行进度。
🎯 任务编号
创建任务后,系统会自动生成任务编号:
// 创建任务
$this->_queue('同步数据', 'data:sync', 0, []);
// 返回的任务编号格式:Q + 时间戳 + 随机数
// 示例:Q20241110123456任务编号格式:
- 以字母
Q开头 - 后面是时间戳和随机数
- 用于唯一标识任务
📡 前端显示进度
前端可以使用 $.loadQueue() 方法显示任务进度:
<!-- 方式1:使用 data-queue 属性(推荐) -->
<button data-queue="{:url('sync')}" class="layui-btn">
同步数据
</button>
<!-- 方式2:手动调用 -->
<button onclick="startSync()" class="layui-btn">
同步数据
</button>
<script>
function startSync() {
// 1. 调用后端接口创建任务
$.post('{:url("sync")}', {}, function(result) {
if (result.code === 1) {
// 2. 获取任务编号
var queueCode = result.data;
// 3. 显示进度弹窗
$.loadQueue(queueCode);
}
});
}
</script>进度弹窗功能:
- 实时显示任务进度(百分比)
- 显示任务状态信息
- 任务完成后自动关闭
- 任务失败时显示错误信息
📈 后端更新进度
在任务执行过程中,可以更新任务进度:
class DataSync extends Command
{
protected function execute(Input $input, Output $output)
{
$total = 1000;
$count = 0;
// 处理数据
foreach ($data as $item) {
$count++;
// 更新进度(百分比)
$progress = intval($count / $total * 100);
$this->setQueueProgress(
"处理中:{$count}/{$total}", // 提示信息
$progress // 进度百分比(0-100)
);
// 处理业务逻辑
processItem($item);
}
// 设置任务完成
$this->setQueueSuccess("完成 {$count} 条数据处理");
}
}进度更新方法:
// 更新进度(继续执行)
$this->setQueueProgress('处理中...', 50); // 50% 完成
// 设置任务成功(结束执行)
$this->setQueueSuccess('任务完成');
// 设置任务失败(结束执行)
$this->setQueueError('任务失败:错误信息');参数说明:
setQueueProgress($message, $progress): 更新进度,任务继续执行$message: 提示信息(字符串)$progress: 进度百分比(0-100 的整数)
setQueueSuccess($message): 设置任务成功,任务结束执行$message: 成功提示信息(可选)
setQueueError($message): 设置任务失败,任务结束执行$message: 错误提示信息(必填)
🔄 完整示例
前端代码:
<!-- 触发任务的按钮 -->
<button data-queue="{:url('member/sync')}" class="layui-btn">
<i class="layui-icon layui-icon-refresh"></i> 同步会员数据
</button>后端代码:
<?php
namespace app\admin\controller;
use think\admin\Controller;
class Member extends Controller
{
public function sync()
{
// 创建任务,自动返回任务编号
$this->_queue('同步会员数据', 'member:sync', 0, []);
}
}任务代码:
<?php
namespace app\store\command;
use think\admin\Command;
class MemberSync extends Command
{
protected function configure()
{
$this->setName('member:sync')
->setDescription('同步会员数据');
}
protected function execute(Input $input, Output $output)
{
$total = 1000;
$count = 0;
// 分批处理
Db::name('StoreMember')->chunk(100, function($members) use (&$count, $total) {
foreach ($members as $member) {
$count++;
// 更新进度
$progress = intval($count / $total * 100);
$this->setQueueProgress(
"处理会员:{$member['id']} ({$count}/{$total})",
$progress
);
// 处理业务逻辑
MemberService::sync($member['id']);
}
});
// 任务完成
$this->setQueueSuccess("完成 {$count} 个会员的同步");
}
}执行效果:
- 用户点击按钮
- 前端自动调用后端接口
- 后端创建任务并返回任务编号
- 前端显示进度弹窗
- 后端任务执行,实时更新进度
- 任务完成后,前端显示完成提示
📝 任务案例
前端代码(属性
data-queue触发$.loadQueue(Code)展示任务进度)
<button data-queue="{:url('sync')}" class='layui-btn layui-btn-sm layui-btn-primary'>
<i class="layui-icon layui-icon-refresh"></i> 刷新会员数据
</button>说明: 点击按钮后,会自动调用后端接口创建任务,并显示任务进度弹窗。
后端代码(调用
$this->_queue()会返回响应对象,响应对象data默认为任务编号)
<?php
declare(strict_types=1);
namespace app\admin\controller;
use think\admin\Controller;
/**
* 会员管理
* @class Member
* @package app\admin\controller
*/
class Member extends Controller
{
/**
* 刷新会员数据
* @auth true
*/
public function sync()
{
// 创建异步任务
// 参数1: 任务标题
// 参数2: 执行命令(指令名称)
// 参数3: 延时时间(秒)
// 参数4: 任务数据(数组)
// 参数5: 允许重复创建(0=不允许,1=允许)
$this->_queue('重新计算所有会员级别', "xsync:member", 1, [], 0);
}
}对应指令(指令需要注册,通过
sys.php注册或Service.php配置,执行php think可查看是否已生效)
<?php
// app/admin/sys.php 动态注册操作指令
use think\Console;
\think\Console::starting(function (Console $console) {
$console->addCommand(\app\store\command\MemberRun::class);
});在任务处理时可以使用
$this->queue获取到数据参数(实际对象为QueueService,具体可以查阅对象代码)
<?php
declare(strict_types=1);
namespace app\store\command;
use think\admin\Command;
use think\admin\Command\Input;
use think\admin\Command\Output;
use think\Collection;
/**
* 重新计算会员数据
* @class MemberRun
* @package app\store\command
*/
class MemberRun extends Command
{
/**
* 配置命令
*/
protected function configure()
{
$this->setName('xsync:member')
->setDescription('[ 商城 - 不需执行 ] 重新计算所有会员的等级');
}
/**
* 执行命令
* @param Input $input
* @param Output $output
* @throws \think\admin\Exception
*/
protected function execute(Input $input, Output $output)
{
// 获取总数
$total = $this->app->db->name('StoreMember')->count();
$count = 0;
// 分批处理数据
$this->app->db->name('StoreMember')->chunk(100, function (Collection $data) use (&$count, $total) {
foreach ($data->toArray() as $vo) {
$count++;
// 同步会员等级
$state = MemberService::instance()->syncLevel($vo['id']) ? '成功' : '失败';
// 更新任务进度(百分比)
$this->setQueueProgress(
"调整会员 {$vo['id']} {$vo['phone']} {$vo['nickname']} 级别{$state}",
intval($count / $total * 100)
);
}
});
// 设置任务完成
$this->setQueueSuccess("重新计算 {$count} 个会员的级别!");
}
}📖 Command 类详解
继承要求:
- 命令类必须继承
think\admin\Command类 - 需要实现
configure()方法配置命令 - 需要实现
execute()方法执行命令逻辑
常用方法:
// 1. 配置命令名称和描述
protected function configure()
{
$this->setName('xsync:member')
->setDescription('重新计算所有会员的等级');
}
// 2. 获取命令参数
$openid = $input->getArgument('openid');
$code = $input->getArgument('autocode');
// 3. 设置任务进度(百分比,0-100)
$this->setQueueProgress(string $message, int $progress = 0);
// 4. 设置任务成功
$this->setQueueSuccess(string $message = '');
// 5. 设置任务失败
$this->setQueueError(string $message = '');
// 6. 获取当前任务数据(在任务中)
$this->queue->code; // 任务编号
$this->queue->data; // 任务参数
$this->queue->record; // 任务记录完整示例:
<?php
declare(strict_types=1);
namespace app\wechat\command;
use think\admin\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\Output;
class Auto extends Command
{
private $openid;
protected function configure()
{
$this->setName('xadmin:fansmsg');
$this->addArgument('openid', Argument::OPTIONAL, 'wechat user openid', '');
$this->addArgument('autocode', Argument::OPTIONAL, 'wechat auto message', '');
$this->setDescription('Wechat Users Push AutoMessage for ThinkAdmin');
}
protected function execute(Input $input, Output $output)
{
$code = $input->getArgument('autocode');
$this->openid = $input->getArgument('openid');
// 参数验证
if (empty($code)) {
$this->setQueueError('Message Code cannot be empty');
return;
}
if (empty($this->openid)) {
$this->setQueueError('Wechat Openid cannot be empty');
return;
}
// 查询数据
$map = ['code' => $code, 'status' => 1];
$data = WechatAuto::mk()->where($map)->find();
if (empty($data)) {
$this->setQueueError('Message Data Query failed');
return;
}
// 处理业务逻辑
$this->buildMessage($data->toArray());
}
private function buildMessage(array $data)
{
// 处理消息逻辑
$result = $this->sendMessage('text', ['content' => $data['content']]);
if (empty($result[0])) {
$this->setQueueError($result[1]);
} else {
$this->setQueueSuccess($result[1]);
}
}
}注意事项:
- 命令类需要在 Service 类的
register()方法中注册 - 使用
setQueueError()或setQueueSuccess()会结束任务执行 - 任务进度使用百分比(0-100),用于前端显示
- 可以通过
$this->queue访问当前任务数据
注册命令:
<?php
// app/admin/Service.php
namespace app\admin;
use think\admin\Plugin;
class Service extends Plugin
{
public function register(): void
{
// 注册命令
$this->commands([
\app\store\command\MemberSync::class,
]);
}
}验证命令是否注册:
# 执行以下命令,查看是否包含你的命令
php think
# 输出示例:
# member:sync 同步会员数据使用 Queue 类处理任务
除了使用 Command 指令,还可以直接继承 Queue 类:
<?php
declare(strict_types=1);
namespace app\store\queue;
use think\admin\Queue;
/**
* 会员等级同步任务
* @class MemberSyncQueue
* @package app\store\queue
*/
class MemberSyncQueue extends Queue
{
/**
* 执行任务
* @param array $data 任务数据
* @throws \think\admin\Exception
*/
public function execute(array $data)
{
$total = $this->app->db->name('StoreMember')->count();
$count = 0;
$this->app->db->name('StoreMember')->chunk(100, function ($members) use (&$count, $total) {
foreach ($members as $member) {
$count++;
MemberService::instance()->syncLevel($member['id']);
// 更新进度
$this->setQueueProgress(
"处理会员 {$member['id']}",
intval($count / $total * 100)
);
}
});
$this->setQueueSuccess("完成 {$count} 个会员的等级同步");
}
}创建任务时使用类名:
// 在控制器中
$this->_queue('同步会员等级', 'app\store\queue\MemberSyncQueue', 0, ['type' => 'level']);❓ 常见问题
❓ 任务不执行
可能原因:
- 监听进程未启动
- 命令未注册
- PHP 函数被禁用
解决方法:
# 1. 检查监听进程是否运行
php think xadmin:queue status
# 2. 启动监听进程
php think xadmin:queue start
# 3. 检查命令是否注册
php think
# 查看是否包含你的命令
# 4. 检查 PHP 函数是否启用
php -m | grep exec
# 或
php -r "echo function_exists('exec') ? 'OK' : 'FAIL';"❓ 任务失败
可能原因:
- 命令类不存在
- 命令类语法错误
- 数据库连接失败
解决方法:
// 1. 检查命令类是否存在
// 确保类文件路径正确
// 确保命名空间正确
// 2. 检查命令类语法
// 确保继承正确的类
// 确保实现了必要的方法
// 3. 查看任务日志
// 在任务中使用 try-catch 捕获异常
try {
// 任务逻辑
} catch (\Exception $e) {
$this->setQueueError('任务失败:' . $e->getMessage());
}❓ 查看日志
方法1:查看数据库
-- 查看任务列表
SELECT * FROM system_queue ORDER BY create_at DESC LIMIT 10;
-- 查看失败的任务
SELECT * FROM system_queue WHERE status = 4;方法2:查看系统日志
# 查看 ThinkAdmin 日志
tail -f runtime/log/think.log
# 查看 PHP 错误日志
tail -f /var/log/php_errors.log❓ 停止任务
方法1:使用命令停止
# 停止所有任务
php think xadmin:queue stop方法2:手动停止
# 查询正在执行的任务
php think xadmin:queue query
# 手动杀死进程(Linux)
kill -9 <进程ID>❓ 执行超时
优化建议:
- 分批处理:将大任务拆分成多个小任务
// ❌ 不推荐:一次性处理所有数据
foreach ($allData as $item) {
processItem($item);
}
// ✅ 推荐:分批处理
Db::name('Table')->chunk(100, function($data) {
foreach ($data as $item) {
processItem($item);
}
});- 设置超时时间:在任务中设置最大执行时间
$startTime = time();
$maxTime = 300; // 最大执行 5 分钟
foreach ($data as $item) {
if (time() - $startTime > $maxTime) {
$this->setQueueError('任务执行超时');
return;
}
processItem($item);
}- 拆分任务:将大任务拆分成多个小任务
// 创建多个小任务
foreach ($batches as $batch) {
$this->_queue("处理批次-{$batch['id']}", 'data:process', 0, [
'batch_id' => $batch['id']
]);
}❓ 任务重试
方法1:在任务中实现重试逻辑
class RetryTask extends Command
{
protected function execute(Input $input, Output $output)
{
$maxRetries = 3;
$retry = 0;
while ($retry < $maxRetries) {
try {
// 执行任务
$this->doWork();
$this->setQueueSuccess('完成');
return;
} catch (\Exception $e) {
$retry++;
if ($retry >= $maxRetries) {
$this->setQueueError('重试失败:' . $e->getMessage());
return;
}
sleep(5); // 等待 5 秒后重试
}
}
}
}方法2:创建新任务重试
class TaskWithRetry extends Command
{
protected function execute(Input $input, Output $output)
{
try {
// 执行任务
$this->doWork();
$this->setQueueSuccess('完成');
} catch (\Exception $e) {
// 失败后创建新任务重试
$retryCount = $this->queue->data['retry'] ?? 0;
if ($retryCount < 3) {
$queue = QueueService::instance([], true);
$queue->register(
'重试任务',
'task:retry',
60, // 60 秒后重试
['retry' => $retryCount + 1],
0,
0
);
} else {
$this->setQueueError('重试次数超限');
}
}
}
}