队列是许多应用程序背后的秘密引擎。它们能够将缓慢、脆弱的任务转变为快速、可靠的工作流程。无论你是在发送数千封邮件、处理视频,还是编排复杂的管道流程。
本文将深入探讨 Laravel 队列系统的工作原理。从你分发一个任务的那一刻起,到它如何被序列化、存储、弹出、执行和重试。我们将追踪消息的完整生命周期,深入了解工作进程、可见性超时、退避策略、链式任务和批处理。
准备一杯咖啡,让我们真正理解 Laravel(以及一般情况下)队列的工作原理。
队列让你的应用保持快速和可靠。通过将缓慢或有风险的工作从请求周期中移出,Laravel 让你能够快速返回响应、吸收流量峰值、并行运行任务,甚至在出现问题时以安全的方式重试它们。队列解耦了职责(Web -> 工作进程),隔离了故障,并让你能够控制时机和优先级。
在深入了解 Laravel 队列工作原理之前,让我们先学习一些在本文中会用到的概念,这些概念可以帮助简化理解。
handle()
方法执行具体工作php artisan queue:work
,一个长时间运行的进程,负责弹出、运行中间件并确认任务failed_jobs
表如果我们以非常简化的方式来看,Laravel 的队列系统是一个简单的管道:创建任务、序列化它、存储它、弹出它、运行它,然后确认它。所有这些都有重试和失败的安全机制。
你调用 Job::dispatch(...)
。总线(Bus)序列化任务(类 + 数据 + 元数据)并将其交给队列管理器。
队列管理器选择一个连接器(Redis、数据库、SQS)并将 JSON 负载推送到指定的队列。
php artisan queue:work
作为长期运行的进程运行。它保留一个任务,锁定它(可见性超时),并从容器解析依赖。
限流、重试、速率限制等包装任务的 handle()
方法。
成功时删除任务。异常触发释放(带退避的重试)或在 maxTries
后标记为失败。
class ProcessReport implements ShouldQueue
{
use Queueable, SerializesModels;
public int $timeout = 30;
public function __construct(public Report $report) {}
public function backoff(): array
{
return [5, 30, 90];
}
public function handle(): void
{
// 处理报告的逻辑
}
}
// 在数据库提交后分发,使用高优先级队列
ProcessReport::dispatch($report)
->afterCommit()
->onQueue('high');
# 运行工作进程
php artisan queue:work --queue=high,default --timeout=35 --max-jobs=1000 --max-time=3600
在深入了解 Laravel 如何处理分发逻辑之前,让我们先看看任务如何从你的代码到达队列连接的高层概览。
Job::dispatch(...)
然后 Laravel 的总线(Bus)收集选项(队列、连接、延迟等)现在我们对它的工作原理有了概念,让我们更深入地了解当你调用 Job::dispatch(...)
时会发生什么。
Dispatchable
特性 trait 的 dispatch()
方法返回一个围绕你的任务的 PendingDispatch
包装器,链式调用如 ->onConnection()
、->delay()
被代理到任务。
Queueable
特性 trait 是提供上述流畅方法的特性,也是设置 $connection
、$delay
和其他属性的地方。
当 PendingDispatch
分发时,无论是通过显式调用 dispatch()
方法还是在销毁时隐式调用,总线(Bus)检查:
$job->connection
和 $job->queue
(或配置默认值)$job->delay
、$job->timeout
、backoff()
、retryUntil()
ShouldBeUnique
/uniqueId()
、ShouldBeEncrypted
、afterCommit
然后队列管理器选择连接器(Redis/数据库/SQS),应用延迟/可见性设置,并推送包含任务类、数据和所有收集的元数据的 JSON 负载。
当你在 Laravel 中分发任务时,框架将其包装在一个紧凑的 JSON 数据包中,工作进程(Worker)可以弹出、解码并安全且重复地运行。
下面我们将看到存储在 Laravel 分发任务时创建的 JSON 数据包中的一些最重要的值。
Illuminate\Queue\CallQueuedHandler@call
(通用调用器)timeout
、timeoutAt
、retryUntil
、backoff
、maxTries
、maxExceptions
true
(仅当任务实现 ShouldBeEncrypted
时)下面我们将看到在分发任务时不同类型数据的序列化工作原理。
任务对象:总线(Bus)克隆你的任务并使用 PHP 的 serialize()
序列化它。驱动器将该序列化的 blob 存储在 JSON 内并可能对其进行 base64 编码
Eloquent 模型:如果你的任务使用 SerializesModels
特性 trait
,每个模型都会变成一个轻量级的 ModelIdentifier
(类、ID、连接、关系)。在工作进程上,Laravel 通过重新查询数据库来重新水化模型——保持负载微小且一致
闭包:基于闭包的任务使用 laravel/serializable-closure
包进行序列化。它们可以工作,但更推荐具体的任务类以实现更安全的滚动部署
加密:实现 ShouldBeEncrypted
接口的任务在写入连接之前会对其序列化负载进行加密
{
"uuid": "b4c1e7c6-...",
"displayName": "App\\Jobs\\ProcessReport",
"job": "Illuminate\\Queue\\CallQueuedHandler@call",
"maxTries": 3,
"timeout": 30,
"backoff": [5, 30, 90],
"tags": ["important", "client:42"],
"data": {
"commandName": "App\\Jobs\\ProcessReport",
"command": "SERIALIZED JOB HERE"
}
}
Redis 是最受欢迎的 Laravel 队列连接,它简单、快速,当你将它与 Laravel Horizon 配对时更是如此。我们将检查它在底层是如何工作的,但为此我们首先需要了解 Redis 的一些概念。
在底层,它使用一个列表,加上两个有序集合。
queues:default
- 在这里 RPUSH 负载queues:default:delayed
- 使用score
= now + delay 进行 ZADDqueues:default:reserved
- 使用score
= now + 可见性超时 进行 ZADD从队列中弹出任务是原子性完成的,它有三个阶段。
它在主列表上运行 BLPOP
,并立即为弹出的负载在保留集合中运行 ZADD
,score = now + retry_after(可见性超时)。
可见性超时可以在 config/queue.php
文件的 redis 部分的 retry_after
设置中配置。
BLPOP
时间可以在同一文件和部分的 block_for
设置中配置。
在 BLPOP
之后,工作进程执行任务的 handle()
方法,包括所有配置的中间件、超时等。
在确认应答或释放任务时,我们有三种不同的路径。
failed_jobs
中现在是时候了解当你运行 php artisan queue:work
时,工作进程底层的工作原理了。
它是一个长期运行的守护进程,启动你的应用一次,然后循环:弹出 → 保留 → 运行 → 确认/重试 → 重复。它快速、可观察,并设计为安全重启。让我们了解它在底层是如何工作的。
当你运行 php artisan queue:work
时,它启动容器和你的应用,然后进入循环。
工作进程向驱动器请求任务。驱动器原子性地保留带有可见性窗口的任务,即 retry_after
设置
任务从其 JSON 负载(payload)重构,解析依赖,如果有的话,执行包装 handle()
方法的中间件
运行任务时,强制执行每个任务的超时,如果任务超时,任务被杀死。任务将在可见性超时到达后再次可用
在成功运行中,任务从保留集合中删除,并触发 JobProcessed
事件
如果发生异常(Exception),它增加尝试次数,计算退避并释放到延迟集合
如果达到 maxTries
/maxExceptions
/retryUntil
,任务被标记为失败,添加到 failed_jobs
并触发 JobFailed
事件
对于每个循环,工作进程还进行一些内务处理,检查诸如应用是否处于维护模式、是否未达到最大任务数设置以及处理优雅退出标志等事项,例如使用 queue:restart
时。
下面是使用伪代码的一个简单心理模型,以便更好地理解工作进程的机制。
while (! shouldQuit()) {
if (! $job = $queue->pop($queues)) {
sleepOrBlock(); // 例如,Redis BLPOP
continue;
}
startTimeoutTimer($job->timeout); // Unix 上的 pcntl
try {
runMiddlewareThenHandle($job);
ack($job);
dispatch(JobProcessed::class);
} catch (\Throwable $e) {
handleFailureOrRelease($job, $e); // 退避 / 失败
} finally {
stopTimer();
}
enforceLimits(); // 最大任务数、最大时间、退出标志
}
Laravel 还提供了一些功能来更好地编排任务和队列,让我们看看其中一些是如何工作的。
这是 Laravel 提供的一个中间件,防止两个工作进程同时处理同一个「事物」。
public function middleware(): array
{
return [
new WithoutOverlapping($this->userId),
];
}
在底层,当你添加这个中间件时,它获取一个原子缓存锁 cache()->lock($key, $ttl)
。如果锁被持有,任务现在不被处理,它被释放回队列,带有小延迟,以便稍后另一次尝试可以尝试。成功/失败时,锁被释放。如果工作进程崩溃,锁在其 TTL 后自动过期,防止死锁。
Laravel 还提供了一种方式,使重复任务甚至不进入队列。在分发(dispatch)时,Laravel 尝试获取由任务类 + uniqueId()
键控的缓存锁。如果不能,新的分发被跳过。锁在达到 $uniqueFor
秒后过期,如果设置或任务被释放时过期。
Laravel 为此提供了两个不同的接口。
如我们上面所见,WithoutOverlapping
中间件和 ShouldBeUnique
以及 ShouldBeUniqueUntilProcessing
接口依赖于设置了 TTL 的缓存锁。锁必须过期以避免在工作进程死亡的情况下永久阻塞。在 Redis 中,这在底层使用 SET NX EX
。
通过这种方式,Laravel 在完成时释放锁,如果进程崩溃,Redis 会自动过期它。
Laravel 有一种一致且安全的方式来处理队列中任务的重试和失败,所以当出现问题时你不必担心。以下是它在底层的工作原理。
当工作进程从队列中弹出任务时,它增加负载上的尝试次数。如果发生超时或异常,而不是确认应答和删除任务,它将在可见性窗口后再次出现在队列中,当它被弹出时将再次增加尝试次数。
在任务再次被释放以重试之前,它检查 backoff()
方法的值或对 releaseAfter()
方法的显式调用,然后将任务发送到延迟有序集合(delayed sorted set),将 now()
与这些设置值之一相加。如果找不到这些,任务被释放以立即再次尝试。
要检查任务是否应该重试,工作进程检查 $tries
、$maxExceptions
属性和 retryUntil()
方法。如果尝试次数和异常数超过配置的次数或 now() > retryUntil()
,任务然后被标记为失败。
当标记为失败时,工作进程抛出 MaxAttemptsExceededException
异常,从队列(queue)中删除任务,将其记录在 failed_jobs
表中,触发 JobFailed
事件并调用任务的 failed()
方法。
下面我们可以看到之前的同一个任务,但对重试和失败有更好的配置。
class ProcessReport implements ShouldQueue
{
use Queueable, SerializesModels;
public int $tries = 5;
public int $timeout = 30;
public int $maxExceptions = 3;
public function __construct(public Report $report) {}
public function backoff(): array
{
return [5, 30, 90];
}
public function retryUntil(): DateTime
{
return now()->addMinutes(30);
}
public function handle(): void
{
// 处理报告的逻辑
}
public function failed(?Throwable $exception): void
{
// 发送失败通知
}
}
在队列中处理任务时,时机非常重要,Laravel 使管理这一点变得容易。三个「时钟」是最重要的:任务何时变为可用(延迟)、它为工作进程保持「锁定」多长时间可见性以及你的代码被允许运行多长时间超时。让我们使用 Redis 连接检查这些是如何工作的。
Redis 为此使用延迟有序集合,分数为 now() + delay
。延迟可以在分发任务时设置:
ProcessReport::dispatch($report)
->delay(now()->addMinutes(10));
当工作进程从队列中弹出消息时,负载被移动到保留有序集合,分数为 now() + retry_after
。这可以在 config/queue.php
文件的 redis 部分下配置,如果任务没有及时确认应答,任务然后被迁移回主队列以可能重试。
工作进程将强制执行任务的最大生存时间。这可以在任务本身的 $timeout
属性中设置以获得特定于任务的超时,或在运行 php artisan queue:work
命令时的 --timeout
选项中设置,这将应用于所有任务。超过此超时后,任务被杀死,在设置的可见性期间后,它被迁移回主队列以可能重试。
Laravel 的总线(Bus)给我们的能力不仅仅是运行普通任务。它提供了一种编排顺序链式或并行批处理任务的方式,在管理队列中的任务时开启了许多新的可能性。
链式任务的概念非常简单,它按顺序运行任务,如果其中任何一个失败,链中的剩余任务被跳过。
链中任务的负载(payload)包含一个序列化的链式数组,其中包含所有任务。任务被确认应答后,工作进程然后分发下一个序列化的任务。
如果任务失败,链中的剩余任务被跳过,如果你定义了 catch()
回调,它将被运行。
Bus::chain([
new ProcessPayment($order),
new GenerateInvoice($order),
new EmailInvoice($order),
])->onConnection('redis')
->onQueue('high')
->catch(fn (Throwable $e) => report($e))
->dispatch();
虽然链式的概念是按顺序运行任务,但批处理是当你想要并行运行多个任务时。你可以跟踪批处理的进度,处理完成和失败钩子,允许取消它,甚至向其添加更多任务。
在底层,所有这些跟踪都发生在 job_batches
表中,它跟踪批处理有多少任务、有多少是待处理或失败的、哪些失败了以及批处理是否被取消。
批处理中的每个任务都将收到一个 batchId
,当这些任务之一完成或失败时,job_batches
表会用信息更新。使用这种跟踪,批处理为我们提供了不同的钩子,我们可以根据批处理的状态运行额外的逻辑。
class ImportVideoChunk implements ShouldQueue
{
use Batchable, Queueable, SerializesModels;
public function __construct(
public Video $video,
public ?int start,
public ?int end,
) {}
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
// 处理视频导入逻辑
}
}
$batch = Bus::batch([
new ImportVideoChunk(video: $video, end: 60),
new ImportVideoChunk(video: $video, start: 61, end: 120),
new ImportVideoChunk(video: $video, start: 121, end: 180),
new ImportVideoChunk(video: $video, start: 181, end: 240),
new ImportVideoChunk(video: $video, start: 241),
])->before(function (Batch $batch) {
// 在批处理创建时运行,但没有任务被运行
})->progress(function (Batch $batch) {
// 当批处理中的任务无错误完成时运行
})->then(function (Batch $batch) {
// 当所有任务无错误完成时运行
})->catch(function (Batch $batch, Throwable $e) {
// 当批处理中的任务失败时运行
})->finally(function (Batch $batch) {
// 当批处理执行完所有任务时运行
})->dispatch();
如你在上面看到的,钩子(hooks)为你提供对 Batch $batch
对象的访问,你可以使用它来检查和管理当前批处理。同样的对象可以在任务内部通过 Batchable
特性(trait)提供的 $this->batch()
方法访问。
总结一下,队列是 Laravel 将缓慢、脆弱的工作转变为快速、可靠体验的方式。你分发一个小任务,Laravel 序列化它,连接器(如 Redis)存储它,工作进程通过中间件弹出并运行它,系统通过清晰的事件和指标优雅地重试或失败。
所有这些都通过一个优雅而简单的 API 实现,每个开发者都可以使用它来改进应用程序。
通过这篇文章,你了解了支撑 Laravel 队列系统的底层机制、队列的一般工作原理,以及快速了解了 Redis 连接器如何与 Laravel 队列系统配合工作。
希望你喜欢这篇文章,如果喜欢,别忘了与你的朋友分享这篇文章!再见!
相关资源:
技术标签:#Laravel
#队列
#Redis
#PHP
#异步处理
#性能优化