Skip to content

当然!我们来探讨一个能让你应用的处理能力和用户体验产生质的飞跃的技术:队列 (Queues)。这就像是为你的应用那家繁忙的餐厅,打造一个高效、有序、永不掉线的“智能后厨系统”。

为你的应用打造一个“高效的后厨”:NestJS 队列完全指南

想象一下你的应用是一家网红餐厅,生意异常火爆。

一位顾客点了一道非常复杂的招牌菜——“佛跳墙”(一个耗时很长的任务,比如视频转码、生成复杂的 PDF 报告、发送大量邮件)。

没有队列的“传统厨房”模式:

  • 服务员(你的 Controller 接到订单后,就站在原地,死死地盯着厨师(你的 Service
  • 厨师需要花 10 分钟来精心烹制这道菜。
  • 在这 10 分钟里,服务员不能去接待其他客人,只能干等。顾客也等得心急如焚,甚至可能因为等待超时(HTTP 请求超时)而直接走掉。

这显然是一种糟糕的体验。

引入队列的“智能后厨”模式:

  • 服务员接到“佛跳墙”的订单后,立刻将订单贴在后厨的**订单栏(队列 Queue)**上,然后马上拿到一个取餐号(Job ID)返回给顾客。
  • 服务员立刻就可以去接待下一位客人了。整个点餐过程(HTTP 请求-响应)在瞬间完成。
  • 后厨的厨师们(处理器 Processor) 会自动地、按顺序地从订单栏上取下订单,然后不慌不忙地开始烹制。
  • 菜做好后,系统会通过取餐号通知顾客来取餐(通过 WebSocket、Webhook 或其他方式通知任务已完成)。

队列的核心思想就是异步处理 (Asynchronous Processing)解耦 (Decoupling)。它将一个耗时的任务从主请求流程中剥离出来,放到一个“待办事项”列表中,由后台的工作进程(Worker)在稍后的时间里来处理。

使用队列的好处:

  • 极速响应:API 可以立即响应用户,无需等待耗时任务完成。
  • 提升吞吐量:应用可以接收更多的并发请求,因为主线程不会被长时间阻塞。
  • 可靠性与重试:如果一个任务失败了(比如厨师不小心打翻了锅),队列系统可以自动进行重试。
  • 削峰填谷:在请求高峰期,可以将任务积压在队列中,由后台进程平稳地消费,避免系统因瞬时压力过大而崩溃。

NestJS 通过 @nestjs/bull 模块,与强大而成熟的 Bull 库(一个基于 Redis 的队列系统)进行了深度集成。

1. 第一步:搭建“智能后厨”的基础设施

前置知识:为什么需要 Redis?

Bull 不是一个独立的程序,它需要一个消息代理 (Broker) 来存储和管理队列中的任务。Redis 是一个高性能的内存数据库,非常适合扮演这个角色。队列中的所有“订单”(Jobs)都会被存放在 Redis 中。

第一步:安装必要的依赖

bash
# 安装 Bull 集成模块和核心库
npm install @nestjs/bull bull

# 安装 Redis 驱动
npm install ioredis

# 你还需要一个正在运行的 Redis 服务器实例

第二步:在 AppModule 中连接到 Redis 并注册队列

typescript
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),

    // 1. 全局配置 Bull 如何连接到 Redis
    BullModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (configService: ConfigService) => ({
        redis: {
          host: configService.get<string>('REDIS_HOST', 'localhost'),
          port: configService.get<number>('REDIS_PORT', 6379),
        },
      }),
    }),

    // 2. 注册一个具体的队列,并给它起个名字
    BullModule.registerQueue({
      name: 'audio', // 这个队列专门用来处理音频任务
    }),

    // 还需要导入你的音频处理模块...
  ],
})
export class AppModule {}

2. 第二步:接收订单并放入队列 (生产者 Producer)

生产者是代码中负责创建任务并将其添加到队列中的部分。通常是我们的 ControllerService

src/audio/audio.service.ts

typescript
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Injectable()
export class AudioService {
  // 使用 @InjectQueue() 装饰器,注入名为 'audio' 的队列实例
  constructor(@InjectQueue('audio') private readonly audioQueue: Queue) {}

  async transcodeAudio(fileData: any) {
    console.log('🎵 服务员接到了一个音频转码订单...');

    // 使用 .add() 方法将一个任务添加到队列中
    // 'transcode' 是这个任务的名称(或类型)
    // 第二个参数是任务所需的数据
    const job = await this.audioQueue.add('transcode', {
      file: fileData,
      format: 'mp3',
    });

    console.log(`✅ 订单已放入后厨队列,取餐号(Job ID)是: ${job.id}`);
    return { jobId: job.id };
  }
}

audioQueue.add() 这个调用会立即返回,几乎不耗时。它只是把订单信息发给了 Redis,然后就完事了。

3. 第三步:后厨开始做菜 (消费者 Consumer)

消费者,在 Bull 中被称为处理器 (Processor),是负责从队列中取出任务并执行它的后台工作者。

在 NestJS 中,我们通常会创建一个独立的、专门用于处理队列任务的类。

src/audio/audio.processor.ts

typescript
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { Logger } from '@nestjs/common';

// 1. 使用 @Processor() 装饰器,将这个类标记为 'audio' 队列的处理器
@Processor('audio')
export class AudioProcessor {
  private readonly logger = new Logger(AudioProcessor.name);

  // 2. 使用 @Process() 装饰器,定义处理名为 'transcode' 的任务的方法
  @Process('transcode')
  async handleTranscode(job: Job) {
    this.logger.debug(`👨‍🍳 后厨收到一个新任务!取餐号: ${job.id}`);
    this.logger.debug('开始转码音频...');
    this.logger.debug('处理的数据:', job.data);

    // 模拟一个非常耗时的转码过程
    await new Promise((resolve) => setTimeout(resolve, 10000));

    this.logger.debug(`✅ 转码完成!取餐号: ${job.id}`);
  }
}
  • @Processor('audio'): 将这个类绑定到 audio 队列。Bull 会自动监听这个队列。
  • @Process('transcode'): 当 audio 队列中出现一个名为 transcode 的新任务时,handleTranscode 方法就会被自动调用
  • job: Job: 参数 job 是一个包含了任务所有信息的对象,包括 job.idjob.data(我们在 add() 时传入的数据)。

最后,别忘了在 AudioModule 中注册这个 AudioProcessorAudioService

4. 第四步:监听后厨动态 (事件监听)

我们希望知道任务的进展,比如它什么时候开始、什么时候完成、或者是否失败了。Bull 提供了丰富的事件监听器。

src/audio/audio.processor.ts (添加事件监听)

typescript
@Processor('audio')
export class AudioProcessor {
  // ... handleTranscode 方法 ...

  @OnQueueActive()
  onActive(job: Job) {
    console.log(
      `🍳 厨师开始处理任务 ${job.id},数据: ${JSON.stringify(job.data)}`
    );
  }

  @OnQueueCompleted()
  onCompleted(job: Job, result: any) {
    console.log(`🎉 任务 ${job.id} 已完成!结果: ${JSON.stringify(result)}`);
  }

  @OnQueueFailed()
  onFailed(job: Job, err: Error) {
    console.error(`🔥 任务 ${job.id} 失败了!错误: ${err.message}`);
  }
}

这些装饰器让你能够以一种非常干净的方式,对队列任务的整个生命周期做出响应。

总结

@nestjs/bull 为你的应用引入了强大的异步处理能力,让你可以从容应对耗时任务。

  1. 安装配置 (BullModule): 设置好与 Redis 的连接,并注册你的队列。
  2. 生产任务 (@InjectQueue): 在你的服务中注入队列,并使用 queue.add(name, data) 来快速地派发任务。
  3. 消费任务 (@Processor, @Process): 创建一个专门的处理器类,使用装饰器来定义处理特定任务的方法。
  4. 监听事件 (@OnQueue...): 在处理器中监听任务的生命周期事件,以便进行日志记录、通知或其他后续操作。

掌握了队列,你就掌握了构建高可用、高吞吐量、用户体验绝佳的现代后端应用的核心秘诀。你的应用不再是一个手忙脚乱的小厨房,而是一个分工明确、流程自动化的高效“智能后厨”。