Laravel SSE 上线实战,重连、超时与多租户事件流隔离
Laravel 的 SSE(Server-Sent Events,基于长连接 HTTP 的服务器推送),在开发环境里上手快得"可疑",一旦真实用户打上来又"脆弱得可疑"。跑通一个基础流大概二十分钟,而要让 Laravel SSE 在生产里稳住并发、跨多租户、在连接断开后客户端干净地重连上——要花的时间可就不止一点半点。
本文要覆盖的是那些只有在 SSE 上线后才会冒出来的失败模式:客户端流到一半重连会发生什么,PHP-FPM 与 Nginx 如何联手把长连接悄悄杀掉,以及如何按租户隔离事件流而不引入数据泄漏的风险。此外还会深入谈测试,因为 SSE 正是那种"看起来一切正常、直到某一串特定的事件序列把窟窿捅出来"的功能。
选择 SSE 意味着承担什么
讲失败模式之前,先坦白 SSE 相对 WebSocket 的权衡。SSE 是单向的——服务端推、客户端收。浏览器原生通过 EventSource API 自动处理重连,协议本身就是 HTTP,也就是说能和既有 Nginx 配置、鉴权中间件、负载均衡器顺利相处,直到它不肯了为止。
会咬人的那一条约束是连接持久性。每个连上的客户端在会话生命周期内都占着一个长连接。在 PHP-FPM 下,这意味着一个 worker 被这条连接钉住。默认 pm.max_children 在 5 到 50 之间(视机器规格而定),30 个并发 SSE 客户端就足以把 PHP-FPM 进程池打满。这不是理论威胁——它是生产里最先爆的那一块。
替代方案是把 SSE 响应交给非阻塞驱动去跑。两种路径后面都会讲到。
路由与流式控制器
Laravel 11/12 通过 StreamedResponse 处理 SSE。路由本身很朴素:
// routes/api.php
Route::get('/stream/events', StreamEventsController::class)
->middleware(['auth:sanctum', 'throttle:sse']);注意那一条专用的 throttle:sse 速率限制器——这在生产里不是可选项。在 bootstrap/app.php 里定义它:
// bootstrap/app.php
->withMiddleware(function (Middleware $middleware) {
$middleware->throttleWithRedis('sse', function (Request $request) {
return Limit::perMinute(10)->by($request->user()?->id ?: $request->ip());
});
})控制器:
<?php
namespace App\Http\Controllers;
use App\Services\TenantEventStream;
use Illuminate\Http\Request;
use Symfony\Component\HttpFoundation\StreamedResponse;
class StreamEventsController
{
public function __invoke(Request $request, TenantEventStream $stream): StreamedResponse
{
$tenant = $request->user()->tenant;
$lastId = $request->header('Last-Event-ID');
return response()->stream(function () use ($tenant, $lastId, $stream) {
$stream->pipe($tenant, $lastId, function (string $payload) {
echo "data: {$payload}\n\n";
ob_flush();
flush();
});
}, 200, [
'Content-Type' => 'text/event-stream',
'Cache-Control' => 'no-cache',
'X-Accel-Buffering' => 'no',
'Connection' => 'keep-alive',
]);
}
}这里有两处特别重要。X-Accel-Buffering: no 关闭了 Nginx 对这条响应的代理缓冲——没有它,Nginx 会默默把事件攒在一起再批量下发,实时契约直接被破坏。Last-Event-ID 头的读取则是重连逻辑的基石,下一节就讲。
客户端重连逻辑
浏览器的 EventSource 会在连接断开后自动重连。默认等 3 秒、重新请求同一个 URL。它不会自动告诉服务端"我上次处理到哪儿了"——除非事件流里正确使用了 id 和 retry 字段。
每一条服务端发出的事件都应当带一个 id:
private function formatEvent(string $data, string $id, string $event = 'message'): string
{
return "id: {$id}\nevent: {$event}\ndata: {$data}\nretry: 5000\n\n";
}retry 告诉浏览器下次重连前等多久,5000ms 是合理的生产值。id 是浏览器在重连时通过 Last-Event-ID 头回传的内容。
服务端那边,当客户端带着 Last-Event-ID 重连时,需要把期间漏掉的事件重放。这就要看事件存储怎么设计:
<?php
namespace App\Services;
use App\Models\Tenant;
use Illuminate\Support\Facades\Redis;
class TenantEventStream
{
private const MAX_REPLAY = 100;
private const POLL_INTERVAL = 1; // 秒
private const MAX_DURATION = 60; // 达到后主动断开并让客户端重连(秒)
public function pipe(Tenant $tenant, ?string $lastId, callable $emit): void
{
$channel = $this->channelKey($tenant);
$elapsed = 0;
// 先重放错过的事件
if ($lastId !== null) {
$this->replay($channel, $lastId, $emit);
}
// 发一条注释型心跳保持连接
echo ": heartbeat\n\n";
ob_flush();
flush();
while ($elapsed < self::MAX_DURATION) {
if (connection_aborted()) {
break;
}
$events = Redis::lrange("{$channel}:pending", 0, -1);
foreach ($events as $raw) {
$event = json_decode($raw, true);
$emit($this->formatEvent($event['data'], $event['id'], $event['type']));
Redis::lrem("{$channel}:pending", 0, $raw);
}
sleep(self::POLL_INTERVAL);
$elapsed += self::POLL_INTERVAL;
}
// 优雅关闭——让客户端重新连回来
echo "event: reconnect\ndata: {}\n\n";
ob_flush();
flush();
}
private function replay(string $channel, string $lastId, callable $emit): void
{
$history = Redis::lrange("{$channel}:history", 0, self::MAX_REPLAY - 1);
$found = false;
foreach ($history as $raw) {
$event = json_decode($raw, true);
if ($found) {
$emit($this->formatEvent($event['data'], $event['id'], $event['type']));
}
if ($event['id'] === $lastId) {
$found = true;
}
}
}
private function channelKey(Tenant $tenant): string
{
return "sse:tenant:{$tenant->id}";
}
private function formatEvent(string $data, string $id, string $event = 'message'): string
{
return "id: {$id}\nevent: {$event}\ndata: {$data}\nretry: 5000\n\n";
}
}MAX_DURATION 的上限是故意设的。代码会在 60 秒后主动关闭连接并发出一条 reconnect 事件,客户端立刻重连。这样做既避免 PHP-FPM worker 被永久霸占,也为每次重连提供了一次轮换鉴权上下文的天然时机。这会给重连周期加一点点延迟——考虑到 5 秒的 retry 值,用户几乎察觉不到。
生产侧陷阱。 不要在流式控制器里用
set_time_limit(0)。听起来很合理——让连接永远开着——但代价是:一个行为异常的客户端或一次网络分区就能把 PHP-FPM worker 永久扣住。在并发上升时,这会拖垮整个应用,而不仅仅是 SSE 端点。设上限,优雅重连。
高并发下的连接断开
PHP-FPM 的瓶颈是真实的。标准解法是把 SSE 放在进程级而不是 worker 级处理。两种模式值得并列对比:
| 方案 | 并发上限 | 内存画像 | 复杂度 |
|---|---|---|---|
| PHP-FPM 轮询(如上) | 约 pm.max_children | 每 worker 较低 | 低 |
| Laravel Octane(Swoole) | 数千 | 基线更高 | 中 |
| 独立的 Node/Go 微服务 | 非常高 | 看实现 | 高 |
| Reverb + 自定义 SSE 适配器 | 高 | 中 | 中 |
对多数 Laravel 应用,PHP-FPM 轮询方案只要调得够狠,能顶住几百个并发 SSE 客户端。再往上,Octane + Swoole 是阻力最小的路径。
如果已经在生产 Nginx 栈上跑 Laravel,还需要为 SSE 加一组专用指令,阻止 Nginx 把这条流当成普通的带缓冲 HTTP 响应:
location /stream/events {
proxy_pass http://php-fpm;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 120s;
chunked_transfer_encoding on;
}proxy_read_timeout 这一条是关键。Nginx 默认 60 秒——会在应用还没来得及发心跳之前就把长连接剪断。把它设得比 MAX_DURATION 上限更高。
至于心跳本身,那条注释式 keep-alive(echo ": heartbeat\n\n")并非装饰。AWS ALB 以及许多代理会把空闲超过 60 秒的连接强制关闭。每 20–30 秒发一次心跳正好能把它们按住:
private function emitHeartbeat(): void
{
echo ": heartbeat " . now()->timestamp . "\n\n";
ob_flush();
flush();
}多租户事件流隔离
多数 SSE 实现都在这一段埋着一个沉默的安全风险。如果频道 key 或订阅逻辑写得太随意,以另一租户凭据重连或鉴权中的竞态,都可能让一个租户的事件被另一租户看到。
隔离模型必须在三层都强制:
1. 频道 key 必须是确定性而不可猜的
绝不要用 tenant_1 这样的序号、或租户域名 slug 作为 Redis key。基于租户 ID 与一个秘密派生:
private function channelKey(Tenant $tenant): string
{
return 'sse:' . hash_hmac('sha256', (string) $tenant->id, config('app.key'));
}这样即便 Redis key 命名规律被看到,攻击者也无法在没有 app.key 的情况下构造出别的租户的合法 key。
2. 每次重连都必须重新鉴权,而不是靠缓存
Laravel Sanctum 的 token 鉴权走标准中间件,也就是说每次请求都会重新校验——这对每一次 SSE 重连同样成立。不要用长生命周期 cookie 或共享 session 绕过它。每次重连都是一次全新的 HTTP 请求,就按全新请求来对待。
// 中间件栈本身会处理鉴权,但 ability 的作用域要显式写出
Route::get('/stream/events', StreamEventsController::class)
->middleware(['auth:sanctum', 'ability:stream:read', 'throttle:sse']);3. 事件发布方必须在写入时做租户作用域,而不是读取时
当一个 job 或 service 向 Redis 发布事件时,必须使用同一套 HMAC 派生的 key。任何未做作用域的发布,都是一次"待暴露"的数据泄漏:
<?php
namespace App\Services;
use App\Models\Tenant;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Str;
class EventPublisher
{
private const HISTORY_TTL = 300; // 5 分钟
private const MAX_HISTORY = 500;
public function publish(Tenant $tenant, string $type, array $data): void
{
$id = (string) Str::uuid();
$channel = $this->channelKey($tenant);
$payload = json_encode([
'id' => $id,
'type' => $type,
'data' => json_encode($data),
]);
Redis::pipeline(function ($pipe) use ($channel, $payload) {
// 活跃连接消费的待处理队列
$pipe->rpush("{$channel}:pending", $payload);
// 用于重连重放的历史
$pipe->rpush("{$channel}:history", $payload);
$pipe->ltrim("{$channel}:history", -self::MAX_HISTORY, -1);
$pipe->expire("{$channel}:history", self::HISTORY_TTL);
});
}
private function channelKey(Tenant $tenant): string
{
return 'sse:' . hash_hmac('sha256', (string) $tenant->id, config('app.key'));
}
}这里用 Redis pipeline 不只是为了性能,而是为了让"pending 队列"与"history 列表"的写入具备原子性。一个只写了 pending 却没写 history 的中途失败,会让重放逻辑错乱。
架构侧备注。 如果是多租户架构,且同一租户下可以有多个并发用户——而不是一条"全租户共用"的事件流——就还需要再加一层作用域。把
tenant_id + user_id一起参与 key 派生,或引入订阅模型让每个用户订阅命名频道。一条租户级别的流对于管理员看板或后台作业状态推送还行,但一旦不同用户需要看见不同事件,它就塌了。
从队列作业里发布事件
多数真实应用里,事件并不源自控制器,而是源自队列作业、Webhook 或定时任务。基于既有生产级队列架构,一个队列作业可以这样干净地向租户事件流发布:
<?php
namespace App\Jobs;
use App\Models\Tenant;
use App\Services\EventPublisher;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class BroadcastTenantEvent implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $tries = 3;
public int $backoff = 5;
public function __construct(
private readonly Tenant $tenant,
private readonly string $type,
private readonly array $data,
) {}
public function handle(EventPublisher $publisher): void
{
$publisher->publish($this->tenant, $this->type, $this->data);
}
public function failed(\Throwable $e): void
{
\Log::error('SSE event publish failed', [
'tenant' => $this->tenant->id,
'type' => $this->type,
'error' => $e->getMessage(),
]);
}
}$tries 与 $backoff 的配置是因为:Redis 在内存压力下的发布失败是真实的生产场景。作业会用 5 秒退避重试,而不是把事件默默吞掉。
测试深度剖析
SSE 的测试写起来出了名地别扭。流式响应不像普通 JSON 响应那样行为可预测,而重连与重放逻辑又引入了时间依赖,这些依赖与 PHPUnit 的同步执行模型并不天然契合。下面给出能真正覆盖它的写法。
单元测试:事件流服务
把 TenantEventStream 放在隔离环境里测,mock 掉 Redis,再按正确顺序断言事件负载:
<?php
namespace Tests\Unit;
use App\Models\Tenant;
use App\Services\TenantEventStream;
use Illuminate\Support\Facades\Redis;
use Tests\TestCase;
class TenantEventStreamTest extends TestCase
{
public function test_replays_events_after_last_id(): void
{
$tenant = Tenant::factory()->make(['id' => 1]);
$channel = 'sse:' . hash_hmac('sha256', '1', config('app.key'));
$events = [
json_encode(['id' => 'evt-1', 'type' => 'update', 'data' => '{"foo":"bar"}']),
json_encode(['id' => 'evt-2', 'type' => 'update', 'data' => '{"foo":"baz"}']),
json_encode(['id' => 'evt-3', 'type' => 'update', 'data' => '{"foo":"qux"}']),
];
Redis::shouldReceive('lrange')
->with("{$channel}:history", 0, 99)
->once()
->andReturn($events);
Redis::shouldReceive('lrange')
->with("{$channel}:pending", 0, -1)
->andReturn([]);
$emitted = [];
$stream = $this->app->make(TenantEventStream::class);
// 覆盖 MAX_DURATION 为 0 让循环立刻退出
// 可用反射或测试子类实现
$stream->pipe($tenant, 'evt-1', function (string $payload) use (&$emitted) {
$emitted[] = $payload;
});
$this->assertCount(2, $emitted); // 只有 evt-2 与 evt-3
$this->assertStringContainsString('evt-2', $emitted[0]);
$this->assertStringContainsString('evt-3', $emitted[1]);
}
}要覆盖循环时长,可以把 MAX_DURATION 设计成能从构造函数注入,或者暴露成可被测试子类覆盖的公有常量。硬编码的 private const 是测试气味。
集成测试:HTTP 响应
Laravel 的 TestResponse 原生并不支持流式断言,但响应头与初始输出结构仍然可测:
public function test_sse_response_has_correct_headers(): void
{
$user = User::factory()->withTenant()->create();
$response = $this->actingAs($user, 'sanctum')
->get('/stream/events');
$response->assertStatus(200);
$response->assertHeader('Content-Type', 'text/event-stream');
$response->assertHeader('X-Accel-Buffering', 'no');
$response->assertHeader('Cache-Control', 'no-cache');
}
public function test_unauthenticated_request_is_rejected(): void
{
$this->get('/stream/events')->assertUnauthorized();
}
public function test_tenant_isolation_different_users_cannot_share_stream(): void
{
$tenantA = Tenant::factory()->create();
$tenantB = Tenant::factory()->create();
$userA = User::factory()->for($tenantA)->create();
$userB = User::factory()->for($tenantB)->create();
$keyA = 'sse:' . hash_hmac('sha256', (string) $tenantA->id, config('app.key'));
$keyB = 'sse:' . hash_hmac('sha256', (string) $tenantB->id, config('app.key'));
// 两个频道 key 必须不同
$this->assertNotEquals($keyA, $keyB);
// 向 A 的频道发布不应落到 B 的 key 上
Redis::shouldReceive('pipeline')->once();
(new EventPublisher())->publish($tenantA, 'update', ['value' => 42]);
Redis::shouldNotHaveReceived('rpush', ["{$keyB}:pending", \Mockery::any()]);
}用 Pest 测试重连重放
如果测试套件用 Pest,dataset 特性能把各种重连场景写得很清晰:
dataset('reconnect_scenarios', [
'reconnect from first event' => ['evt-1', 2], // 应重放 2 条
'reconnect from second event' => ['evt-2', 1], // 应重放 1 条
'reconnect from last event' => ['evt-3', 0], // 无需重放
'unknown last id' => ['evt-99', 0], // 未知 id,不重放
]);
it('replays correct events on reconnect', function (string $lastId, int $expected) {
// ... setup 与断言
})->with('reconnect_scenarios');它覆盖的是最容易被忽略的边界:Last-Event-ID 过期(历史缓冲里已不存在),以及 Last-Event-ID 恰好指向历史里最后一条事件(没有东西可重放,连接应正常继续)。
边界情况提醒。 当 Redis 历史缓冲被裁剪后(因为达到
MAX_HISTORY),重连客户端可能发来一个历史里已经找不到的Last-Event-ID。replay函数必须优雅处理这种情况:要么重放整段可用历史,要么发出full-refresh事件告诉客户端放弃增量、回到 API 去拉全量。
private function replay(string $channel, string $lastId, callable $emit): void
{
$history = Redis::lrange("{$channel}:history", 0, self::MAX_REPLAY - 1);
$found = false;
$buffer = [];
foreach ($history as $raw) {
$event = json_decode($raw, true);
if ($found) {
$buffer[] = $raw;
}
if ($event['id'] === $lastId) {
$found = true;
}
}
if (! $found) {
// 这个 id 已经被驱逐——通知客户端做全量刷新
echo "event: full-refresh\ndata: {}\n\n";
ob_flush();
flush();
return;
}
foreach ($buffer as $raw) {
$event = json_decode($raw, true);
$emit($this->formatEvent($event['data'], $event['id'], $event['type']));
}
}监控与可观测性
流式连接对 Laravel Telescope 的标准请求日志是不可见的——在 Telescope 看来,这次请求还没结束。需要对流做显式插桩。
把活跃 SSE 连接用带 TTL 的 Redis key 记录下来,并在管理面板里展示其计数:
// 连接建立时
Redis::setex("sse:active:{$tenant->id}:{$connectionId}", 90, 1);
// 心跳时续 TTL
Redis::expire("sse:active:{$tenant->id}:{$connectionId}", 90);
// 连接关闭时(connection_aborted 或命中 MAX_DURATION)
Redis::del("sse:active:{$tenant->id}:{$connectionId}");
// 查询活跃连接数
$count = count(Redis::keys("sse:active:{$tenant->id}:*"));这 90 秒的 TTL 充当"死人开关"。连接因为网络分区或 PHP-FPM worker 崩溃而没有走到正常关闭路径时,key 会自动过期,计数不会失真。
生产上线 Checklist
SSE 正式上线前,逐条过一遍:
| 关注点 | 检查项 |
|---|---|
PHP-FPM pm.max_children | 按预期并发 SSE 连接数设定 |
Nginx proxy_read_timeout | 高于 MAX_DURATION |
X-Accel-Buffering: no | 出现在响应头中 |
| 心跳间隔 | ≤ 30 秒,足以扛过 ALB / 代理的空闲超时 |
| Redis 历史 TTL | 覆盖期望的重连窗口 |
| 频道 key 派生 | HMAC,而非可猜 slug |
| 重连时重新鉴权 | 已通过 Sanctum 中间件确认 |
full-refresh 兜底 | 客户端与服务端两侧都有处理 |
| SSE 路由上的速率限制 | throttle:sse 已启用 |
connection_aborted() 检查 | 位于轮询循环内部 |