好的,我们来探讨一种非常有趣且实用的技术,它能让你的服务器“主动”地向客户端推送消息:服务器发送事件 (Server-Sent Events, SSE)。这就像是为你的应用开通一个“单向新闻快讯”频道,可以持续不断地向所有听众播报最新消息。
让你的服务器“主动说话”:NestJS 服务器发送事件 (SSE) 指南
想象一下你在关注一场激动人心的足球比赛的实时文字直播。
传统的“你问我答”模式 (HTTP Polling):
- 你(客户端) 每隔 5 秒钟,就焦急地刷新一次网页,问服务器:“嘿,有新情况吗?”
- 服务器(你的 NestJS 应用) 回答:“没,比分还是 1:0。”
- 你又等了 5 秒,再问:“现在呢?”
- 服务器:“还没……”
- ……直到第 89 分钟,你再次刷新时,服务器才告诉你:“进了!绝杀!比分变成 2:1 了!”
这种方式的问题显而易见:
- 低效:绝大多数的请求都是“空”的,浪费了大量的网络和服务器资源。
- 延迟:你只能在下一次刷新时才能知道进球的消息,而不是在进球发生的瞬间。
更高级的“新闻快讯”模式 (Server-Sent Events, SSE):
- 你(客户端) 第一次打开直播页面时,就对服务器说:“你好,我是这场比赛的忠实粉丝,请把所有最新进展都主动推送给我。”
- 服务器(你的 NestJS 应用) 收到请求后,不会立刻关闭连接。相反,它会保持这条连接处于打开状态,就像建立了一个单向的“广播频道”。
- 在接下来的比赛中,只要有任何新情况(进球、换人、红牌),服务器就会立刻通过这条一直开着的通道,将消息主动推送给你。
- 你无需再做任何事,就能实时地、不间断地接收到所有更新。
SSE 的核心特性:
- 单向通信:只能服务器 -> 客户端。客户端无法通过这条连接向服务器发送消息。
- 基于 HTTP:它完全建立在标准的 HTTP 协议之上,无需任何特殊的协议或服务器支持,兼容性极好。
- 自动重连:如果网络意外中断,浏览器会自动尝试重新连接到 SSE 端点。
- 简单:相比于双向通信的 WebSocket,SSE 的实现要简单得多。
SSE 非常适合那些“服务器是信息源,客户端是纯粹的接收者”的场景,比如:股票价格更新、新闻推送、系统状态监控、实时通知等。
1. 第一步:建立“广播频道” (在 Controller 中创建 SSE 端点)
在 NestJS 中,实现一个 SSE 端点非常简单。你只需要从一个 Controller
的方法中返回一个 Observable
。
前置知识:RxJS 的 Observable
我们之前在 HttpModule
中见过它。Observable
是一个数据流。它可以随着时间的推移,发出零个、一个或多个值。这与 SSE 的“持续推送多个消息”的特性完美契合。
src/notifications/notifications.controller.ts
import { Controller, Sse } from '@nestjs/common';
import { Observable, interval, map } from 'rxjs';
// 定义 SSE 消息的格式
interface MessageEvent {
data: string | object;
id?: string;
type?: string;
retry?: number;
}
@Controller('notifications')
export class NotificationsController {
// 1. 使用 @Sse() 装饰器来标记这是一个 SSE 端点
@Sse('subscribe')
subscribeToUpdates(): Observable<MessageEvent> {
// 2. 返回一个 Observable
// 使用 RxJS 的 interval() 函数,它会每隔 1 秒发出一个递增的数字 (0, 1, 2, ...)
return interval(1000).pipe(
// 3. 使用 map() 操作符,将数字转换为我们需要的 MessageEvent 格式
map((num: number) => ({
data: { message: `这是第 ${num + 1} 条新闻快讯!` },
type: 'news-flash', // 可以自定义事件类型
}))
);
}
}
工作流程:
- 客户端(浏览器)访问
GET /notifications/subscribe
。 @Sse()
装饰器被激活,它会自动设置正确的响应头 (Content-Type: text/event-stream
,Cache-Control: no-cache
,Connection: keep-alive
),并保持连接打开。subscribeToUpdates
方法返回一个Observable
。- NestJS 会订阅 (subscribe) 这个
Observable
。 - 每当
interval(1000)
发出一个新的数字时,map
操作符就会将其转换成一个MessageEvent
对象。 - NestJS 将这个对象格式化为 SSE 协议规定的文本格式,并通过那条打开的连接发送给客户端。
如何在浏览器中接收?
// 前端 JavaScript 代码
const eventSource = new EventSource(
'http://localhost:3000/notifications/subscribe'
);
// 监听所有消息
eventSource.onmessage = (event) => {
console.log('收到一条通用消息:', JSON.parse(event.data));
};
// 监听特定类型的消息
eventSource.addEventListener('news-flash', (event) => {
console.log('收到一条"新闻快讯":', JSON.parse(event.data));
});
eventSource.onerror = (error) => {
console.error('SSE 连接出错:', error);
// 如果服务器关闭连接,浏览器会在这里收到错误并自动尝试重连
eventSource.close();
};
2. “广播”真实世界的事件
上面的例子只是一个定时器。在真实应用中,我们希望在特定事件发生时才推送消息,比如“一个新订单被创建了”。这就需要结合我们之前学过的事件驱动模式 (@nestjs/event-emitter
)。
思路:
- 当
OrdersService
创建一个新订单时,它会发出一个'order.created'
内部事件。 NotificationsController
需要一种方法来“捕获”这个内部事件,并将其转化为一个外部的 SSE 事件推送给客户端。
挑战:Controller
是请求作用域的,每次请求都会创建新实例。而事件可能在任何时候发生。我们不能直接在 Controller
里监听。
解决方案:我们需要一个单例的服务来充当“广播中继站”。这个服务负责监听内部事件,并持有一个可以推送数据的流。
第一步:创建一个“广播中继站”服务
src/notifications/notification.service.ts
import { Injectable } from '@nestjs/common';
import { Subject } from 'rxjs'; // Subject 是一个特殊的 Observable,可以手动推送数据
@Injectable()
export class NotificationService {
// 创建一个 Subject,它既是 Observable 也是 Observer
private readonly eventStream = new Subject<MessageEvent>();
// 供 Controller 获取广播流的方法
getEventStream() {
return this.eventStream.asObservable();
}
// 供其他服务调用,以推送新消息的方法
sendNotification(data: any) {
this.eventStream.next({ data });
}
}
第二步:创建一个监听内部事件的 Listener
// src/orders/listeners/order-created.listener.ts
@Injectable()
export class OrderCreatedListener {
constructor(private readonly notificationService: NotificationService) {}
@OnEvent('order.created')
handleOrderCreatedEvent(payload: any) {
console.log('🚚 [Listener] 收到新订单事件,准备通知广播站...');
// 调用中继站服务,发送通知
this.notificationService.sendNotification({
message: `新订单 #${payload.id} 已创建!`,
amount: payload.amount,
});
}
}
第三步:修改 Controller,连接到“中继站”
// src/notifications/notifications.controller.ts
@Controller('notifications')
export class NotificationsController {
// 注入我们的中继站服务
constructor(private readonly notificationService: NotificationService) {}
@Sse('subscribe')
subscribeToUpdates(): Observable<MessageEvent> {
// 直接返回中继站的广播流
return this.notificationService.getEventStream();
}
}
现在,整个流程通了!当 OrdersService
发出 order.created
事件时,OrderCreatedListener
会捕获它,然后通过 NotificationService
这个中继站,将消息实时地推送到所有连接到 /notifications/subscribe
的客户端。
3. 企业级方案的思考
虽然 SSE 简单、好用,但在复杂的、需要大规模推送的中国企业级场景中,也存在一些局限性,并催生了更专业的解决方案。
原生 SSE 的局限性:
- 连接数限制:浏览器对来自同一个域的并发 HTTP 连接数是有限制的(通常是 6 个)。如果用户同时打开了多个需要 SSE 的页面,可能会耗尽连接数。
- 不支持二进制:SSE 只能传输 UTF-8 文本数据。
- 无内置的“房间”或“频道”概念:默认情况下,SSE 是广播给所有连接的客户端。如果想实现“只推送给某个特定用户”或“只推送给订阅了某个股票代码的用户”,需要自己实现一套复杂的连接管理和过滤逻辑。
- 代理和防火墙问题:一些网络中间件可能会因为不理解“长连接”而提前关闭它。
企业级方案 Demo:集成专业的推送服务或 WebSocket
对于需要大规模、高可靠、精细化推送的场景,企业通常会选择:
WebSocket: 提供**全双工(双向)**通信,功能更强大,可以轻松实现“房间”和“私聊”等功能。像 Socket.IO 这样的库在 WebSocket 的基础上还提供了优雅的回退机制(如果 WebSocket 不可用,会自动降级到 HTTP 长轮询)。
- NestJS 集成: NestJS 提供了
@nestjs/websockets
模块,可以非常方便地集成 Socket.IO 或原生的 WebSocket。你可以创建一个 Gateway,在其中实现handleConnection
来管理用户和房间,然后使用server.to('room-name').emit(...)
来进行定向推送。
- NestJS 集成: NestJS 提供了
专业的第三方推送服务 (如:极光推送 JPush, 个推 Getui):
- 工作模式: 这些服务通常用于移动端 App 推送,但很多也提供 Web 端推送的 SDK。你的 NestJS 后端不再直接管理与客户端的长连接。
- 流程:
- 客户端(Web 或 App)集成第三方服务的 SDK,并在启动时向该服务注册,获取一个唯一的设备 ID。
- 客户端将这个设备 ID 发送给你的 NestJS 后端,并与用户 ID 关联起来,存入数据库。
- 当你的 NestJS 后端需要给某个特定用户推送消息时,它不再自己去建立连接,而是调用第三方服务的服务端 API,并告诉它:“请给设备 ID 为
xxx
的用户发送这条消息:'...'
”。 - 第三方服务拥有一个庞大、稳定、全球分布的推送集群,它会负责将消息可靠地送达给目标设备。
- 优点: 将复杂的长连接管理、设备在线状态维护、消息送达保证等专业问题外包给了专家,让你的后端可以更专注于业务逻辑。这是绝大多数需要大规模 App 推送的公司的标准做法。
总结
Server-Sent Events (SSE) 是一种轻量、高效的实现服务器向客户端单向推送的技术。
当你想要... | 你应该使用... | 核心概念 |
---|---|---|
快速创建一个单向的数据推送端点 | 在 Controller 方法上使用 @Sse() ,并返回一个 Observable | Observable 数据流与 SSE 的持续消息推送特性完美匹配。 |
在特定业务事件发生时才推送消息 | 结合 @nestjs/event-emitter ,创建一个单例的“中继站”服务 | 使用 Subject 来连接内部事件和外部 SSE 流,实现解耦。 |
实现双向通信或复杂的房间/频道管理 | 考虑使用 WebSocket (通过 @nestjs/websockets ) | 功能更强大,但实现也更复杂。 |
为移动 App 或大规模 Web 应用实现可靠的、精细化的消息推送 | 集成专业的第三方推送服务 | 将专业问题外包,聚焦业务。这是企业级的标准实践。 |
SSE 是你技术工具箱中一个非常有用的工具,特别适合构建实时仪表盘、通知中心等功能。了解它的适用场景和局限性,并知道何时该转向更强大的 WebSocket 或第三方服务,是成为一名成熟后端架构师的关键一步。