Laravel 队列系统深度解析 底层机制全揭秘

引言

队列是许多应用程序背后的秘密引擎。它们能够将缓慢、脆弱的任务转变为快速、可靠的工作流程。无论你是在发送数千封邮件、处理视频,还是编排复杂的管道流程。

本文将深入探讨 Laravel 队列系统的工作原理。从你分发一个任务的那一刻起,到它如何被序列化、存储、弹出、执行和重试。我们将追踪消息的完整生命周期,深入了解工作进程、可见性超时、退避策略、链式任务和批处理。

准备一杯咖啡,让我们真正理解 Laravel(以及一般情况下)队列的工作原理。

为什么队列很重要

队列让你的应用保持快速和可靠。通过将缓慢或有风险的工作从请求周期中移出,Laravel 让你能够快速返回响应、吸收流量峰值、并行运行任务,甚至在出现问题时以安全的方式重试它们。队列解耦了职责(Web -> 工作进程),隔离了故障,并让你能够控制时机和优先级。

常见优势

  • 卸载 I/O 密集型任务:邮件、Webhook、PDF、图像/视频处理、AI/API 调用
  • 提高可靠性:带有退避策略和超时的重试机制
  • 可预测的扩展:添加 Worker 而无需更改业务代码

核心概念

在深入了解 Laravel 队列工作原理之前,让我们先学习一些在本文中会用到的概念,这些概念可以帮助简化理解。

  • 任务(Job):一个小的类(实现 ShouldQueue 接口),其 handle() 方法执行具体工作
  • 分发(Dispatch):将任务发送到带有元数据的连接(队列、延迟、尝试次数)
  • 连接(Connection):你使用的后端存储(Redis、数据库、SQS),通过名称选择
  • 队列(名称):连接内的一个通道,用于优先级分类(高、默认、低)
  • 驱动器/连接器:在连接上推送/弹出任务的实现
  • 负载和序列化:JSON 数据包,其中 Eloquent 模型序列化为 ID,闭包通过 SerializableClosure 处理
  • 工作进程php artisan queue:work,一个长时间运行的进程,负责弹出、运行中间件并确认任务
  • 可见性超时:「锁定」窗口。必须超过任务运行时间以避免重复执行
  • 确认应答:成功时删除任务。异常时释放以稍后重试。达到限制后标记失败
  • 重试和退避:控制重试频率和等待时间
  • 失败处理:失败的任务进入 failed_jobs
  • 中间件:横切关注点,如限流和速率限制
  • 链式和批量:顺序执行的链式任务和并行执行的批量任务
  • 幂等性:安全地重复执行。使用唯一键、upsert 和保护机制

高层架构

如果我们以非常简化的方式来看,Laravel 的队列系统是一个简单的管道:创建任务、序列化它、存储它、弹出它、运行它,然后确认它。所有这些都有重试和失败的安全机制。

分发

你调用 Job::dispatch(...)。总线(Bus)序列化任务(类 + 数据 + 元数据)并将其交给队列管理器。

连接器

队列管理器选择一个连接器(Redis、数据库、SQS)并将 JSON 负载推送到指定的队列。

工作进程

php artisan queue:work 作为长期运行的进程运行。它保留一个任务,锁定它(可见性超时),并从容器解析依赖。

中间件

限流、重试、速率限制等包装任务的 handle() 方法。

确认或重试

成功时删除任务。异常触发释放(带退避的重试)或在 maxTries 后标记为失败。

示例

php
class ProcessReport implements ShouldQueue
{
    use Queueable, SerializesModels;

    public int $timeout = 30;

    public function __construct(public Report $report) {}

    public function backoff(): array
    {
        return [5, 30, 90];
    }

    public function handle(): void
    {
        // 处理报告的逻辑
    }
}

// 在数据库提交后分发,使用高优先级队列
ProcessReport::dispatch($report)
    ->afterCommit()
    ->onQueue('high');
bash
# 运行工作进程
php artisan queue:work --queue=high,default --timeout=35 --max-jobs=1000 --max-time=3600

分发生命周期

在深入了解 Laravel 如何处理分发逻辑之前,让我们先看看任务如何从你的代码到达队列连接的高层概览。

  1. 你分发任务Job::dispatch(...) 然后 Laravel 的总线(Bus)收集选项(队列、连接、延迟等)
  2. 任务被序列化(我们将在下一节检查这是如何工作的)
  3. 队列管理器选择连接(Redis、数据库、SQS)和队列名称,使用连接器推送负载,遵守延迟/可见性设置

现在我们对它的工作原理有了概念,让我们更深入地了解当你调用 Job::dispatch(...) 时会发生什么。

Dispatchable 特性 trait 的 dispatch() 方法返回一个围绕你的任务的 PendingDispatch 包装器,链式调用如 ->onConnection()->delay() 被代理到任务。

Queueable 特性 trait 是提供上述流畅方法的特性,也是设置 $connection$delay 和其他属性的地方。

PendingDispatch 分发时,无论是通过显式调用 dispatch() 方法还是在销毁时隐式调用,总线(Bus)检查:

  • ShouldQueue:决定是分发到队列还是内联运行。它还检查是否应该仅在响应发送后分发
  • 连接/队列$job->connection$job->queue(或配置默认值)
  • 时机$job->delay$job->timeoutbackoff()retryUntil()
  • 安全性ShouldBeUnique/uniqueId()ShouldBeEncryptedafterCommit

然后队列管理器选择连接器(Redis/数据库/SQS),应用延迟/可见性设置,并推送包含任务类、数据和所有收集的元数据的 JSON 负载。

负载(Payload)和序列化(Serialization)

当你在 Laravel 中分发任务时,框架将其包装在一个紧凑的 JSON 数据包中,工作进程(Worker)可以弹出、解码并安全且重复地运行。

数据包内容

下面我们将看到存储在 Laravel 分发任务时创建的 JSON 数据包中的一些最重要的值。

  • job:始终是 Illuminate\Queue\CallQueuedHandler@call(通用调用器)
  • uuiddisplayName:用于跟踪和日志
  • data.commandName:你的任务类名
  • data.command:你的任务对象的序列化克隆
  • 时机属性timeouttimeoutAtretryUntilbackoffmaxTriesmaxExceptions
  • tags:用于指标
  • encryptedtrue(仅当任务实现 ShouldBeEncrypted 时)

序列化的底层工作原理

下面我们将看到在分发任务时不同类型数据的序列化工作原理。

  • 任务对象:总线(Bus)克隆你的任务并使用 PHP 的 serialize() 序列化它。驱动器将该序列化的 blob 存储在 JSON 内并可能对其进行 base64 编码

  • Eloquent 模型:如果你的任务使用 SerializesModels 特性 trait,每个模型都会变成一个轻量级的 ModelIdentifier(类、ID、连接、关系)。在工作进程上,Laravel 通过重新查询数据库来重新水化模型——保持负载微小且一致

  • 闭包:基于闭包的任务使用 laravel/serializable-closure 包进行序列化。它们可以工作,但更推荐具体的任务类以实现更安全的滚动部署

  • 加密:实现 ShouldBeEncrypted 接口的任务在写入连接之前会对其序列化负载进行加密

JSON 数据包示例

json
{
  "uuid": "b4c1e7c6-...",
  "displayName": "App\\Jobs\\ProcessReport",
  "job": "Illuminate\\Queue\\CallQueuedHandler@call",
  "maxTries": 3,
  "timeout": 30,
  "backoff": [5, 30, 90],
  "tags": ["important", "client:42"],
  "data": {
    "commandName": "App\\Jobs\\ProcessReport",
    "command": "SERIALIZED JOB HERE"
  }
}

队列连接器(Queue Connector)深度解析

Redis 是最受欢迎的 Laravel 队列连接,它简单、快速,当你将它与 Laravel Horizon 配对时更是如此。我们将检查它在底层是如何工作的,但为此我们首先需要了解 Redis 的一些概念。

  • zset(有序集合):一个有序集合,其中每个成员都有一个数字分数。项目按分数顺序检索(例如,用于调度的时间戳)
  • RPUSH:将值附加到列表的尾部
  • ZADD:在有序集合中添加或更新带有分数的成员
  • BLPOP:从列表头部阻塞弹出。它等待直到有项目可用(或超时)

任务如何存储

在底层,它使用一个列表,加上两个有序集合。

  • 主队列(列表):键 queues:default - 在这里 RPUSH 负载
  • 延迟任务(有序集合):键 queues:default:delayed - 使用score = now + delay 进行 ZADD
  • 保留任务(有序集合):键 queues:default:reserved - 使用score = now + 可见性超时 进行 ZADD

弹出(Pop)工作原理

从队列中弹出任务是原子性完成的,它有三个阶段。

迁移到期项目

  • 它将 score <= now 的项目从延迟集合移动到主列表,使它们准备就绪
  • 它将过期的保留项目(可见性超时已过)从保留集合移动到主列表,以便崩溃/慢速工作进程得到重试
  • 这两个迁移操作都使用 Lua 脚本来保证原子性

保留(Reserve)项目

它在主列表上运行 BLPOP,并立即为弹出的负载在保留集合中运行 ZADD,score = now + retry_after(可见性超时)。

可见性超时可以在 config/queue.php 文件的 redis 部分的 retry_after 设置中配置。

BLPOP 时间可以在同一文件和部分的 block_for 设置中配置。

运行任务

BLPOP 之后,工作进程执行任务的 handle() 方法,包括所有配置的中间件、超时等。

确认应答(Acknowledge)和释放(Release)的工作原理

在确认应答或释放任务时,我们有三种不同的路径。

  • 成功:从保留有序集合中删除负载
  • 重试:增加尝试次数并将其移动到延迟有序集合,使用新分数
  • 失败:从保留有序集合中删除负载并记录在 failed_jobs

工作进程(Worker)内部机制

现在是时候了解当你运行 php artisan queue:work 时,工作进程底层的工作原理了。

它是一个长期运行的守护进程,启动你的应用一次,然后循环:弹出 → 保留 → 运行 → 确认/重试 → 重复。它快速、可观察,并设计为安全重启。让我们了解它在底层是如何工作的。

当你运行 php artisan queue:work 时,它启动容器和你的应用,然后进入循环。

  1. 工作进程向驱动器请求任务。驱动器原子性地保留带有可见性窗口的任务,即 retry_after 设置

  2. 任务从其 JSON 负载(payload)重构,解析依赖,如果有的话,执行包装 handle() 方法的中间件

  3. 运行任务时,强制执行每个任务的超时,如果任务超时,任务被杀死。任务将在可见性超时到达后再次可用

  4. 在成功运行中,任务从保留集合中删除,并触发 JobProcessed 事件

  5. 如果发生异常(Exception),它增加尝试次数,计算退避并释放到延迟集合

  6. 如果达到 maxTries/maxExceptions/retryUntil,任务被标记为失败,添加到 failed_jobs 并触发 JobFailed 事件

对于每个循环,工作进程还进行一些内务处理,检查诸如应用是否处于维护模式、是否未达到最大任务数设置以及处理优雅退出标志等事项,例如使用 queue:restart 时。

下面是使用伪代码的一个简单心理模型,以便更好地理解工作进程的机制。

php
while (! shouldQuit()) {
    if (! $job = $queue->pop($queues)) {
        sleepOrBlock(); // 例如,Redis BLPOP
        continue;
    }

    startTimeoutTimer($job->timeout); // Unix 上的 pcntl
    try {
        runMiddlewareThenHandle($job);
        ack($job);
        dispatch(JobProcessed::class);
    } catch (\Throwable $e) {
        handleFailureOrRelease($job, $e); // 退避 / 失败
    } finally {
        stopTimer();
    }

    enforceLimits(); // 最大任务数、最大时间、退出标志
}

Laravel 还提供了一些功能来更好地编排任务和队列,让我们看看其中一些是如何工作的。

防止重叠(Without Overlapping)

这是 Laravel 提供的一个中间件,防止两个工作进程同时处理同一个「事物」。

php
public function middleware(): array
{
    return [
        new WithoutOverlapping($this->userId),
    ];
}

在底层,当你添加这个中间件时,它获取一个原子缓存锁 cache()->lock($key, $ttl)。如果锁被持有,任务现在不被处理,它被释放回队列,带有小延迟,以便稍后另一次尝试可以尝试。成功/失败时,锁被释放。如果工作进程崩溃,锁在其 TTL 后自动过期,防止死锁。

唯一任务(Unique Jobs)

Laravel 还提供了一种方式,使重复任务甚至不进入队列。在分发(dispatch)时,Laravel 尝试获取由任务类 + uniqueId() 键控的缓存锁。如果不能,新的分发被跳过。锁在达到 $uniqueFor 秒后过期,如果设置或任务被释放时过期。

Laravel 为此提供了两个不同的接口。

  • ShouldBeUnique:锁持续到任务完成(或 TTL 结束),防止多个排队或运行的重复
  • ShouldBeUniqueUntilProcessing:一旦工作进程开始处理,锁就被释放,允许在当前任务运行时排队新副本(但不能事先并行排队)

过期键

如我们上面所见,WithoutOverlapping 中间件和 ShouldBeUnique 以及 ShouldBeUniqueUntilProcessing 接口依赖于设置了 TTL 的缓存锁。锁必须过期以避免在工作进程死亡的情况下永久阻塞。在 Redis 中,这在底层使用 SET NX EX

  • SET:将值写入键
  • NX:仅在键不存在时设置
  • EX:设置生存时间,以便键在 N 秒后自动过期

通过这种方式,Laravel 在完成时释放锁,如果进程崩溃,Redis 会自动过期它。

失败处理、重试和退避

Laravel 有一种一致且安全的方式来处理队列中任务的重试和失败,所以当出现问题时你不必担心。以下是它在底层的工作原理。

当工作进程从队列中弹出任务时,它增加负载上的尝试次数。如果发生超时或异常,而不是确认应答和删除任务,它将在可见性窗口后再次出现在队列中,当它被弹出时将再次增加尝试次数。

在任务再次被释放以重试之前,它检查 backoff() 方法的值或对 releaseAfter() 方法的显式调用,然后将任务发送到延迟有序集合(delayed sorted set),将 now() 与这些设置值之一相加。如果找不到这些,任务被释放以立即再次尝试。

要检查任务是否应该重试,工作进程检查 $tries$maxExceptions 属性和 retryUntil() 方法。如果尝试次数和异常数超过配置的次数或 now() > retryUntil(),任务然后被标记为失败。

当标记为失败时,工作进程抛出 MaxAttemptsExceededException 异常,从队列(queue)中删除任务,将其记录在 failed_jobs 表中,触发 JobFailed 事件并调用任务的 failed() 方法。

下面我们可以看到之前的同一个任务,但对重试和失败有更好的配置。

php
class ProcessReport implements ShouldQueue
{
    use Queueable, SerializesModels;

    public int $tries = 5;
    public int $timeout = 30;
    public int $maxExceptions = 3;

    public function __construct(public Report $report) {}

    public function backoff(): array
    {
        return [5, 30, 90];
    }

    public function retryUntil(): DateTime
    {
        return now()->addMinutes(30);
    }

    public function handle(): void
    {
        // 处理报告的逻辑
    }

    public function failed(?Throwable $exception): void
    {
        // 发送失败通知
    }
}

延迟、可见性和超时

在队列中处理任务时,时机非常重要,Laravel 使管理这一点变得容易。三个「时钟」是最重要的:任务何时变为可用(延迟)、它为工作进程保持「锁定」多长时间可见性以及你的代码被允许运行多长时间超时。让我们使用 Redis 连接检查这些是如何工作的。

延迟

Redis 为此使用延迟有序集合,分数为 now() + delay。延迟可以在分发任务时设置:

php
ProcessReport::dispatch($report)
    ->delay(now()->addMinutes(10));

可见性

当工作进程从队列中弹出消息时,负载被移动到保留有序集合,分数为 now() + retry_after。这可以在 config/queue.php 文件的 redis 部分下配置,如果任务没有及时确认应答,任务然后被迁移回主队列以可能重试。

Timeout

工作进程将强制执行任务的最大生存时间。这可以在任务本身的 $timeout 属性中设置以获得特定于任务的超时,或在运行 php artisan queue:work 命令时的 --timeout 选项中设置,这将应用于所有任务。超过此超时后,任务被杀死,在设置的可见性期间后,它被迁移回主队列以可能重试。

链式和批处理

Laravel 的总线(Bus)给我们的能力不仅仅是运行普通任务。它提供了一种编排顺序链式或并行批处理任务的方式,在管理队列中的任务时开启了许多新的可能性。

链式(Chains)

链式任务的概念非常简单,它按顺序运行任务,如果其中任何一个失败,链中的剩余任务被跳过。

链中任务的负载(payload)包含一个序列化的链式数组,其中包含所有任务。任务被确认应答后,工作进程然后分发下一个序列化的任务。

如果任务失败,链中的剩余任务被跳过,如果你定义了 catch() 回调,它将被运行。

php
Bus::chain([
    new ProcessPayment($order),
    new GenerateInvoice($order),
    new EmailInvoice($order),
])->onConnection('redis')
    ->onQueue('high')
    ->catch(fn (Throwable $e) => report($e))
    ->dispatch();

批处理(Batches)

虽然链式的概念是按顺序运行任务,但批处理是当你想要并行运行多个任务时。你可以跟踪批处理的进度,处理完成和失败钩子,允许取消它,甚至向其添加更多任务。

在底层,所有这些跟踪都发生在 job_batches 表中,它跟踪批处理有多少任务、有多少是待处理或失败的、哪些失败了以及批处理是否被取消。

批处理中的每个任务都将收到一个 batchId,当这些任务之一完成或失败时,job_batches 表会用信息更新。使用这种跟踪,批处理为我们提供了不同的钩子,我们可以根据批处理的状态运行额外的逻辑。

php
class ImportVideoChunk implements ShouldQueue
{
    use Batchable, Queueable, SerializesModels;

    public function __construct(
        public Video $video,
        public ?int start,
        public ?int end,
    ) {}

    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            return;
        }

        // 处理视频导入逻辑
    }
}

$batch = Bus::batch([
    new ImportVideoChunk(video: $video, end: 60),
    new ImportVideoChunk(video: $video, start: 61, end: 120),
    new ImportVideoChunk(video: $video, start: 121, end: 180),
    new ImportVideoChunk(video: $video, start: 181, end: 240),
    new ImportVideoChunk(video: $video, start: 241),
])->before(function (Batch $batch) {
    // 在批处理创建时运行,但没有任务被运行
})->progress(function (Batch $batch) {
    // 当批处理中的任务无错误完成时运行
})->then(function (Batch $batch) {
    // 当所有任务无错误完成时运行
})->catch(function (Batch $batch, Throwable $e) {
    // 当批处理中的任务失败时运行
})->finally(function (Batch $batch) {
    // 当批处理执行完所有任务时运行
})->dispatch();

如你在上面看到的,钩子(hooks)为你提供对 Batch $batch 对象的访问,你可以使用它来检查和管理当前批处理。同样的对象可以在任务内部通过 Batchable 特性(trait)提供的 $this->batch() 方法访问。

总结

总结一下,队列是 Laravel 将缓慢、脆弱的工作转变为快速、可靠体验的方式。你分发一个小任务,Laravel 序列化它,连接器(如 Redis)存储它,工作进程通过中间件弹出并运行它,系统通过清晰的事件和指标优雅地重试或失败。

所有这些都通过一个优雅而简单的 API 实现,每个开发者都可以使用它来改进应用程序。

通过这篇文章,你了解了支撑 Laravel 队列系统的底层机制、队列的一般工作原理,以及快速了解了 Redis 连接器如何与 Laravel 队列系统配合工作。

希望你喜欢这篇文章,如果喜欢,别忘了与你的朋友分享这篇文章!再见!


相关资源

技术标签#Laravel #队列 #Redis #PHP #异步处理 #性能优化

CatchAdmin
后端开发工程师,前端入门选手,略知相关服务器知识,偏爱❤️ Laravel & Vue
本作品采用《CC 协议》,转载必须注明作者和本文链接