CatchAdmin PHP 后台管理框架 Logo CatchAdmin

Laravel SSE 上线实战,重连、超时与多租户事件流隔离

Laravel 的 SSE(Server-Sent Events,基于长连接 HTTP 的服务器推送),在开发环境里上手快得"可疑",一旦真实用户打上来又"脆弱得可疑"。跑通一个基础流大概二十分钟,而要让 Laravel SSE 在生产里稳住并发、跨多租户、在连接断开后客户端干净地重连上——要花的时间可就不止一点半点。

本文要覆盖的是那些只有在 SSE 上线后才会冒出来的失败模式:客户端流到一半重连会发生什么,PHP-FPM 与 Nginx 如何联手把长连接悄悄杀掉,以及如何按租户隔离事件流而不引入数据泄漏的风险。此外还会深入谈测试,因为 SSE 正是那种"看起来一切正常、直到某一串特定的事件序列把窟窿捅出来"的功能。

选择 SSE 意味着承担什么

讲失败模式之前,先坦白 SSE 相对 WebSocket 的权衡。SSE 是单向的——服务端推、客户端收。浏览器原生通过 EventSource API 自动处理重连,协议本身就是 HTTP,也就是说能和既有 Nginx 配置、鉴权中间件、负载均衡器顺利相处,直到它不肯了为止

会咬人的那一条约束是连接持久性。每个连上的客户端在会话生命周期内都占着一个长连接。在 PHP-FPM 下,这意味着一个 worker 被这条连接钉住。默认 pm.max_children 在 5 到 50 之间(视机器规格而定),30 个并发 SSE 客户端就足以把 PHP-FPM 进程池打满。这不是理论威胁——它是生产里最先爆的那一块。

替代方案是把 SSE 响应交给非阻塞驱动去跑。两种路径后面都会讲到。

路由与流式控制器

Laravel 11/12 通过 StreamedResponse 处理 SSE。路由本身很朴素:

php
// routes/api.php
Route::get('/stream/events', StreamEventsController::class)
    ->middleware(['auth:sanctum', 'throttle:sse']);

注意那一条专用的 throttle:sse 速率限制器——这在生产里不是可选项。在 bootstrap/app.php 里定义它:

php
// bootstrap/app.php
->withMiddleware(function (Middleware $middleware) {
    $middleware->throttleWithRedis('sse', function (Request $request) {
        return Limit::perMinute(10)->by($request->user()?->id ?: $request->ip());
    });
})

控制器:

php
<?php

namespace App\Http\Controllers;

use App\Services\TenantEventStream;
use Illuminate\Http\Request;
use Symfony\Component\HttpFoundation\StreamedResponse;

class StreamEventsController
{
    public function __invoke(Request $request, TenantEventStream $stream): StreamedResponse
    {
        $tenant = $request->user()->tenant;
        $lastId = $request->header('Last-Event-ID');

        return response()->stream(function () use ($tenant, $lastId, $stream) {
            $stream->pipe($tenant, $lastId, function (string $payload) {
                echo "data: {$payload}\n\n";
                ob_flush();
                flush();
            });
        }, 200, [
            'Content-Type'      => 'text/event-stream',
            'Cache-Control'     => 'no-cache',
            'X-Accel-Buffering' => 'no',
            'Connection'        => 'keep-alive',
        ]);
    }
}

这里有两处特别重要。X-Accel-Buffering: no 关闭了 Nginx 对这条响应的代理缓冲——没有它,Nginx 会默默把事件攒在一起再批量下发,实时契约直接被破坏。Last-Event-ID 头的读取则是重连逻辑的基石,下一节就讲。

客户端重连逻辑

浏览器的 EventSource 会在连接断开后自动重连。默认等 3 秒、重新请求同一个 URL。它不会自动告诉服务端"我上次处理到哪儿了"——除非事件流里正确使用了 idretry 字段。

每一条服务端发出的事件都应当带一个 id

php
private function formatEvent(string $data, string $id, string $event = 'message'): string
{
    return "id: {$id}\nevent: {$event}\ndata: {$data}\nretry: 5000\n\n";
}

retry 告诉浏览器下次重连前等多久,5000ms 是合理的生产值。id 是浏览器在重连时通过 Last-Event-ID 头回传的内容。

服务端那边,当客户端带着 Last-Event-ID 重连时,需要把期间漏掉的事件重放。这就要看事件存储怎么设计:

php
<?php

namespace App\Services;

use App\Models\Tenant;
use Illuminate\Support\Facades\Redis;

class TenantEventStream
{
    private const MAX_REPLAY    = 100;
    private const POLL_INTERVAL = 1;   // 秒
    private const MAX_DURATION  = 60;  // 达到后主动断开并让客户端重连(秒)

    public function pipe(Tenant $tenant, ?string $lastId, callable $emit): void
    {
        $channel = $this->channelKey($tenant);
        $elapsed = 0;

        // 先重放错过的事件
        if ($lastId !== null) {
            $this->replay($channel, $lastId, $emit);
        }

        // 发一条注释型心跳保持连接
        echo ": heartbeat\n\n";
        ob_flush();
        flush();

        while ($elapsed < self::MAX_DURATION) {
            if (connection_aborted()) {
                break;
            }

            $events = Redis::lrange("{$channel}:pending", 0, -1);

            foreach ($events as $raw) {
                $event = json_decode($raw, true);
                $emit($this->formatEvent($event['data'], $event['id'], $event['type']));
                Redis::lrem("{$channel}:pending", 0, $raw);
            }

            sleep(self::POLL_INTERVAL);
            $elapsed += self::POLL_INTERVAL;
        }

        // 优雅关闭——让客户端重新连回来
        echo "event: reconnect\ndata: {}\n\n";
        ob_flush();
        flush();
    }

    private function replay(string $channel, string $lastId, callable $emit): void
    {
        $history = Redis::lrange("{$channel}:history", 0, self::MAX_REPLAY - 1);

        $found = false;
        foreach ($history as $raw) {
            $event = json_decode($raw, true);
            if ($found) {
                $emit($this->formatEvent($event['data'], $event['id'], $event['type']));
            }
            if ($event['id'] === $lastId) {
                $found = true;
            }
        }
    }

    private function channelKey(Tenant $tenant): string
    {
        return "sse:tenant:{$tenant->id}";
    }

    private function formatEvent(string $data, string $id, string $event = 'message'): string
    {
        return "id: {$id}\nevent: {$event}\ndata: {$data}\nretry: 5000\n\n";
    }
}

MAX_DURATION 的上限是故意设的。代码会在 60 秒后主动关闭连接并发出一条 reconnect 事件,客户端立刻重连。这样做既避免 PHP-FPM worker 被永久霸占,也为每次重连提供了一次轮换鉴权上下文的天然时机。这会给重连周期加一点点延迟——考虑到 5 秒的 retry 值,用户几乎察觉不到。

生产侧陷阱。 不要在流式控制器里用 set_time_limit(0)。听起来很合理——让连接永远开着——但代价是:一个行为异常的客户端或一次网络分区就能把 PHP-FPM worker 永久扣住。在并发上升时,这会拖垮整个应用,而不仅仅是 SSE 端点。设上限,优雅重连

高并发下的连接断开

PHP-FPM 的瓶颈是真实的。标准解法是把 SSE 放在进程级而不是 worker 级处理。两种模式值得并列对比:

方案并发上限内存画像复杂度
PHP-FPM 轮询(如上)pm.max_children每 worker 较低
Laravel Octane(Swoole)数千基线更高
独立的 Node/Go 微服务非常高看实现
Reverb + 自定义 SSE 适配器

对多数 Laravel 应用,PHP-FPM 轮询方案只要调得够狠,能顶住几百个并发 SSE 客户端。再往上,Octane + Swoole 是阻力最小的路径。

如果已经在生产 Nginx 栈上跑 Laravel,还需要为 SSE 加一组专用指令,阻止 Nginx 把这条流当成普通的带缓冲 HTTP 响应:

nginx
location /stream/events {
    proxy_pass                http://php-fpm;
    proxy_http_version        1.1;
    proxy_set_header          Connection '';
    proxy_buffering           off;
    proxy_cache               off;
    proxy_read_timeout        120s;
    chunked_transfer_encoding on;
}

proxy_read_timeout 这一条是关键。Nginx 默认 60 秒——会在应用还没来得及发心跳之前就把长连接剪断。把它设得比 MAX_DURATION 上限更高。

至于心跳本身,那条注释式 keep-alive(echo ": heartbeat\n\n")并非装饰。AWS ALB 以及许多代理会把空闲超过 60 秒的连接强制关闭。每 20–30 秒发一次心跳正好能把它们按住:

php
private function emitHeartbeat(): void
{
    echo ": heartbeat " . now()->timestamp . "\n\n";
    ob_flush();
    flush();
}

多租户事件流隔离

多数 SSE 实现都在这一段埋着一个沉默的安全风险。如果频道 key 或订阅逻辑写得太随意,以另一租户凭据重连鉴权中的竞态,都可能让一个租户的事件被另一租户看到。

隔离模型必须在三层都强制:

1. 频道 key 必须是确定性而不可猜的

绝不要用 tenant_1 这样的序号、或租户域名 slug 作为 Redis key。基于租户 ID 与一个秘密派生:

php
private function channelKey(Tenant $tenant): string
{
    return 'sse:' . hash_hmac('sha256', (string) $tenant->id, config('app.key'));
}

这样即便 Redis key 命名规律被看到,攻击者也无法在没有 app.key 的情况下构造出别的租户的合法 key。

2. 每次重连都必须重新鉴权,而不是靠缓存

Laravel Sanctum 的 token 鉴权走标准中间件,也就是说每次请求都会重新校验——这对每一次 SSE 重连同样成立。不要用长生命周期 cookie 或共享 session 绕过它。每次重连都是一次全新的 HTTP 请求,就按全新请求来对待

php
// 中间件栈本身会处理鉴权,但 ability 的作用域要显式写出
Route::get('/stream/events', StreamEventsController::class)
    ->middleware(['auth:sanctum', 'ability:stream:read', 'throttle:sse']);

3. 事件发布方必须在写入时做租户作用域,而不是读取时

当一个 job 或 service 向 Redis 发布事件时,必须使用同一套 HMAC 派生的 key。任何未做作用域的发布,都是一次"待暴露"的数据泄漏:

php
<?php

namespace App\Services;

use App\Models\Tenant;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Str;

class EventPublisher
{
    private const HISTORY_TTL = 300; // 5 分钟
    private const MAX_HISTORY = 500;

    public function publish(Tenant $tenant, string $type, array $data): void
    {
        $id      = (string) Str::uuid();
        $channel = $this->channelKey($tenant);

        $payload = json_encode([
            'id'   => $id,
            'type' => $type,
            'data' => json_encode($data),
        ]);

        Redis::pipeline(function ($pipe) use ($channel, $payload) {
            // 活跃连接消费的待处理队列
            $pipe->rpush("{$channel}:pending", $payload);

            // 用于重连重放的历史
            $pipe->rpush("{$channel}:history", $payload);
            $pipe->ltrim("{$channel}:history", -self::MAX_HISTORY, -1);
            $pipe->expire("{$channel}:history", self::HISTORY_TTL);
        });
    }

    private function channelKey(Tenant $tenant): string
    {
        return 'sse:' . hash_hmac('sha256', (string) $tenant->id, config('app.key'));
    }
}

这里用 Redis pipeline 不只是为了性能,而是为了让"pending 队列"与"history 列表"的写入具备原子性。一个只写了 pending 却没写 history 的中途失败,会让重放逻辑错乱。

架构侧备注。 如果是多租户架构,且同一租户下可以有多个并发用户——而不是一条"全租户共用"的事件流——就还需要再加一层作用域。把 tenant_id + user_id 一起参与 key 派生,或引入订阅模型让每个用户订阅命名频道。一条租户级别的流对于管理员看板或后台作业状态推送还行,但一旦不同用户需要看见不同事件,它就塌了。

从队列作业里发布事件

多数真实应用里,事件并不源自控制器,而是源自队列作业、Webhook 或定时任务。基于既有生产级队列架构,一个队列作业可以这样干净地向租户事件流发布:

php
<?php

namespace App\Jobs;

use App\Models\Tenant;
use App\Services\EventPublisher;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class BroadcastTenantEvent implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries   = 3;
    public int $backoff = 5;

    public function __construct(
        private readonly Tenant $tenant,
        private readonly string $type,
        private readonly array  $data,
    ) {}

    public function handle(EventPublisher $publisher): void
    {
        $publisher->publish($this->tenant, $this->type, $this->data);
    }

    public function failed(\Throwable $e): void
    {
        \Log::error('SSE event publish failed', [
            'tenant' => $this->tenant->id,
            'type'   => $this->type,
            'error'  => $e->getMessage(),
        ]);
    }
}

$tries$backoff 的配置是因为:Redis 在内存压力下的发布失败是真实的生产场景。作业会用 5 秒退避重试,而不是把事件默默吞掉。

测试深度剖析

SSE 的测试写起来出了名地别扭。流式响应不像普通 JSON 响应那样行为可预测,而重连与重放逻辑又引入了时间依赖,这些依赖与 PHPUnit 的同步执行模型并不天然契合。下面给出能真正覆盖它的写法。

单元测试:事件流服务

TenantEventStream 放在隔离环境里测,mock 掉 Redis,再按正确顺序断言事件负载:

php
<?php

namespace Tests\Unit;

use App\Models\Tenant;
use App\Services\TenantEventStream;
use Illuminate\Support\Facades\Redis;
use Tests\TestCase;

class TenantEventStreamTest extends TestCase
{
    public function test_replays_events_after_last_id(): void
    {
        $tenant  = Tenant::factory()->make(['id' => 1]);
        $channel = 'sse:' . hash_hmac('sha256', '1', config('app.key'));

        $events = [
            json_encode(['id' => 'evt-1', 'type' => 'update', 'data' => '{"foo":"bar"}']),
            json_encode(['id' => 'evt-2', 'type' => 'update', 'data' => '{"foo":"baz"}']),
            json_encode(['id' => 'evt-3', 'type' => 'update', 'data' => '{"foo":"qux"}']),
        ];

        Redis::shouldReceive('lrange')
            ->with("{$channel}:history", 0, 99)
            ->once()
            ->andReturn($events);

        Redis::shouldReceive('lrange')
            ->with("{$channel}:pending", 0, -1)
            ->andReturn([]);

        $emitted = [];
        $stream  = $this->app->make(TenantEventStream::class);

        // 覆盖 MAX_DURATION 为 0 让循环立刻退出
        // 可用反射或测试子类实现
        $stream->pipe($tenant, 'evt-1', function (string $payload) use (&$emitted) {
            $emitted[] = $payload;
        });

        $this->assertCount(2, $emitted); // 只有 evt-2 与 evt-3
        $this->assertStringContainsString('evt-2', $emitted[0]);
        $this->assertStringContainsString('evt-3', $emitted[1]);
    }
}

要覆盖循环时长,可以把 MAX_DURATION 设计成能从构造函数注入,或者暴露成可被测试子类覆盖的公有常量。硬编码的 private const 是测试气味

集成测试:HTTP 响应

Laravel 的 TestResponse 原生并不支持流式断言,但响应头与初始输出结构仍然可测:

php
public function test_sse_response_has_correct_headers(): void
{
    $user = User::factory()->withTenant()->create();

    $response = $this->actingAs($user, 'sanctum')
        ->get('/stream/events');

    $response->assertStatus(200);
    $response->assertHeader('Content-Type', 'text/event-stream');
    $response->assertHeader('X-Accel-Buffering', 'no');
    $response->assertHeader('Cache-Control', 'no-cache');
}

public function test_unauthenticated_request_is_rejected(): void
{
    $this->get('/stream/events')->assertUnauthorized();
}

public function test_tenant_isolation_different_users_cannot_share_stream(): void
{
    $tenantA = Tenant::factory()->create();
    $tenantB = Tenant::factory()->create();

    $userA = User::factory()->for($tenantA)->create();
    $userB = User::factory()->for($tenantB)->create();

    $keyA = 'sse:' . hash_hmac('sha256', (string) $tenantA->id, config('app.key'));
    $keyB = 'sse:' . hash_hmac('sha256', (string) $tenantB->id, config('app.key'));

    // 两个频道 key 必须不同
    $this->assertNotEquals($keyA, $keyB);

    // 向 A 的频道发布不应落到 B 的 key 上
    Redis::shouldReceive('pipeline')->once();

    (new EventPublisher())->publish($tenantA, 'update', ['value' => 42]);

    Redis::shouldNotHaveReceived('rpush', ["{$keyB}:pending", \Mockery::any()]);
}

用 Pest 测试重连重放

如果测试套件用 Pest,dataset 特性能把各种重连场景写得很清晰:

php
dataset('reconnect_scenarios', [
    'reconnect from first event'  => ['evt-1', 2], // 应重放 2 条
    'reconnect from second event' => ['evt-2', 1], // 应重放 1 条
    'reconnect from last event'   => ['evt-3', 0], // 无需重放
    'unknown last id'             => ['evt-99', 0], // 未知 id,不重放
]);

it('replays correct events on reconnect', function (string $lastId, int $expected) {
    // ... setup 与断言
})->with('reconnect_scenarios');

它覆盖的是最容易被忽略的边界:Last-Event-ID 过期(历史缓冲里已不存在),以及 Last-Event-ID 恰好指向历史里最后一条事件(没有东西可重放,连接应正常继续)。

边界情况提醒。 当 Redis 历史缓冲被裁剪后(因为达到 MAX_HISTORY),重连客户端可能发来一个历史里已经找不到Last-Event-IDreplay 函数必须优雅处理这种情况:要么重放整段可用历史,要么发出 full-refresh 事件告诉客户端放弃增量、回到 API 去拉全量。

php
private function replay(string $channel, string $lastId, callable $emit): void
{
    $history = Redis::lrange("{$channel}:history", 0, self::MAX_REPLAY - 1);

    $found  = false;
    $buffer = [];

    foreach ($history as $raw) {
        $event = json_decode($raw, true);
        if ($found) {
            $buffer[] = $raw;
        }
        if ($event['id'] === $lastId) {
            $found = true;
        }
    }

    if (! $found) {
        // 这个 id 已经被驱逐——通知客户端做全量刷新
        echo "event: full-refresh\ndata: {}\n\n";
        ob_flush();
        flush();
        return;
    }

    foreach ($buffer as $raw) {
        $event = json_decode($raw, true);
        $emit($this->formatEvent($event['data'], $event['id'], $event['type']));
    }
}

监控与可观测性

流式连接对 Laravel Telescope 的标准请求日志是不可见的——在 Telescope 看来,这次请求还没结束。需要对流做显式插桩。

把活跃 SSE 连接用带 TTL 的 Redis key 记录下来,并在管理面板里展示其计数:

php
// 连接建立时
Redis::setex("sse:active:{$tenant->id}:{$connectionId}", 90, 1);

// 心跳时续 TTL
Redis::expire("sse:active:{$tenant->id}:{$connectionId}", 90);

// 连接关闭时(connection_aborted 或命中 MAX_DURATION)
Redis::del("sse:active:{$tenant->id}:{$connectionId}");

// 查询活跃连接数
$count = count(Redis::keys("sse:active:{$tenant->id}:*"));

这 90 秒的 TTL 充当"死人开关"。连接因为网络分区或 PHP-FPM worker 崩溃而没有走到正常关闭路径时,key 会自动过期,计数不会失真。

生产上线 Checklist

SSE 正式上线前,逐条过一遍:

关注点检查项
PHP-FPM pm.max_children按预期并发 SSE 连接数设定
Nginx proxy_read_timeout高于 MAX_DURATION
X-Accel-Buffering: no出现在响应头中
心跳间隔≤ 30 秒,足以扛过 ALB / 代理的空闲超时
Redis 历史 TTL覆盖期望的重连窗口
频道 key 派生HMAC,而非可猜 slug
重连时重新鉴权已通过 Sanctum 中间件确认
full-refresh 兜底客户端与服务端两侧都有处理
SSE 路由上的速率限制throttle:sse 已启用
connection_aborted() 检查位于轮询循环内部
本作品采用《CC 协议》,转载必须注明作者和本文链接