Skip to content

好的,我们来探讨一种非常有趣且实用的技术,它能让你的服务器“主动”地向客户端推送消息:服务器发送事件 (Server-Sent Events, SSE)。这就像是为你的应用开通一个“单向新闻快讯”频道,可以持续不断地向所有听众播报最新消息。

让你的服务器“主动说话”:NestJS 服务器发送事件 (SSE) 指南

想象一下你在关注一场激动人心的足球比赛的实时文字直播。

传统的“你问我答”模式 (HTTP Polling):

  • 你(客户端) 每隔 5 秒钟,就焦急地刷新一次网页,问服务器:“嘿,有新情况吗?”
  • 服务器(你的 NestJS 应用) 回答:“没,比分还是 1:0。”
  • 你又等了 5 秒,再问:“现在呢?”
  • 服务器:“还没……”
  • ……直到第 89 分钟,你再次刷新时,服务器才告诉你:“进了!绝杀!比分变成 2:1 了!”

这种方式的问题显而易见:

  1. 低效:绝大多数的请求都是“空”的,浪费了大量的网络和服务器资源。
  2. 延迟:你只能在下一次刷新时才能知道进球的消息,而不是在进球发生的瞬间。

更高级的“新闻快讯”模式 (Server-Sent Events, SSE):

  • 你(客户端) 第一次打开直播页面时,就对服务器说:“你好,我是这场比赛的忠实粉丝,请把所有最新进展都主动推送给我。”
  • 服务器(你的 NestJS 应用) 收到请求后,不会立刻关闭连接。相反,它会保持这条连接处于打开状态,就像建立了一个单向的“广播频道”。
  • 在接下来的比赛中,只要有任何新情况(进球、换人、红牌),服务器就会立刻通过这条一直开着的通道,将消息主动推送给你。
  • 你无需再做任何事,就能实时地、不间断地接收到所有更新。

SSE 的核心特性

  • 单向通信:只能服务器 -> 客户端。客户端无法通过这条连接向服务器发送消息。
  • 基于 HTTP:它完全建立在标准的 HTTP 协议之上,无需任何特殊的协议或服务器支持,兼容性极好。
  • 自动重连:如果网络意外中断,浏览器会自动尝试重新连接到 SSE 端点。
  • 简单:相比于双向通信的 WebSocket,SSE 的实现要简单得多。

SSE 非常适合那些“服务器是信息源,客户端是纯粹的接收者”的场景,比如:股票价格更新、新闻推送、系统状态监控、实时通知等。

1. 第一步:建立“广播频道” (在 Controller 中创建 SSE 端点)

在 NestJS 中,实现一个 SSE 端点非常简单。你只需要从一个 Controller 的方法中返回一个 Observable

前置知识:RxJS 的 Observable

我们之前在 HttpModule 中见过它。Observable 是一个数据流。它可以随着时间的推移,发出零个、一个或多个值。这与 SSE 的“持续推送多个消息”的特性完美契合。

src/notifications/notifications.controller.ts

typescript
import { Controller, Sse } from '@nestjs/common';
import { Observable, interval, map } from 'rxjs';

// 定义 SSE 消息的格式
interface MessageEvent {
  data: string | object;
  id?: string;
  type?: string;
  retry?: number;
}

@Controller('notifications')
export class NotificationsController {
  // 1. 使用 @Sse() 装饰器来标记这是一个 SSE 端点
  @Sse('subscribe')
  subscribeToUpdates(): Observable<MessageEvent> {
    // 2. 返回一个 Observable

    // 使用 RxJS 的 interval() 函数,它会每隔 1 秒发出一个递增的数字 (0, 1, 2, ...)
    return interval(1000).pipe(
      // 3. 使用 map() 操作符,将数字转换为我们需要的 MessageEvent 格式
      map((num: number) => ({
        data: { message: `这是第 ${num + 1} 条新闻快讯!` },
        type: 'news-flash', // 可以自定义事件类型
      }))
    );
  }
}

工作流程:

  1. 客户端(浏览器)访问 GET /notifications/subscribe
  2. @Sse() 装饰器被激活,它会自动设置正确的响应头 (Content-Type: text/event-stream, Cache-Control: no-cache, Connection: keep-alive),并保持连接打开。
  3. subscribeToUpdates 方法返回一个 Observable
  4. NestJS 会订阅 (subscribe) 这个 Observable
  5. 每当 interval(1000) 发出一个新的数字时,map 操作符就会将其转换成一个 MessageEvent 对象。
  6. NestJS 将这个对象格式化为 SSE 协议规定的文本格式,并通过那条打开的连接发送给客户端。

如何在浏览器中接收?

javascript
// 前端 JavaScript 代码
const eventSource = new EventSource(
  'http://localhost:3000/notifications/subscribe'
);

// 监听所有消息
eventSource.onmessage = (event) => {
  console.log('收到一条通用消息:', JSON.parse(event.data));
};

// 监听特定类型的消息
eventSource.addEventListener('news-flash', (event) => {
  console.log('收到一条"新闻快讯":', JSON.parse(event.data));
});

eventSource.onerror = (error) => {
  console.error('SSE 连接出错:', error);
  // 如果服务器关闭连接,浏览器会在这里收到错误并自动尝试重连
  eventSource.close();
};

2. “广播”真实世界的事件

上面的例子只是一个定时器。在真实应用中,我们希望在特定事件发生时才推送消息,比如“一个新订单被创建了”。这就需要结合我们之前学过的事件驱动模式 (@nestjs/event-emitter)

思路

  1. OrdersService 创建一个新订单时,它会发出一个 'order.created' 内部事件
  2. NotificationsController 需要一种方法来“捕获”这个内部事件,并将其转化为一个外部的 SSE 事件推送给客户端。

挑战Controller 是请求作用域的,每次请求都会创建新实例。而事件可能在任何时候发生。我们不能直接在 Controller 里监听。

解决方案:我们需要一个单例的服务来充当“广播中继站”。这个服务负责监听内部事件,并持有一个可以推送数据的流。

第一步:创建一个“广播中继站”服务

src/notifications/notification.service.ts

typescript
import { Injectable } from '@nestjs/common';
import { Subject } from 'rxjs'; // Subject 是一个特殊的 Observable,可以手动推送数据

@Injectable()
export class NotificationService {
  // 创建一个 Subject,它既是 Observable 也是 Observer
  private readonly eventStream = new Subject<MessageEvent>();

  // 供 Controller 获取广播流的方法
  getEventStream() {
    return this.eventStream.asObservable();
  }

  // 供其他服务调用,以推送新消息的方法
  sendNotification(data: any) {
    this.eventStream.next({ data });
  }
}

第二步:创建一个监听内部事件的 Listener

typescript
// src/orders/listeners/order-created.listener.ts
@Injectable()
export class OrderCreatedListener {
  constructor(private readonly notificationService: NotificationService) {}

  @OnEvent('order.created')
  handleOrderCreatedEvent(payload: any) {
    console.log('🚚 [Listener] 收到新订单事件,准备通知广播站...');
    // 调用中继站服务,发送通知
    this.notificationService.sendNotification({
      message: `新订单 #${payload.id} 已创建!`,
      amount: payload.amount,
    });
  }
}

第三步:修改 Controller,连接到“中继站”

typescript
// src/notifications/notifications.controller.ts
@Controller('notifications')
export class NotificationsController {
  // 注入我们的中继站服务
  constructor(private readonly notificationService: NotificationService) {}

  @Sse('subscribe')
  subscribeToUpdates(): Observable<MessageEvent> {
    // 直接返回中继站的广播流
    return this.notificationService.getEventStream();
  }
}

现在,整个流程通了!当 OrdersService 发出 order.created 事件时,OrderCreatedListener 会捕获它,然后通过 NotificationService 这个中继站,将消息实时地推送到所有连接到 /notifications/subscribe 的客户端。

3. 企业级方案的思考

虽然 SSE 简单、好用,但在复杂的、需要大规模推送的中国企业级场景中,也存在一些局限性,并催生了更专业的解决方案。

原生 SSE 的局限性

  1. 连接数限制:浏览器对来自同一个域的并发 HTTP 连接数是有限制的(通常是 6 个)。如果用户同时打开了多个需要 SSE 的页面,可能会耗尽连接数。
  2. 不支持二进制:SSE 只能传输 UTF-8 文本数据。
  3. 无内置的“房间”或“频道”概念:默认情况下,SSE 是广播给所有连接的客户端。如果想实现“只推送给某个特定用户”或“只推送给订阅了某个股票代码的用户”,需要自己实现一套复杂的连接管理和过滤逻辑。
  4. 代理和防火墙问题:一些网络中间件可能会因为不理解“长连接”而提前关闭它。

企业级方案 Demo:集成专业的推送服务或 WebSocket

对于需要大规模、高可靠、精细化推送的场景,企业通常会选择:

  • WebSocket: 提供**全双工(双向)**通信,功能更强大,可以轻松实现“房间”和“私聊”等功能。像 Socket.IO 这样的库在 WebSocket 的基础上还提供了优雅的回退机制(如果 WebSocket 不可用,会自动降级到 HTTP 长轮询)。

    • NestJS 集成: NestJS 提供了 @nestjs/websockets 模块,可以非常方便地集成 Socket.IO 或原生的 WebSocket。你可以创建一个 Gateway,在其中实现 handleConnection 来管理用户和房间,然后使用 server.to('room-name').emit(...) 来进行定向推送。
  • 专业的第三方推送服务 (如:极光推送 JPush, 个推 Getui):

    • 工作模式: 这些服务通常用于移动端 App 推送,但很多也提供 Web 端推送的 SDK。你的 NestJS 后端不再直接管理与客户端的长连接。
    • 流程:
      1. 客户端(Web 或 App)集成第三方服务的 SDK,并在启动时向该服务注册,获取一个唯一的设备 ID。
      2. 客户端将这个设备 ID 发送给你的 NestJS 后端,并与用户 ID 关联起来,存入数据库。
      3. 当你的 NestJS 后端需要给某个特定用户推送消息时,它不再自己去建立连接,而是调用第三方服务的服务端 API,并告诉它:“请给设备 ID 为 xxx 的用户发送这条消息:'...'”。
      4. 第三方服务拥有一个庞大、稳定、全球分布的推送集群,它会负责将消息可靠地送达给目标设备。
    • 优点: 将复杂的长连接管理、设备在线状态维护、消息送达保证等专业问题外包给了专家,让你的后端可以更专注于业务逻辑。这是绝大多数需要大规模 App 推送的公司的标准做法。

总结

Server-Sent Events (SSE) 是一种轻量、高效的实现服务器向客户端单向推送的技术。

当你想要...你应该使用...核心概念
快速创建一个单向的数据推送端点在 Controller 方法上使用 @Sse(),并返回一个 ObservableObservable 数据流与 SSE 的持续消息推送特性完美匹配。
在特定业务事件发生时才推送消息结合 @nestjs/event-emitter,创建一个单例的“中继站”服务使用 Subject 来连接内部事件和外部 SSE 流,实现解耦。
实现双向通信或复杂的房间/频道管理考虑使用 WebSocket (通过 @nestjs/websockets)功能更强大,但实现也更复杂。
为移动 App 或大规模 Web 应用实现可靠的、精细化的消息推送集成专业的第三方推送服务将专业问题外包,聚焦业务。这是企业级的标准实践。

SSE 是你技术工具箱中一个非常有用的工具,特别适合构建实时仪表盘、通知中心等功能。了解它的适用场景和局限性,并知道何时该转向更强大的 WebSocket 或第三方服务,是成为一名成熟后端架构师的关键一步。