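NestJS Real-Time Chat Application: A Hands-On Tutorial

Learning Objectives

  • Build a real-time chat application with NestJS
  • Understand how WebSocket works and where it applies
  • Implement user authentication, message handling, and room management
  • Use Socket.IO for real-time communication
  • Understand performance optimization and deployment strategies for real-time applications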

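Core Concepts

Real-Time Chat System Architecture

A real-time chat system typically consists of the following core modules:

  1. User module: registration, login, and profile management
  2. Message module: sending, receiving, and storing messages in real time
  3. Room module: creating, joining, and leaving rooms
  4. Auth module: WebSocket connection authentication and access control
  5. Notification module: system notifications and user status updates
  6. File module: uploading and sharing images and files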

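Technology Stack

  • Backend framework: NestJS
  • Database: PostgreSQL
  • ORM: TypeORM
  • Real-time communication: Socket.IO
  • Authentication: JWT
  • File storage: local storage or cloud storage
  • Frontend framework: React/Vue/Angular
  • State management: Redux/Vuex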

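Project Setup

Creating the Project

# Install the Nest CLI and create the project
npm i -g @nestjs/cli
nest new real-time-chat

# Enter the project directory
cd real-time-chat

# Install runtime dependencies (uuid is used later by the file upload controller)
npm install @nestjs/typeorm typeorm pg @nestjs/jwt @nestjs/passport passport passport-jwt bcrypt @nestjs/config socket.io @nestjs/platform-socket.io @nestjs/websockets multer uuid

# Install type definitions needed by the TypeScript compiler
npm install -D @types/bcrypt @types/multer @types/passport-jwt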

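Configuring the Database

Create a .env file:

# Database
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_USERNAME=postgres
DATABASE_PASSWORD=postgres
DATABASE_NAME=chat_app

# JWT (include a time unit: values from .env are strings, and a unit-less
# numeric string is interpreted by jsonwebtoken as milliseconds)
JWT_SECRET=your-secret-key
JWT_EXPIRES_IN=1h

# File uploads
UPLOAD_DIR=./uploads

# Socket.IO
SOCKET_IO_PATH=/socket.io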
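
A missing or malformed variable in this file tends to surface only at runtime, so it can help to validate the values once at startup. The following is a minimal sketch, not part of the tutorial's code: the names ChatConfig and loadChatConfig are hypothetical, and only a subset of the variables is checked. It also gives a purely numeric JWT_EXPIRES_IN an explicit seconds unit, because jsonwebtoken treats a unit-less numeric string as milliseconds.

```typescript
// Hypothetical startup check; ChatConfig and loadChatConfig are illustrative
// names that do not appear elsewhere in this tutorial.
interface ChatConfig {
  databaseHost: string;
  databasePort: number;
  jwtSecret: string;
  jwtExpiresIn: string;
}

const REQUIRED_KEYS = [
  'DATABASE_HOST',
  'DATABASE_PORT',
  'JWT_SECRET',
  'JWT_EXPIRES_IN',
] as const;

function loadChatConfig(env: Record<string, string | undefined>): ChatConfig {
  // Collect every missing key so the error message lists them all at once
  const missing = REQUIRED_KEYS.filter((key) => !env[key]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }

  const databasePort = Number.parseInt(env['DATABASE_PORT'] as string, 10);
  if (!Number.isInteger(databasePort) || databasePort < 1 || databasePort > 65535) {
    throw new Error('DATABASE_PORT must be an integer between 1 and 65535');
  }

  // A bare number such as "3600" is rewritten to "3600s" so it means seconds
  const rawExpiry = env['JWT_EXPIRES_IN'] as string;
  const jwtExpiresIn = /^\d+$/.test(rawExpiry) ? `${rawExpiry}s` : rawExpiry;

  return {
    databaseHost: env['DATABASE_HOST'] as string,
    databasePort,
    jwtSecret: env['JWT_SECRET'] as string,
    jwtExpiresIn,
  };
}
```

In a NestJS project, a function like this could be called with process.env before the application is bootstrapped.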

Core Module Implementation

User Module

User Entity

Create src/users/entities/user.entity.ts:

import {
  Entity,
  PrimaryGeneratedColumn,
  Column,
  CreateDateColumn,
  UpdateDateColumn,
  OneToMany,
} from 'typeorm';
import { Message } from '../../messages/entities/message.entity';
import { Room } from '../../rooms/entities/room.entity';

export enum UserStatus {
  ONLINE = 'online',
  OFFLINE = 'offline',
  AWAY = 'away',
  BUSY = 'busy',
}

export enum UserRole {
  ADMIN = 'admin',
  USER = 'user',
}

@Entity('users')
export class User {
  @PrimaryGeneratedColumn()
  id: number;

  @Column({ unique: true })
  username: string;

  @Column({ unique: true })
  email: string;

  @Column()
  password: string;

  @Column({ default: UserRole.USER })
  role: UserRole;

  @Column({ default: UserStatus.OFFLINE })
  status: UserStatus;

  @Column({ nullable: true })
  avatar: string;

  @Column({ nullable: true })
  lastSeen: Date;

  @CreateDateColumn()
  createdAt: Date;

  @UpdateDateColumn()
  updatedAt: Date;

  @OneToMany(() => Message, (message) => message.sender)
  messages: Message[];

  @OneToMany(() => Room, (room) => room.creator)
  createdRooms: Room[];
}

User Service

Create src/users/users.service.ts:

import {
  Injectable,
  ConflictException,
  UnauthorizedException,
} from '@nestjs/common';
import {
  InjectRepository,
} from '@nestjs/typeorm';
import {
  Repository,
} from 'typeorm';
import { User, UserStatus } from './entities/user.entity';
import * as bcrypt from 'bcrypt';
import { JwtService } from '@nestjs/jwt';

@Injectable()
export class UsersService {
  constructor(
    @InjectRepository(User)
    private usersRepository: Repository<User>,
    private jwtService: JwtService,
  ) {}

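  async register(username: string, email: string, password: string) {
    // Reject the request if the username or email is already taken
    const existingUser = await this.usersRepository.findOne({
      where: [{ username }, { email }],
    });

    if (existingUser) {
      throw new ConflictException('Username or email already exists');
    }

    // Hash the password (10 salt rounds)
    const hashedPassword = await bcrypt.hash(password, 10);

    // Create and persist the user
    const user = this.usersRepository.create({
      username,
      email,
      password: hashedPassword,
    });
    const savedUser = await this.usersRepository.save(user);

    // Never return the password hash to the client
    const { password: _password, ...publicUser } = savedUser;
    return publicUser;
  }

  async login(email: string, password: string) {
    // Look up the user by email
    const user = await this.usersRepository.findOne({
      where: { email },
    });

    if (!user) {
      throw new UnauthorizedException('Invalid email or password');
    }

    // Verify the password against the stored hash
    const isPasswordValid = await bcrypt.compare(password, user.password);
    if (!isPasswordValid) {
      throw new UnauthorizedException('Invalid email or password');
    }

    // Mark the user as online
    user.status = UserStatus.ONLINE;
    user.lastSeen = new Date();
    await this.usersRepository.save(user);

    // Issue a JWT; the payload shape is what the gateway and guards read later
    const payload = { userId: user.id, role: user.role };
    const token = this.jwtService.sign(payload);

    return {
      access_token: token,
      user: {
        id: user.id,
        username: user.username,
        email: user.email,
        role: user.role,
        status: user.status,
        avatar: user.avatar,
        lastSeen: user.lastSeen,
      },
    };
  }

  async findOne(id: number) {
    // Select only public columns so the password hash is never exposed
    return await this.usersRepository.findOne({
      where: { id },
      select: ['id', 'username', 'email', 'role', 'status', 'avatar', 'lastSeen'],
    });
  }

  async findAll() {
    return await this.usersRepository.find({
      select: ['id', 'username', 'email', 'status', 'avatar', 'lastSeen'],
    });
  }

  async updateStatus(id: number, status: UserStatus) {
    return await this.usersRepository.update(id, {
      status,
      lastSeen: new Date(),
    });
  }

  async updateProfile(id: number, updateData: Partial<User>) {
    // Copy only profile fields; role, status, and password must not be
    // settable from a request body
    const allowedKeys = ['username', 'email', 'avatar'] as const;
    const changes: Partial<User> = {};
    for (const key of allowedKeys) {
      if (updateData[key] !== undefined) {
        changes[key] = updateData[key];
      }
    }
    return await this.usersRepository.update(id, changes);
  }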
}

Message Module

Message Entity

Create src/messages/entities/message.entity.ts:

import {
  Entity,
  PrimaryGeneratedColumn,
  Column,
  CreateDateColumn,
  UpdateDateColumn,
  ManyToOne,
} from 'typeorm';
import { User } from '../../users/entities/user.entity';
import { Room } from '../../rooms/entities/room.entity';

export enum MessageType {
  TEXT = 'text',
  IMAGE = 'image',
  FILE = 'file',
  SYSTEM = 'system',
}

@Entity('messages')
export class Message {
  @PrimaryGeneratedColumn()
  id: number;

  @Column()
  content: string;

  @Column({ default: MessageType.TEXT })
  type: MessageType;

  @Column({ nullable: true })
  fileUrl: string;

  @Column({ nullable: true })
  fileName: string;

  @Column({ default: false })
  isRead: boolean;

  @CreateDateColumn()
  createdAt: Date;

  @UpdateDateColumn()
  updatedAt: Date;

  @ManyToOne(() => User, (user) => user.messages)
  sender: User;

  @ManyToOne(() => Room, (room) => room.messages)
  room: Room;
}

Message Service

Create src/messages/messages.service.ts:

import {
  Injectable,
  NotFoundException,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { FindOptionsSelect, Not, Repository } from 'typeorm';
import { Message, MessageType } from './entities/message.entity';
import { Room } from '../rooms/entities/room.entity';

// Columns returned to clients. The sender is limited to public fields so the
// password hash stored on User is never sent over the wire.
const MESSAGE_SELECT: FindOptionsSelect<Message> = {
  id: true,
  content: true,
  type: true,
  fileUrl: true,
  fileName: true,
  isRead: true,
  createdAt: true,
  sender: { id: true, username: true, avatar: true },
};

@Injectable()
export class MessagesService {
  constructor(
    @InjectRepository(Message)
    private messagesRepository: Repository<Message>,
    @InjectRepository(Room)
    private roomsRepository: Repository<Room>,
  ) {}

  // Create a message in a room
  async create(senderId: number, roomId: number, content: string, type: MessageType = MessageType.TEXT, fileUrl?: string, fileName?: string) {
    const room = await this.roomsRepository.findOne({
      where: { id: roomId },
    });

    if (!room) {
      throw new NotFoundException('Room not found');
    }

    const message = this.messagesRepository.create({
      content,
      type,
      fileUrl,
      fileName,
      sender: { id: senderId },
      room: { id: roomId },
    });
    const saved = await this.messagesRepository.save(message);

    // Reload with the sender's public fields so clients can render the username
    return await this.messagesRepository.findOne({
      where: { id: saved.id },
      relations: { sender: true },
      select: MESSAGE_SELECT,
    });
  }

  // Get a page of messages for a room, newest first
  async getRoomMessages(roomId: number, limit: number = 50, offset: number = 0) {
    return await this.messagesRepository.find({
      where: { room: { id: roomId } },
      relations: { sender: true },
      select: MESSAGE_SELECT,
      order: { createdAt: 'DESC' },
      take: limit,
      skip: offset,
    });
  }

  // Mark a single message as read
  async markAsRead(messageId: number) {
    return await this.messagesRepository.update(messageId, {
      isRead: true,
    });
  }

  // Mark all unread messages in a room that were sent by other users as read
  async markRoomMessagesAsRead(roomId: number, userId: number) {
    return await this.messagesRepository.update(
      {
        room: { id: roomId },
        sender: { id: Not(userId) },
        isRead: false,
      },
      {
        isRead: true,
      },
    );
  }

  // Delete a message
  async remove(id: number) {
    return await this.messagesRepository.delete(id);
  }
}

Room Module

Room Entity

Create src/rooms/entities/room.entity.ts:

import {
  Entity,
  PrimaryGeneratedColumn,
  Column,
  CreateDateColumn,
  UpdateDateColumn,
  ManyToOne,
  OneToMany,
  ManyToMany,
  JoinTable,
} from 'typeorm';
import { User } from '../../users/entities/user.entity';
import { Message } from '../../messages/entities/message.entity';

export enum RoomType {
  PUBLIC = 'public',
  PRIVATE = 'private',
  DIRECT = 'direct',
}

@Entity('rooms')
export class Room {
  @PrimaryGeneratedColumn()
  id: number;

  @Column()
  name: string;

  @Column({ nullable: true })
  description: string;

  @Column({ default: RoomType.PUBLIC })
  type: RoomType;

  @Column({ nullable: true })
  password: string;

  @Column({ default: 0 })
  memberCount: number;

  @CreateDateColumn()
  createdAt: Date;

  @UpdateDateColumn()
  updatedAt: Date;

  @ManyToOne(() => User, (user) => user.createdRooms)
  creator: User;

  @OneToMany(() => Message, (message) => message.room)
  messages: Message[];

  @ManyToMany(() => User)
  @JoinTable()
  members: User[];
}

Room Service

Create src/rooms/rooms.service.ts:

import {
  Injectable,
  NotFoundException,
  ForbiddenException,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Room, RoomType } from './entities/room.entity';
// This file lives in src/rooms, so the users folder is one level up
import { User } from '../users/entities/user.entity';
import * as bcrypt from 'bcrypt';

@Injectable()
export class RoomsService {
  constructor(
    @InjectRepository(Room)
    private roomsRepository: Repository<Room>,
    @InjectRepository(User)
    private usersRepository: Repository<User>,
  ) {}

  // Create a room; the creator becomes its first member
  async create(creatorId: number, createRoomDto: any) {
    const { name, description, type, password } = createRoomDto;

    // Hash the room password if one was provided
    let hashedPassword = null;
    if (password) {
      hashedPassword = await bcrypt.hash(password, 10);
    }

    const room = this.roomsRepository.create({
      name,
      description,
      type,
      password: hashedPassword,
      creator: { id: creatorId },
      members: [{ id: creatorId }],
      memberCount: 1,
    });

    return await this.roomsRepository.save(room);
  }

  // List rooms
  async findAll() {
    return await this.roomsRepository.find({
      relations: ['creator'],
      select: ['id', 'name', 'description', 'type', 'memberCount', 'createdAt', 'creator'],
    });
  }

  // Get a single room together with its creator and members
  async findOne(id: number) {
    return await this.roomsRepository.findOne({
      where: { id },
      relations: ['creator', 'members'],
    });
  }

  // Join a room
  async joinRoom(userId: number, roomId: number, password?: string) {
    const room = await this.roomsRepository.findOne({
      where: { id: roomId },
      relations: ['members'],
    });

    if (!room) {
      throw new NotFoundException('Room not found');
    }

    // Check the password for password-protected rooms
    if (room.password) {
      if (!password) {
        throw new ForbiddenException('Password required');
      }
      const isPasswordValid = await bcrypt.compare(password, room.password);
      if (!isPasswordValid) {
        throw new ForbiddenException('Incorrect password');
      }
    }

    // Reject users who are already members
    const isMember = room.members.some(member => member.id === userId);
    if (isMember) {
      throw new ForbiddenException('Already in the room');
    }

    // Add the user to the room
    const user = await this.usersRepository.findOne({
      where: { id: userId },
    });
    if (!user) {
      throw new NotFoundException('User not found');
    }

    room.members.push(user);
    room.memberCount++;

    return await this.roomsRepository.save(room);
  }

  // Leave a room
  async leaveRoom(userId: number, roomId: number) {
    const room = await this.roomsRepository.findOne({
      where: { id: roomId },
      relations: ['members'],
    });

    if (!room) {
      throw new NotFoundException('Room not found');
    }

    // Only members can leave
    const isMember = room.members.some(member => member.id === userId);
    if (!isMember) {
      throw new ForbiddenException('Not in the room');
    }

    // Remove the user from the member list
    room.members = room.members.filter(member => member.id !== userId);
    room.memberCount--;

    return await this.roomsRepository.save(room);
  }

  // Delete a room (creator only)
  async remove(id: number, userId: number) {
    const room = await this.roomsRepository.findOne({
      where: { id },
      relations: ['creator'],
    });

    if (!room) {
      throw new NotFoundException('Room not found');
    }

    if (room.creator.id !== userId) {
      throw new ForbiddenException('Only the creator can delete the room');
    }

    return await this.roomsRepository.delete(id);
  }

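  // Create a direct (one-to-one) room, or return the existing one
  async createDirectRoom(userId1: number, userId2: number) {
    // Look for a direct room that has BOTH users as members. The members
    // relation is joined twice; filtering on members.id in a find() call
    // would match on one user only and load just that member.
    const existingRoom = await this.roomsRepository
      .createQueryBuilder('room')
      .innerJoin('room.members', 'm1', 'm1.id = :userId1', { userId1 })
      .innerJoin('room.members', 'm2', 'm2.id = :userId2', { userId2 })
      .where('room.type = :type', { type: RoomType.DIRECT })
      .getOne();

    if (existingRoom) {
      return existingRoom;
    }

    // Load both users
    const user1 = await this.usersRepository.findOne({ where: { id: userId1 } });
    const user2 = await this.usersRepository.findOne({ where: { id: userId2 } });
    if (!user1 || !user2) {
      throw new NotFoundException('User not found');
    }

    // Create the direct room
    const room = this.roomsRepository.create({
      name: `${user1.username} & ${user2.username}`,
      type: RoomType.DIRECT,
      members: [user1, user2],
      memberCount: 2,
    });

    return await this.roomsRepository.save(room);
  }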
}
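
Finding an existing direct room by inspecting member lists gets awkward as the data grows. One alternative, sketched here under assumptions, is to derive an order-independent key from the two user IDs and store it in a unique column. The directRoomKey helper and the idea of a dedicated key column are hypothetical; the Room entity in this tutorial has no such column.

```typescript
// Hypothetical helper: builds the same key regardless of argument order,
// so (3, 7) and (7, 3) map to one direct room.
function directRoomKey(userId1: number, userId2: number): string {
  if (userId1 === userId2) {
    throw new Error('A direct room needs two different users');
  }
  const low = Math.min(userId1, userId2);
  const high = Math.max(userId1, userId2);
  return `direct:${low}:${high}`;
}
```

With such a key stored in a unique column, looking up an existing direct room becomes a single equality query, and the database can reject duplicates created by concurrent requests.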

WebSocket Gateway

Main Gateway

Create src/gateway/chat.gateway.ts:

import {
  WebSocketGateway,
  WebSocketServer,
  SubscribeMessage,
  OnGatewayConnection,
  OnGatewayDisconnect,
  MessageBody,
  ConnectedSocket,
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { JwtService } from '@nestjs/jwt';
import { UsersService } from '../users/users.service';
import { MessagesService } from '../messages/messages.service';
import { RoomsService } from '../rooms/rooms.service';
import { UserStatus } from '../users/entities/user.entity';
import { MessageType } from '../messages/entities/message.entity';

@WebSocketGateway({
  cors: {
    // '*' is convenient in development; restrict it to the frontend origin in production
    origin: '*',
    methods: ['GET', 'POST'],
  },
  path: '/socket.io',
})
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  private logger = new Logger('ChatGateway');
  private connectedUsers = new Map<number, string>(); // userId -> socketId

  constructor(
    private jwtService: JwtService,
    private usersService: UsersService,
    private messagesService: MessagesService,
    private roomsService: RoomsService,
  ) {}

  // Handle a new connection
  async handleConnection(client: Socket) {
    try {
      // The client sends its JWT in the Socket.IO handshake auth payload
      const token = client.handshake.auth.token;
      if (!token) {
        client.disconnect();
        return;
      }

      // verify() throws if the token is invalid or expired
      const payload = this.jwtService.verify(token);
      const userId = payload.userId;

      // Remember which socket belongs to this user
      this.connectedUsers.set(userId, client.id);
      client.data.userId = userId;

      // Persist the online status
      await this.usersService.updateStatus(userId, UserStatus.ONLINE);

      // Broadcast that the user came online
      this.server.emit('user:status', {
        userId,
        status: UserStatus.ONLINE,
      });

      this.logger.log(`User ${userId} connected`);
    } catch (error) {
      this.logger.error('Connection authentication failed', error);
      client.disconnect();
    }
  }

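  // Handle a disconnect
  async handleDisconnect(client: Socket) {
    try {
      const userId = client.data.userId;
      if (userId) {
        // If a newer socket has replaced this one (for example a second tab),
        // the user is still online and nothing needs to change
        if (this.connectedUsers.get(userId) !== client.id) {
          return;
        }

        // Forget the connection
        this.connectedUsers.delete(userId);

        // Persist the offline status
        await this.usersService.updateStatus(userId, UserStatus.OFFLINE);

        // Broadcast that the user went offline
        this.server.emit('user:status', {
          userId,
          status: UserStatus.OFFLINE,
          lastSeen: new Date(),
        });

        this.logger.log(`User ${userId} disconnected`);
      }
    } catch (error) {
      this.logger.error('Failed to handle disconnect', error);
    }
  }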

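  // Send a message
  @SubscribeMessage('message:send')
  async handleMessage(
    @MessageBody() data: { roomId: number; content: string; type?: MessageType; fileUrl?: string; fileName?: string },
    @ConnectedSocket() client: Socket,
  ) {
    const userId = client.data.userId;

    // Only room members may post; without this check any authenticated
    // user could write to any room ID
    const room = await this.roomsService.findOne(data.roomId);
    const isMember = room?.members.some((member) => member.id === userId);
    if (!isMember) {
      return { error: 'You are not a member of this room' };
    }

    // Persist the message
    const message = await this.messagesService.create(
      userId,
      data.roomId,
      data.content,
      data.type,
      data.fileUrl,
      data.fileName,
    );

    // Broadcast to everyone in the Socket.IO room, including the sender
    this.server.to(`room_${data.roomId}`).emit('message:new', message);

    return message;
  }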

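  // Join a room
  @SubscribeMessage('room:join')
  async handleJoinRoom(
    @MessageBody() data: { roomId: number; password?: string },
    @ConnectedSocket() client: Socket,
  ) {
    const userId = client.data.userId;

    // Existing members (the room creator, or a client that reconnected) only
    // need to rejoin the Socket.IO room; joinRoom() would reject them
    let room = await this.roomsService.findOne(data.roomId);
    const alreadyMember = room?.members.some((member) => member.id === userId) ?? false;
    if (!room || !alreadyMember) {
      room = await this.roomsService.joinRoom(userId, data.roomId, data.password);
    }

    // Join the Socket.IO room
    client.join(`room_${data.roomId}`);

    // Announce new members only
    if (!alreadyMember) {
      this.server.to(`room_${data.roomId}`).emit('room:user:join', {
        roomId: data.roomId,
        userId,
      });
    }

    // Return a summary rather than the entity, which carries the room
    // password hash and full member records
    return { id: room.id, name: room.name, memberCount: room.memberCount };
  }

  // Leave a room
  @SubscribeMessage('room:leave')
  async handleLeaveRoom(
    @MessageBody() data: { roomId: number },
    @ConnectedSocket() client: Socket,
  ) {
    const userId = client.data.userId;

    // Remove the membership
    const room = await this.roomsService.leaveRoom(userId, data.roomId);

    // Leave the Socket.IO room
    client.leave(`room_${data.roomId}`);

    // Tell the remaining members
    this.server.to(`room_${data.roomId}`).emit('room:user:leave', {
      roomId: data.roomId,
      userId,
    });

    return { id: room.id, memberCount: room.memberCount };
  }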

  // Create a room
  @SubscribeMessage('room:create')
  async handleCreateRoom(
    @MessageBody() data: { name: string; description?: string; type?: string; password?: string },
    @ConnectedSocket() client: Socket,
  ) {
    const userId = client.data.userId;

    // Persist the room
    const room = await this.roomsService.create(userId, data);

    // Join the Socket.IO room
    client.join(`room_${room.id}`);

    // Drop the password hash before sending the room to clients
    const { password: _password, ...publicRoom } = room;

    // Announce the new room to all connected clients
    this.server.emit('room:created', publicRoom);

    return publicRoom;
  }

  // Mark a single message as read
  @SubscribeMessage('message:read')
  async handleMarkAsRead(
    @MessageBody() data: { messageId: number },
    @ConnectedSocket() client: Socket,
  ) {
    await this.messagesService.markAsRead(data.messageId);

    return { success: true };
  }

  // Mark all messages in a room as read for the current user
  @SubscribeMessage('room:messages:read')
  async handleMarkRoomMessagesAsRead(
    @MessageBody() data: { roomId: number },
    @ConnectedSocket() client: Socket,
  ) {
    const userId = client.data.userId;

    await this.messagesService.markRoomMessagesAsRead(data.roomId, userId);

    return { success: true };
  }

  // Update the current user's status
  @SubscribeMessage('user:status')
  async handleUpdateStatus(
    @MessageBody() data: { status: UserStatus },
    @ConnectedSocket() client: Socket,
  ) {
    const userId = client.data.userId;

    // Accept only values defined in the UserStatus enum
    if (!Object.values(UserStatus).includes(data.status)) {
      return { success: false, error: 'Unknown status' };
    }

    // Persist the new status
    await this.usersService.updateStatus(userId, data.status);

    // Broadcast the change
    this.server.emit('user:status', {
      userId,
      status: data.status,
    });

    return { success: true };
  }
}
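
The gateway keeps one socket ID per user, so a user with two open tabs is tracked through whichever tab connected last. A possible refinement, shown here as a standalone sketch rather than as the tutorial's implementation, is to keep a set of socket IDs per user and report a status change only on the first connection and the last disconnection. The PresenceTracker class is a hypothetical name.

```typescript
// Hypothetical presence tracker that supports several sockets per user.
class PresenceTracker {
  private readonly socketsByUser = new Map<number, Set<string>>();

  // Registers a socket. Returns true only when the user just came online,
  // that is, when this is the user's first open connection.
  add(userId: number, socketId: string): boolean {
    let sockets = this.socketsByUser.get(userId);
    const cameOnline = sockets === undefined;
    if (sockets === undefined) {
      sockets = new Set<string>();
      this.socketsByUser.set(userId, sockets);
    }
    sockets.add(socketId);
    return cameOnline;
  }

  // Unregisters a socket. Returns true only when the user's last
  // connection closed, that is, when the user just went offline.
  remove(userId: number, socketId: string): boolean {
    const sockets = this.socketsByUser.get(userId);
    if (sockets === undefined || !sockets.delete(socketId)) {
      return false;
    }
    if (sockets.size > 0) {
      return false;
    }
    this.socketsByUser.delete(userId);
    return true;
  }

  isOnline(userId: number): boolean {
    return this.socketsByUser.has(userId);
  }
}
```

In the gateway, handleConnection would broadcast the online status only when add() returns true, and handleDisconnect would broadcast the offline status only when remove() returns true.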

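API Controllers

The controllers below read req.user.userId. They therefore assume that the AuthModule (not shown in this tutorial) registers a passport-jwt strategy whose validate() method returns an object containing userId.

User Controller

Create src/users/users.controller.ts: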

import {
  Controller,
  Post,
  Get,
  Body,
  UseGuards,
  Request,
  Patch,
} from '@nestjs/common';
import { UsersService } from './users.service';
import { AuthGuard } from '@nestjs/passport';

@Controller('users')
export class UsersController {
  constructor(private usersService: UsersService) {}

  // Register
  @Post('register')
  async register(@Body() body: { username: string; email: string; password: string }) {
    return await this.usersService.register(body.username, body.email, body.password);
  }

  // Log in
  @Post('login')
  async login(@Body() body: { email: string; password: string }) {
    return await this.usersService.login(body.email, body.password);
  }

  // Get the current user's profile
  @UseGuards(AuthGuard('jwt'))
  @Get('profile')
  async getProfile(@Request() req) {
    return await this.usersService.findOne(req.user.userId);
  }

  // List all users
  @UseGuards(AuthGuard('jwt'))
  @Get()
  async getAllUsers() {
    return await this.usersService.findAll();
  }

  // Update the current user's profile
  @UseGuards(AuthGuard('jwt'))
  @Patch('profile')
  async updateProfile(@Request() req, @Body() body) {
    return await this.usersService.updateProfile(req.user.userId, body);
  }
}

Message Controller

Create src/messages/messages.controller.ts:

import {
  Controller,
  Get,
  Patch,
  Delete,
  Param,
  Query,
  UseGuards,
  Request,
  ParseIntPipe,
  DefaultValuePipe,
} from '@nestjs/common';
import { MessagesService } from './messages.service';
import { AuthGuard } from '@nestjs/passport';

// Route and query parameters arrive as strings; ParseIntPipe converts them
// to numbers and rejects non-numeric input with a 400 response.
@Controller('messages')
export class MessagesController {
  constructor(private messagesService: MessagesService) {}

  // Get a page of messages for a room
  @UseGuards(AuthGuard('jwt'))
  @Get('room/:roomId')
  async getRoomMessages(
    @Param('roomId', ParseIntPipe) roomId: number,
    @Query('limit', new DefaultValuePipe(50), ParseIntPipe) limit: number,
    @Query('offset', new DefaultValuePipe(0), ParseIntPipe) offset: number,
  ) {
    return await this.messagesService.getRoomMessages(roomId, limit, offset);
  }

  // Mark a message as read
  @UseGuards(AuthGuard('jwt'))
  @Patch(':id/read')
  async markAsRead(@Param('id', ParseIntPipe) id: number) {
    return await this.messagesService.markAsRead(id);
  }

  // Mark all messages in a room as read
  @UseGuards(AuthGuard('jwt'))
  @Patch('room/:roomId/read')
  async markRoomMessagesAsRead(@Param('roomId', ParseIntPipe) roomId: number, @Request() req) {
    return await this.messagesService.markRoomMessagesAsRead(roomId, req.user.userId);
  }

  // Delete a message
  @UseGuards(AuthGuard('jwt'))
  @Delete(':id')
  async remove(@Param('id', ParseIntPipe) id: number) {
    return await this.messagesService.remove(id);
  }
}
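
The limit and offset query parameters come straight from the client, so a request could ask for an arbitrarily large page. The sketch below shows one way to normalize them before they reach the service. The normalizePagination helper and the cap of 100 are assumptions chosen for illustration; only the default of 50 comes from the service above.

```typescript
const DEFAULT_LIMIT = 50; // matches the default in getRoomMessages
const MAX_LIMIT = 100; // assumption: upper bound chosen for illustration

// Hypothetical helper: converts raw query strings into safe numbers.
function normalizePagination(
  rawLimit?: string,
  rawOffset?: string,
): { limit: number; offset: number } {
  const limit = Number.parseInt(rawLimit ?? '', 10);
  const offset = Number.parseInt(rawOffset ?? '', 10);
  return {
    // Fall back to the default when missing or non-numeric, then clamp
    limit: Number.isNaN(limit)
      ? DEFAULT_LIMIT
      : Math.min(Math.max(limit, 1), MAX_LIMIT),
    // Negative or invalid offsets start from the first row
    offset: Number.isNaN(offset) ? 0 : Math.max(offset, 0),
  };
}
```

For example, a request with limit=1000 and offset=-5 would be served as limit 100 and offset 0.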

Room Controller

Create src/rooms/rooms.controller.ts:

import {
  Controller,
  Post,
  Get,
  Delete,
  Body,
  Param,
  UseGuards,
  Request,
  ParseIntPipe,
} from '@nestjs/common';
import { RoomsService } from './rooms.service';
import { AuthGuard } from '@nestjs/passport';

// Route parameters arrive as strings. ParseIntPipe converts them to numbers,
// which matters for the strict (===) ID comparisons in RoomsService.
@Controller('rooms')
export class RoomsController {
  constructor(private roomsService: RoomsService) {}

  // Create a room
  @UseGuards(AuthGuard('jwt'))
  @Post()
  async create(@Request() req, @Body() body) {
    return await this.roomsService.create(req.user.userId, body);
  }

  // List rooms
  @UseGuards(AuthGuard('jwt'))
  @Get()
  async findAll() {
    return await this.roomsService.findAll();
  }

  // Get a single room
  @UseGuards(AuthGuard('jwt'))
  @Get(':id')
  async findOne(@Param('id', ParseIntPipe) id: number) {
    return await this.roomsService.findOne(id);
  }

  // Join a room
  @UseGuards(AuthGuard('jwt'))
  @Post(':id/join')
  async joinRoom(@Request() req, @Param('id', ParseIntPipe) id: number, @Body() body: { password?: string }) {
    return await this.roomsService.joinRoom(req.user.userId, id, body.password);
  }

  // Leave a room
  @UseGuards(AuthGuard('jwt'))
  @Post(':id/leave')
  async leaveRoom(@Request() req, @Param('id', ParseIntPipe) id: number) {
    return await this.roomsService.leaveRoom(req.user.userId, id);
  }

  // Delete a room
  @UseGuards(AuthGuard('jwt'))
  @Delete(':id')
  async remove(@Request() req, @Param('id', ParseIntPipe) id: number) {
    return await this.roomsService.remove(id, req.user.userId);
  }

  // Create a direct room with another user
  @UseGuards(AuthGuard('jwt'))
  @Post('direct/:userId')
  async createDirectRoom(@Request() req, @Param('userId', ParseIntPipe) userId: number) {
    return await this.roomsService.createDirectRoom(req.user.userId, userId);
  }
}

File Controller

Create src/files/files.controller.ts:

import {
  Controller,
  Post,
  UseInterceptors,
  UploadedFile,
  UseGuards,
  BadRequestException,
} from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { AuthGuard } from '@nestjs/passport';
import * as fs from 'fs';
import * as path from 'path';
import { v4 as uuidv4 } from 'uuid';

@Controller('files')
export class FilesController {
  private uploadDir = './uploads';

  constructor() {
    // Make sure the upload directory exists
    if (!fs.existsSync(this.uploadDir)) {
      fs.mkdirSync(this.uploadDir, { recursive: true });
    }
  }

  // Upload a file (multipart form field name: "file")
  @UseGuards(AuthGuard('jwt'))
  @Post('upload')
  @UseInterceptors(FileInterceptor('file'))
  async uploadFile(@UploadedFile() file: Express.Multer.File) {
    if (!file) {
      // Respond with 400 instead of a success status carrying an error body
      throw new BadRequestException('Please select a file');
    }

    // Generate a unique file name, keeping the original extension
    const fileName = `${uuidv4()}${path.extname(file.originalname)}`;
    const filePath = path.join(this.uploadDir, fileName);

    // FileInterceptor uses in-memory storage by default, so the content is in file.buffer
    await fs.promises.writeFile(filePath, file.buffer);

    // The URL below only resolves if the uploads directory is served as
    // static files, which this tutorial does not configure
    return {
      filename: fileName,
      path: `/uploads/${fileName}`,
      url: `http://localhost:3000/uploads/${fileName}`,
    };
  }
}
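
The upload handler above accepts any file of any size. A basic check on extension and size can run before the file is written to disk. This is a minimal sketch under assumptions: the validateUpload helper, the extension allow-list, and the 5 MiB limit are all hypothetical values chosen for illustration.

```typescript
// Assumption: allow-list and size limit are illustrative, not from the tutorial
const ALLOWED_EXTENSIONS = new Set(['.png', '.jpg', '.jpeg', '.gif', '.pdf', '.txt']);
const MAX_FILE_SIZE = 5 * 1024 * 1024; // 5 MiB

// Hypothetical helper: returns an error message, or null when the upload is acceptable.
function validateUpload(originalName: string, sizeInBytes: number): string | null {
  // Take everything from the last dot as the extension, case-insensitively
  const dotIndex = originalName.lastIndexOf('.');
  const extension = dotIndex >= 0 ? originalName.slice(dotIndex).toLowerCase() : '';

  if (!ALLOWED_EXTENSIONS.has(extension)) {
    return `File type "${extension || 'none'}" is not allowed`;
  }
  if (sizeInBytes <= 0) {
    return 'File is empty';
  }
  if (sizeInBytes > MAX_FILE_SIZE) {
    return 'File is larger than 5 MiB';
  }
  return null;
}
```

In uploadFile, the result of validateUpload(file.originalname, file.size) could be turned into a BadRequestException when it is not null. An extension check is only a first filter; it does not verify the actual file content.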

Root Module Configuration

Create src/app.module.ts:

import {
  Module,
} from '@nestjs/common';
import {
  TypeOrmModule,
} from '@nestjs/typeorm';
import {
  ConfigModule,
  ConfigService,
} from '@nestjs/config';
import { UsersModule } from './users/users.module';
import { MessagesModule } from './messages/messages.module';
import { RoomsModule } from './rooms/rooms.module';
import { FilesModule } from './files/files.module';
import { AuthModule } from './auth/auth.module';
import { ChatGateway } from './gateway/chat.gateway';
import { User } from './users/entities/user.entity';
import { Message } from './messages/entities/message.entity';
import { Room } from './rooms/entities/room.entity';
import { JwtModule } from '@nestjs/jwt';

@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
    }),
    TypeOrmModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        type: 'postgres',
        host: configService.get('DATABASE_HOST'),
        port: parseInt(configService.get('DATABASE_PORT') ?? '5432', 10),
        username: configService.get('DATABASE_USERNAME'),
        password: configService.get('DATABASE_PASSWORD'),
        database: configService.get('DATABASE_NAME'),
        entities: [User, Message, Room],
        // Development only: auto-syncing the schema can drop data; use migrations in production
        synchronize: true,
      }),
      inject: [ConfigService],
    }),
    JwtModule.registerAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        secret: configService.get('JWT_SECRET'),
        signOptions: { expiresIn: configService.get('JWT_EXPIRES_IN') },
      }),
      inject: [ConfigService],
    }),
    UsersModule,
    MessagesModule,
    RoomsModule,
    FilesModule,
    AuthModule,
  ],
  // The gateway injects UsersService, MessagesService, and RoomsService,
  // so each feature module must export its service
  providers: [ChatGateway],
})
export class AppModule {}

Frontend Integration

Frontend Project Setup

# Create a React project
npm create vite@latest chat-frontend -- --template react

# Enter the project directory
cd chat-frontend

# Install dependencies
npm install socket.io-client axios @reduxjs/toolkit react-redux

Socket.IO Connection

Create src/socket.js:

import { io } from 'socket.io-client';

const socket = io('http://localhost:3000', {
  autoConnect: false,
  path: '/socket.io',
});

export const connectSocket = (token) => {
  socket.auth = { token };
  socket.connect();
};

export const disconnectSocket = () => {
  socket.disconnect();
};

export default socket;

Chat Room Component

Create src/components/ChatRoom.jsx:

import React, { useState, useEffect, useRef } from 'react';
import socket from '../socket';

const ChatRoom = ({ roomId, userId }) => {
  const [messages, setMessages] = useState([]);
  const [input, setInput] = useState('');
  const messagesEndRef = useRef(null);

  useEffect(() => {
    // Named handlers let the cleanup remove exactly these listeners
    // without affecting listeners registered elsewhere
    const onNewMessage = (message) => {
      setMessages(prev => [...prev, message]);
    };
    const onUserJoin = (data) => {
      console.log('User joined:', data);
    };
    const onUserLeave = (data) => {
      console.log('User left:', data);
    };

    // Join the room
    socket.emit('room:join', { roomId });

    // Listen for room events
    socket.on('message:new', onNewMessage);
    socket.on('room:user:join', onUserJoin);
    socket.on('room:user:leave', onUserLeave);

    return () => {
      // Leave the room and remove the listeners
      socket.emit('room:leave', { roomId });
      socket.off('message:new', onNewMessage);
      socket.off('room:user:join', onUserJoin);
      socket.off('room:user:leave', onUserLeave);
    };
  }, [roomId]);

  const scrollToBottom = () => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  };

  useEffect(() => {
    scrollToBottom();
  }, [messages]);

  const handleSend = () => {
    if (input.trim()) {
      socket.emit('message:send', {
        roomId,
        content: input,
      });
      setInput('');
    }
  };

  return (
    <div className="chat-room">
      <div className="messages">
        {messages.map((message) => (
          <div key={message.id} className={`message ${message.sender?.id === userId ? 'own' : ''}`}>
            <div className="message-content">{message.content}</div>
            <div className="message-sender">{message.sender?.username}</div>
          </div>
        ))}
        <div ref={messagesEndRef} />
      </div>
      <div className="input-area">
        <input
          type="text"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={(e) => e.key === 'Enter' && handleSend()}
          placeholder="Type a message..."
        />
        <button onClick={handleSend}>Send</button>
      </div>
    </div>
  );
};

export default ChatRoom;

Login Component

Create src/components/Login.jsx:

import React, { useState } from 'react';
import axios from 'axios';
import { connectSocket } from '../socket';

const Login = ({ onLogin }) => {
  const [email, setEmail] = useState('');
  const [password, setPassword] = useState('');
  const [error, setError] = useState('');

  const handleSubmit = async (e) => {
    e.preventDefault();
    try {
      const response = await axios.post('http://localhost:3000/users/login', {
        email,
        password,
      });

      const { access_token, user } = response.data;

      // Connect to Socket.IO with the new token
      connectSocket(access_token);

      // Store the token and user for later requests
      localStorage.setItem('token', access_token);
      localStorage.setItem('user', JSON.stringify(user));

      // Notify the parent component
      onLogin(user);
    } catch (err) {
      setError(err.response?.data?.message || 'Login failed');
    }
  };

  return (
    <div className="login">
      <h2>Log In</h2>
      {error && <div className="error">{error}</div>}
      <form onSubmit={handleSubmit}>
        <div>
          <label>Email</label>
          <input
            type="email"
            value={email}
            onChange={(e) => setEmail(e.target.value)}
            required
          />
        </div>
        <div>
          <label>Password</label>
          <input
            type="password"
            value={password}
            onChange={(e) => setPassword(e.target.value)}
            required
          />
        </div>
        <button type="submit">Log In</button>
      </form>
    </div>
  );
};

export default Login;

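Running the Project

Starting the Backend Server

# Start the development server (run from the real-time-chat directory)
npm run start:dev

Starting the Frontend Development Server

# Enter the frontend directory
cd chat-frontend

# Start the development server
npm run dev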

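Testing the API

Use Postman or another API testing tool to exercise the following endpoints. All of them except register and login require an Authorization: Bearer <token> header.

  1. User endpoints

    • POST /users/register - register
    • POST /users/login - log in
    • GET /users/profile - get the current user's profile
    • GET /users - list all users
    • PATCH /users/profile - update the current user's profile
  2. Room endpoints

    • POST /rooms - create a room
    • GET /rooms - list rooms
    • GET /rooms/:id - get a single room
    • POST /rooms/:id/join - join a room
    • POST /rooms/:id/leave - leave a room
    • DELETE /rooms/:id - delete a room
    • POST /rooms/direct/:userId - create a direct room
  3. Message endpoints

    • GET /messages/room/:roomId - get messages for a room
    • PATCH /messages/:id/read - mark a message as read
    • PATCH /messages/room/:roomId/read - mark all messages in a room as read
    • DELETE /messages/:id - delete a message
  4. File endpoints

    • POST /files/upload - upload a file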

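Deployment

Building the Project

# Build the backend
npm run build

# Build the frontend
cd chat-frontend
npm run build

Deploying to a Server

  1. Manage the process with PM2
# Install PM2
npm i -g pm2

# Option A: start the backend directly, without PM2
npm run start:prod

# Option B: build, then run the compiled entry point under PM2
npm run build
pm2 start dist/main.js --name real-time-chat
  2. Deploy with Docker

Create a Dockerfile: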

FROM node:16-alpine

WORKDIR /app

COPY package*.json ./
RUN npm install

COPY . .

RUN npm run build

EXPOSE 3000

CMD ["npm", "run", "start:prod"]

Create docker-compose.yml:

version: '3'
services:
  app:
    build: .
    ports:
      - '3000:3000'
    depends_on:
      - db
    environment:
      - DATABASE_HOST=db
      - DATABASE_PORT=5432
      - DATABASE_USERNAME=postgres
      - DATABASE_PASSWORD=postgres
      - DATABASE_NAME=chat_app

  db:
    image: postgres:13
    ports:
      - '5432:5432'
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=chat_app
    volumes:
      - postgres-data:/var/lib/postgresql/data

volumes:
  postgres-data:

Start the Docker containers:

docker-compose up -d

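Performance Optimization

Backend Optimization

  1. Cache with Redis
# Install the Redis client
npm install redis
  2. Batch message processing
// Pending messages grouped by room ID, flushed periodically (for example from a timer).
// This fragment belongs in a class that has access to messagesRepository and server.
private messageQueue = new Map<string, Message[]>();

async processMessageQueue() {
  // Detach the pending batches first so that messages queued while
  // awaiting the database are not lost
  const batches = Array.from(this.messageQueue.entries());
  this.messageQueue.clear();

  for (const [roomId, messages] of batches) {
    // Save the whole batch in one call
    await this.messagesRepository.save(messages);

    // Broadcast each saved message to the room
    for (const message of messages) {
      this.server.to(`room_${roomId}`).emit('message:new', message);
    }
  }
}
  3. Connection pool management
// With TypeORM, pool settings are passed to the underlying pg driver
// through the `extra` option of the connection configuration
TypeOrmModule.forRoot({
  type: 'postgres',
  // ...connection settings as in app.module.ts
  extra: {
    max: 20,                  // maximum number of pooled connections
    idleTimeoutMillis: 10000, // close clients that have been idle for 10 seconds
  },
});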
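
The batching idea above can be isolated from the database and the socket server, which makes it easier to reason about and to test. The following is a sketch under assumptions, not the tutorial's implementation: MessageBatcher is a hypothetical class, and the batch size of 20 is an arbitrary default. It only decides when a batch is ready; saving and broadcasting stay with the caller.

```typescript
// Hypothetical per-room batcher. It holds messages in memory and hands back
// a batch when it is full or when the caller drains everything.
class MessageBatcher<T> {
  private readonly queues = new Map<number, T[]>();
  private readonly maxBatchSize: number;

  constructor(maxBatchSize: number = 20) {
    this.maxBatchSize = maxBatchSize;
  }

  // Queues a message. Returns the full batch once the room reaches
  // maxBatchSize (and forgets it), otherwise returns null.
  enqueue(roomId: number, message: T): T[] | null {
    const queue = this.queues.get(roomId) ?? [];
    queue.push(message);
    if (queue.length >= this.maxBatchSize) {
      this.queues.delete(roomId);
      return queue;
    }
    this.queues.set(roomId, queue);
    return null;
  }

  // Detaches and returns every pending batch, for example on a timer tick.
  drain(): Array<[number, T[]]> {
    const pending = Array.from(this.queues.entries());
    this.queues.clear();
    return pending;
  }
}
```

A gateway could call enqueue() for each incoming message and persist the returned batch when it is not null, with a periodic timer calling drain() so that quiet rooms are still flushed. Batching trades a small delivery delay for fewer database writes, and queued messages are lost if the process stops before a flush.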

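Frontend Optimization

  1. Paginated message loading
const loadMoreMessages = async () => {
  const response = await axios.get(`/messages/room/${roomId}`, {
    params: {
      limit: 50,
      offset: messages.length,
    },
  });
  // The API returns newest-first; reverse the page so the older messages
  // are prepended in chronological order
  setMessages(prev => [...[...response.data].reverse(), ...prev]);
};
  2. WebSocket reconnection
// socket.io-client reconnects automatically after network failures.
// A manual reconnect is only needed when the server closed the connection.
socket.on('disconnect', (reason) => {
  if (reason === 'io server disconnect') {
    connectSocket(localStorage.getItem('token'));
  }
});
  3. Message compression
// Request compression for this message (takes effect only if the
// server has per-message compression enabled)
socket.compress(true).emit('message:send', data);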
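
When older pages are merged into the list on screen, two details are easy to get wrong: the API returns each page newest-first while the list is rendered oldest-first, and pages can overlap if messages arrive between requests. The helper below is a minimal sketch of that merge. The name prependOlderMessages is hypothetical, and it assumes every message has a numeric id.

```typescript
// Hypothetical merge helper.
// `current` is oldest-first (render order); `olderPage` is newest-first,
// as returned by GET /messages/room/:roomId.
function prependOlderMessages<T extends { id: number }>(
  current: T[],
  olderPage: T[],
): T[] {
  const knownIds = new Set(current.map((message) => message.id));
  // filter() returns a new array, so reverse() does not mutate the input
  const older = olderPage
    .filter((message) => !knownIds.has(message.id))
    .reverse();
  return [...older, ...current];
}
```

In the React component, the state update would then read setMessages(prev => prependOlderMessages(prev, response.data)).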

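Security Measures

  1. WebSocket authentication: verify a JWT on every WebSocket connection
  2. Message encryption: encrypt sensitive messages in transit
  3. Input validation: validate all user input
  4. SQL injection protection: rely on TypeORM's parameterized queries
  5. XSS protection: escape user input before rendering it
  6. CSRF protection: implement CSRF token validation
  7. Password security: store passwords as bcrypt hashes
  8. API rate limiting: limit request frequency
  9. HTTPS: use HTTPS in production
  10. Regular security audits: check the system for vulnerabilities on a schedule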
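
Rate limiting from the list above also applies to socket events such as message:send, not only to HTTP routes. The following is a small in-memory sketch of a sliding-window limiter. The class name and the chosen limits are assumptions for illustration, and an in-memory map only works for a single server process.

```typescript
// Hypothetical sliding-window rate limiter keyed by user or socket ID.
class SlidingWindowLimiter {
  private readonly hits = new Map<string, number[]>();
  private readonly limit: number;
  private readonly windowMs: number;

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  // Returns true if the action is allowed at time `now` (in milliseconds).
  // `now` is a parameter so the behavior can be tested without real delays.
  allow(key: string, now: number = Date.now()): boolean {
    const windowStart = now - this.windowMs;
    // Keep only the timestamps that are still inside the window
    const recent = (this.hits.get(key) ?? []).filter((t) => t > windowStart);
    if (recent.length >= this.limit) {
      this.hits.set(key, recent);
      return false;
    }
    recent.push(now);
    this.hits.set(key, recent);
    return true;
  }
}
```

A gateway handler could create one limiter, for example new SlidingWindowLimiter(5, 1000) for five messages per second, and return an error to the client when allow(String(userId)) is false.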

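Summary

This tutorial walked through building a complete real-time chat application with NestJS and WebSocket, covering user authentication, message handling, and room management. After working through it, you should be able to:

  1. Understand the overall architecture of a real-time chat system
  2. Use the core features of the NestJS framework
  3. Implement real-time communication with Socket.IO
  4. Implement user authentication, message handling, and room management
  5. Integrate a frontend and choose a deployment strategy
  6. Apply performance optimization and security measures to real-time applications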

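Questions and Answers

Question 1: How is a WebSocket connection authenticated?

Answer: The client passes a JWT through handshake.auth when it connects. The server verifies the token and disconnects any client whose token is missing or invalid.

Question 2: How is private chat implemented?

Answer: Create a room of type direct and add both users to it automatically. That room then carries the one-to-one conversation.

Question 3: How can the performance of a real-time chat application be improved?

Answer: Options include Redis caching, message batching, connection pool management, paginated message loading, WebSocket reconnection, and message compression.

Question 4: How can a real-time chat application be kept secure?

Answer: Combine WebSocket authentication, message encryption, input validation, SQL injection protection, XSS protection, CSRF protection, password hashing, API rate limiting, HTTPS, and regular security audits.

Question 5: How are file upload and sharing implemented?

Answer: Create a file upload endpoint, handle the upload with multer, save the file to local or cloud storage, and then share it by sending the file URL in a message.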

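Exercises

  1. Implement message recall

    • Add a message recall API
    • Add a recall button in the frontend
    • Broadcast a recall notification
  2. Implement message editing

    • Add a message editing API
    • Build the editing UI in the frontend
    • Broadcast an edit notification
  3. Implement online status display

    • Update user status in real time
    • Show online status in the frontend
    • Send status change notifications
  4. Implement read receipts

    • Track the read state of messages
    • Show the list of users who have read a message
    • Send read receipt notifications
  5. Implement multi-language support

    • Add internationalization configuration
    • Implement language switching
    • Support messages in multiple languages

Completing these exercises will reinforce what you have learned and lead to a more complete real-time chat application.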
