消息队列处理
queue:work
对应 \Illuminate\Queue\Console\WorkCommand
- 依赖注入
\Illuminate\Queue\Worker
和\Illuminate\Contracts\Cache\Repository
- 如果不带强制参数
--force
,系统处于下线状态,并且带once
参数,那么调用work的sleep方法 listenForEvents
注册一个job生命周期事件- 获取
connection
和queue
- 开始运行
work
- 需要注意
\Illuminate\Queue\WorkerOptions
在收集参数的时候,设置了默认值 - 根据是否带
once
参数来决定运行哪个方法runNextJob
和daemon
extension_loaded('pcntl')
如果有该模块,那么监听信号queue:restart
会记录 key为illuminate:queue:restart
缓存- 循环一开始判断是否处理任务、暂停、退出
- 获取一个job,同时注册timeout,防止job会被阻塞住
- 如果有job,那么运行job,没有job就sleep,WorkOptions中指定的时间。
- $jobsProcessed标示一个work目前执行了多少个job,超过配置中的maxjob就停止
- 重置Timout处理器,检查是否有必要停止work。
任务执行
\Illuminate\Queue\Worker::process
任务执行函数
- 抛出
JobProcessing
事件 - 检查任务是否超过最大运行次数限制,超过了就标示错误,并从reserved队列中删除,发送after事件
- 获取要执行的job,这里得到的job是RedisJob,封装过一层的。pop是在RedisQueue类上封装的
- setJobInstanceIfNecessary先设置job实例到command(你自己的处理类),InteractsWithQueue使用这个trait的会有该步操作
dispatchThroughMiddleware
执行之前先走中间件,然后调用handle方法
任务执行异常
\Illuminate\Queue\Worker::handleJobException
- 如果任务执行异常而且没有标示为失败,检查重拾次数,最大异常数,抛出
JobExceptionOccurred
事件 - 如果可以继续,那么计算calculateBackoff,得到重试秒数