消息队列处理
queue:work 对应 \Illuminate\Queue\Console\WorkCommand
- 依赖注入
\Illuminate\Queue\Worker和\Illuminate\Contracts\Cache\Repository - 如果不带强制参数
--force,系统处于下线状态,并且带once参数,那么调用work的sleep方法 listenForEvents注册一个job生命周期事件- 获取
connection和queue - 开始运行
work - 需要注意
\Illuminate\Queue\WorkerOptions在收集参数的时候,设置了默认值 - 根据是否带
once参数来决定运行哪个方法runNextJob和daemon extension_loaded('pcntl')如果有该模块,那么监听信号queue:restart会记录 key为illuminate:queue:restart缓存- 循环一开始判断是否处理任务、暂停、退出
- 获取一个job,同时注册timeout,防止job会被阻塞住
- 如果有job,那么运行job,没有job就sleep,WorkOptions中指定的时间。
- $jobsProcessed标示一个work目前执行了多少个job,超过配置中的maxjob就停止
- 重置Timout处理器,检查是否有必要停止work。
任务执行
\Illuminate\Queue\Worker::process 任务执行函数
- 抛出
JobProcessing事件 - 检查任务是否超过最大运行次数限制,超过了就标示错误,并从reserved队列中删除,发送after事件
- 获取要执行的job,这里得到的job是RedisJob,封装过一层的。pop是在RedisQueue类上封装的
- setJobInstanceIfNecessary先设置job实例到command(你自己的处理类),InteractsWithQueue使用这个trait的会有该步操作
dispatchThroughMiddleware执行之前先走中间件,然后调用handle方法
任务执行异常
\Illuminate\Queue\Worker::handleJobException
- 如果任务执行异常而且没有标示为失败,检查重拾次数,最大异常数,抛出
JobExceptionOccurred事件 - 如果可以继续,那么计算calculateBackoff,得到重试秒数