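NestJS Real-Time Chat Application: A Hands-On Tutorial
Learning objectives
- Build a real-time chat application with NestJS
- Understand how WebSocket works and where it applies
- Implement user authentication, message handling, and room management
- Use Socket.IO for real-time communication
- Understand performance optimization and deployment strategies for real-time applications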
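Core concepts
Real-time chat system architecture
A real-time chat system typically consists of the following core modules:
- User module: registration, login, and profile management
- Message module: sending, receiving, and storing messages in real time
- Room module: creating, joining, and leaving rooms
- Auth module: WebSocket connection authentication and access control
- Notification module: system notifications and user status updates
- File module: uploading and sharing images and files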
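One way to keep these modules decoupled is to agree on the real-time event names up front. The sketch below lists the events that the gateway later in this tutorial subscribes to and emits, and maps each one to the module that owns it. The helper names (`isClientEvent`, `moduleOf`) are hypothetical and exist only for illustration.

```typescript
// Events a client sends to the server.
const CLIENT_EVENTS = [
  'message:send',
  'message:read',
  'room:join',
  'room:leave',
  'room:create',
  'room:messages:read',
  'user:status',
] as const;

// Events the server pushes to clients.
const SERVER_EVENTS = [
  'message:new',
  'room:user:join',
  'room:user:leave',
  'room:created',
  'user:status',
] as const;

type ClientEvent = (typeof CLIENT_EVENTS)[number];
type ServerEvent = (typeof SERVER_EVENTS)[number];
type OwningModule = 'message' | 'room' | 'user';

// Type guard: true when `name` is an event the server should accept from a client.
function isClientEvent(name: string): name is ClientEvent {
  return (CLIENT_EVENTS as readonly string[]).indexOf(name) !== -1;
}

// Map an event to the module that owns it, using the prefix before the first colon.
function moduleOf(event: ClientEvent | ServerEvent): OwningModule {
  const prefix = event.split(':')[0];
  if (prefix === 'message' || prefix === 'room' || prefix === 'user') {
    return prefix;
  }
  throw new Error(`unknown event prefix: ${prefix}`);
}
```

Sharing a list like this between backend and frontend makes a misspelled event name a compile-time error instead of a silent no-op.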
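Technology stack
- Backend framework: NestJS
- Database: PostgreSQL
- ORM: TypeORM
- Real-time communication: Socket.IO
- Authentication: JWT
- File storage: local storage or cloud storage
- Frontend framework: React, Vue, or Angular
- State management: Redux or Vuex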
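Project setup
Create the project
# Create the NestJS project
npm i -g @nestjs/cli
nest new real-time-chat
# Enter the project directory
cd real-time-chat
# Install dependencies (uuid is used by the file controller later on)
npm install @nestjs/typeorm typeorm pg @nestjs/jwt @nestjs/passport passport passport-jwt bcrypt @nestjs/config socket.io @nestjs/platform-socket.io @nestjs/websockets multer uuid
# Install the type definitions the code below relies on
npm install -D @types/bcrypt @types/multer @types/passport-jwt
Configure the database
Create a .env file:
# Database settings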
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_USERNAME=postgres
DATABASE_PASSWORD=postgres
DATABASE_NAME=chat_app
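# JWT settings (include a unit: jsonwebtoken reads a bare numeric string such as "3600" as milliseconds)
JWT_SECRET=your-secret-key
JWT_EXPIRES_IN=1h
# File upload settings
UPLOAD_DIR=./uploads
# Socket.IO settings
SOCKET_IO_PATH=/socket.io
Core module implementation
User module
User entity
Create src/users/entities/user.entity.ts: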
import {
Entity,
PrimaryGeneratedColumn,
Column,
CreateDateColumn,
UpdateDateColumn,
OneToMany,
} from 'typeorm';
import { Message } from '../../messages/entities/message.entity';
import { Room } from '../../rooms/entities/room.entity';
export enum UserStatus {
ONLINE = 'online',
OFFLINE = 'offline',
AWAY = 'away',
BUSY = 'busy',
}
export enum UserRole {
ADMIN = 'admin',
USER = 'user',
}
@Entity('users')
export class User {
@PrimaryGeneratedColumn()
id: number;
@Column({ unique: true })
username: string;
@Column({ unique: true })
email: string;
@Column()
password: string;
@Column({ default: UserRole.USER })
role: UserRole;
@Column({ default: UserStatus.OFFLINE })
status: UserStatus;
@Column({ nullable: true })
avatar: string;
@Column({ nullable: true })
lastSeen: Date;
@CreateDateColumn()
createdAt: Date;
@UpdateDateColumn()
updatedAt: Date;
@OneToMany(() => Message, (message) => message.sender)
messages: Message[];
@OneToMany(() => Room, (room) => room.creator)
createdRooms: Room[];
}
User service
Create src/users/users.service.ts:
import {
Injectable,
ConflictException,
UnauthorizedException,
} from '@nestjs/common';
import {
InjectRepository,
} from '@nestjs/typeorm';
import {
Repository,
} from 'typeorm';
import { User, UserStatus } from './entities/user.entity';
import * as bcrypt from 'bcrypt';
import { JwtService } from '@nestjs/jwt';
@Injectable()
export class UsersService {
constructor(
@InjectRepository(User)
private usersRepository: Repository<User>,
private jwtService: JwtService,
) {}
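async register(username: string, email: string, password: string) {
// Check whether the username or email is already taken
const existingUser = await this.usersRepository.findOne({
where: [{ username }, { email }],
});
if (existingUser) {
throw new ConflictException('Username or email already exists');
}
// Hash the password
const hashedPassword = await bcrypt.hash(password, 10);
// Create the user
const user = this.usersRepository.create({
username,
email,
password: hashedPassword,
});
const savedUser = await this.usersRepository.save(user);
// Never send the password hash back to the client
const { password: _passwordHash, ...result } = savedUser;
return result;
}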
async login(email: string, password: string) {
// Look up the user
const user = await this.usersRepository.findOne({
where: { email },
});
if (!user) {
throw new UnauthorizedException('Incorrect email or password');
}
// Verify the password
const isPasswordValid = await bcrypt.compare(password, user.password);
if (!isPasswordValid) {
throw new UnauthorizedException('Incorrect email or password');
}
// Mark the user as online
user.status = UserStatus.ONLINE;
user.lastSeen = new Date();
await this.usersRepository.save(user);
// Issue the JWT
const payload = { userId: user.id, role: user.role };
const token = this.jwtService.sign(payload);
return {
access_token: token,
user: {
id: user.id,
username: user.username,
email: user.email,
role: user.role,
status: user.status,
avatar: user.avatar,
lastSeen: user.lastSeen,
},
};
}
async findOne(id: number) {
return await this.usersRepository.findOne({
where: { id },
});
}
async findAll() {
return await this.usersRepository.find({
select: ['id', 'username', 'email', 'status', 'avatar', 'lastSeen'],
});
}
async updateStatus(id: number, status: UserStatus) {
return await this.usersRepository.update(id, {
status,
lastSeen: new Date(),
});
}
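async updateProfile(id: number, updateData: Partial<User>) {
// Only accept safe profile fields, so a client cannot change its own role or password here
const { username, email, avatar } = updateData;
return await this.usersRepository.update(id, { username, email, avatar });
}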
}
Message module
Message entity
Create src/messages/entities/message.entity.ts:
import {
Entity,
PrimaryGeneratedColumn,
Column,
CreateDateColumn,
UpdateDateColumn,
ManyToOne,
} from 'typeorm';
import { User } from '../../users/entities/user.entity';
import { Room } from '../../rooms/entities/room.entity';
export enum MessageType {
TEXT = 'text',
IMAGE = 'image',
FILE = 'file',
SYSTEM = 'system',
}
@Entity('messages')
export class Message {
@PrimaryGeneratedColumn()
id: number;
@Column()
content: string;
@Column({ default: MessageType.TEXT })
type: MessageType;
@Column({ nullable: true })
fileUrl: string;
@Column({ nullable: true })
fileName: string;
@Column({ default: false })
isRead: boolean;
@CreateDateColumn()
createdAt: Date;
@UpdateDateColumn()
updatedAt: Date;
@ManyToOne(() => User, (user) => user.messages)
sender: User;
@ManyToOne(() => Room, (room) => room.messages)
room: Room;
}
Message service
Create src/messages/messages.service.ts:
import {
Injectable,
NotFoundException,
} from '@nestjs/common';
import {
InjectRepository,
} from '@nestjs/typeorm';
import {
Repository,
Not,
} from 'typeorm';
import { Message, MessageType } from './entities/message.entity';
import { Room } from '../rooms/entities/room.entity';
@Injectable()
export class MessagesService {
constructor(
@InjectRepository(Message)
private messagesRepository: Repository<Message>,
@InjectRepository(Room)
private roomsRepository: Repository<Room>,
) {}
// Create a message
async create(senderId: number, roomId: number, content: string, type: MessageType = MessageType.TEXT, fileUrl?: string, fileName?: string) {
const room = await this.roomsRepository.findOne({
where: { id: roomId },
});
if (!room) {
throw new NotFoundException('Room not found');
}
const message = this.messagesRepository.create({
content,
type,
fileUrl,
fileName,
sender: { id: senderId },
room: { id: roomId },
});
return await this.messagesRepository.save(message);
}
// Get the messages of a room, newest first
async getRoomMessages(roomId: number, limit: number = 50, offset: number = 0) {
return await this.messagesRepository.find({
where: { room: { id: roomId } },
relations: ['sender'],
order: { createdAt: 'DESC' },
take: limit,
skip: offset,
});
}
// Mark a message as read
async markAsRead(messageId: number) {
return await this.messagesRepository.update(messageId, {
isRead: true,
});
}
// Mark all messages in a room that were sent by other users as read
async markRoomMessagesAsRead(roomId: number, userId: number) {
return await this.messagesRepository.update(
{
room: { id: roomId },
sender: { id: Not(userId) },
isRead: false,
},
{
isRead: true,
},
);
}
// Delete a message
async remove(id: number) {
return await this.messagesRepository.delete(id);
}
}
Room module
Room entity
Create src/rooms/entities/room.entity.ts:
import {
Entity,
PrimaryGeneratedColumn,
Column,
CreateDateColumn,
UpdateDateColumn,
ManyToOne,
OneToMany,
ManyToMany,
JoinTable,
} from 'typeorm';
import { User } from '../../users/entities/user.entity';
import { Message } from '../../messages/entities/message.entity';
export enum RoomType {
PUBLIC = 'public',
PRIVATE = 'private',
DIRECT = 'direct',
}
@Entity('rooms')
export class Room {
@PrimaryGeneratedColumn()
id: number;
@Column()
name: string;
@Column({ nullable: true })
description: string;
@Column({ default: RoomType.PUBLIC })
type: RoomType;
@Column({ nullable: true })
password: string;
@Column({ default: 0 })
memberCount: number;
@CreateDateColumn()
createdAt: Date;
@UpdateDateColumn()
updatedAt: Date;
@ManyToOne(() => User, (user) => user.createdRooms)
creator: User;
@OneToMany(() => Message, (message) => message.room)
messages: Message[];
@ManyToMany(() => User)
@JoinTable()
members: User[];
}
Room service
Create src/rooms/rooms.service.ts:
import {
Injectable,
NotFoundException,
ForbiddenException,
} from '@nestjs/common';
import {
InjectRepository,
} from '@nestjs/typeorm';
import {
Repository,
} from 'typeorm';
import { Room, RoomType } from './entities/room.entity';
import { User } from '../users/entities/user.entity';
import * as bcrypt from 'bcrypt';
@Injectable()
export class RoomsService {
constructor(
@InjectRepository(Room)
private roomsRepository: Repository<Room>,
@InjectRepository(User)
private usersRepository: Repository<User>,
) {}
// Create a room
async create(creatorId: number, createRoomDto: any) {
const { name, description, type, password } = createRoomDto;
let hashedPassword = null;
if (password) {
hashedPassword = await bcrypt.hash(password, 10);
}
const room = this.roomsRepository.create({
name,
description,
type,
password: hashedPassword,
creator: { id: creatorId },
members: [{ id: creatorId }],
memberCount: 1,
});
return await this.roomsRepository.save(room);
}
// List rooms
async findAll() {
return await this.roomsRepository.find({
relations: ['creator'],
select: ['id', 'name', 'description', 'type', 'memberCount', 'createdAt', 'creator'],
});
}
// Get a single room
async findOne(id: number) {
return await this.roomsRepository.findOne({
where: { id },
relations: ['creator', 'members'],
});
}
// Join a room
async joinRoom(userId: number, roomId: number, password?: string) {
const room = await this.roomsRepository.findOne({
where: { id: roomId },
relations: ['members'],
});
if (!room) {
throw new NotFoundException('Room not found');
}
// Check the room password
if (room.password) {
if (!password) {
throw new ForbiddenException('Password required');
}
const isPasswordValid = await bcrypt.compare(password, room.password);
if (!isPasswordValid) {
throw new ForbiddenException('Incorrect password');
}
}
// Check whether the user is already a member
const isMember = room.members.some(member => member.id === userId);
if (isMember) {
throw new ForbiddenException('Already in the room');
}
// Add the user to the room
const user = await this.usersRepository.findOne({
where: { id: userId },
});
if (!user) {
throw new NotFoundException('User not found');
}
room.members.push(user);
room.memberCount++;
return await this.roomsRepository.save(room);
}
// Leave a room
async leaveRoom(userId: number, roomId: number) {
const room = await this.roomsRepository.findOne({
where: { id: roomId },
relations: ['members'],
});
if (!room) {
throw new NotFoundException('Room not found');
}
// Check whether the user is a member
const isMember = room.members.some(member => member.id === userId);
if (!isMember) {
throw new ForbiddenException('Not in the room');
}
// Remove the user from the room
room.members = room.members.filter(member => member.id !== userId);
room.memberCount--;
return await this.roomsRepository.save(room);
}
// Delete a room
async remove(id: number, userId: number) {
const room = await this.roomsRepository.findOne({
where: { id },
relations: ['creator'],
});
if (!room) {
throw new NotFoundException('Room not found');
}
if (room.creator.id !== userId) {
throw new ForbiddenException('Only the creator can delete the room');
}
return await this.roomsRepository.delete(id);
}
// Create a direct-message room
async createDirectRoom(userId1: number, userId2: number) {
// Look for an existing direct room that contains both users.
// The members relation is joined once per user so the room must match both ids.
const existingRoom = await this.roomsRepository
.createQueryBuilder('room')
.innerJoin('room.members', 'member1', 'member1.id = :userId1', { userId1 })
.innerJoin('room.members', 'member2', 'member2.id = :userId2', { userId2 })
.where('room.type = :type', { type: RoomType.DIRECT })
.getOne();
if (existingRoom) {
return existingRoom;
}
// Load both users
const user1 = await this.usersRepository.findOne({ where: { id: userId1 } });
const user2 = await this.usersRepository.findOne({ where: { id: userId2 } });
if (!user1 || !user2) {
throw new NotFoundException('User not found');
}
// Create the direct-message room
const room = this.roomsRepository.create({
name: `${user1.username} & ${user2.username}`,
type: RoomType.DIRECT,
members: [user1, user2],
memberCount: 2,
});
return await this.roomsRepository.save(room);
}
}
WebSocket gateway
Main gateway
Create src/gateway/chat.gateway.ts:
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
OnGatewayConnection,
OnGatewayDisconnect,
MessageBody,
ConnectedSocket,
} from '@nestjs/websockets';
import {
Logger,
UseGuards,
} from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { JwtService } from '@nestjs/jwt';
import { UsersService } from '../users/users.service';
import { MessagesService } from '../messages/messages.service';
import { RoomsService } from '../rooms/rooms.service';
import { UserStatus } from '../users/entities/user.entity';
import { MessageType } from '../messages/entities/message.entity';
@WebSocketGateway({
cors: {
origin: '*',
methods: ['GET', 'POST'],
},
path: '/socket.io',
})
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
private logger = new Logger('ChatGateway');
private connectedUsers = new Map<number, string>(); // userId -> socketId
constructor(
private jwtService: JwtService,
private usersService: UsersService,
private messagesService: MessagesService,
private roomsService: RoomsService,
) {}
// Handle a new connection
async handleConnection(client: Socket) {
try {
// Verify the token
const token = client.handshake.auth.token;
if (!token) {
client.disconnect();
return;
}
const payload = this.jwtService.verify(token);
const userId = payload.userId;
// Remember the user's connection
this.connectedUsers.set(userId, client.id);
client.data.userId = userId;
// Update the user's status
await this.usersService.updateStatus(userId, UserStatus.ONLINE);
// Broadcast that the user is online
this.server.emit('user:status', {
userId,
status: UserStatus.ONLINE,
});
this.logger.log(`User ${userId} connected`);
} catch (error) {
this.logger.error('Connection authentication failed', error);
client.disconnect();
}
}
// Handle a disconnect
async handleDisconnect(client: Socket) {
try {
const userId = client.data.userId;
if (userId) {
// Forget the user's connection
this.connectedUsers.delete(userId);
// Update the user's status
await this.usersService.updateStatus(userId, UserStatus.OFFLINE);
// Broadcast that the user is offline
this.server.emit('user:status', {
userId,
status: UserStatus.OFFLINE,
lastSeen: new Date(),
});
this.logger.log(`User ${userId} disconnected`);
}
} catch (error) {
this.logger.error('Failed to handle disconnect', error);
}
}
// Send a message
@SubscribeMessage('message:send')
async handleMessage(
@MessageBody() data: { roomId: number; content: string; type?: MessageType; fileUrl?: string; fileName?: string },
@ConnectedSocket() client: Socket,
) {
const userId = client.data.userId;
// Persist the message
const message = await this.messagesService.create(
userId,
data.roomId,
data.content,
data.type,
data.fileUrl,
data.fileName,
);
// Broadcast the message to everyone in the room
this.server.to(`room_${data.roomId}`).emit('message:new', message);
return message;
}
// Join a room
@SubscribeMessage('room:join')
async handleJoinRoom(
@MessageBody() data: { roomId: number; password?: string },
@ConnectedSocket() client: Socket,
) {
const userId = client.data.userId;
// Add the user to the room in the database
const room = await this.roomsService.joinRoom(userId, data.roomId, data.password);
// Join the Socket.IO room
client.join(`room_${data.roomId}`);
// Notify the room that the user joined
this.server.to(`room_${data.roomId}`).emit('room:user:join', {
roomId: data.roomId,
userId,
});
return room;
}
// Leave a room
@SubscribeMessage('room:leave')
async handleLeaveRoom(
@MessageBody() data: { roomId: number },
@ConnectedSocket() client: Socket,
) {
const userId = client.data.userId;
// Remove the user from the room in the database
const room = await this.roomsService.leaveRoom(userId, data.roomId);
// Leave the Socket.IO room
client.leave(`room_${data.roomId}`);
// Notify the room that the user left
this.server.to(`room_${data.roomId}`).emit('room:user:leave', {
roomId: data.roomId,
userId,
});
return room;
}
// Create a room
@SubscribeMessage('room:create')
async handleCreateRoom(
@MessageBody() data: { name: string; description?: string; type?: string; password?: string },
@ConnectedSocket() client: Socket,
) {
const userId = client.data.userId;
// Create the room in the database
const room = await this.roomsService.create(userId, data);
// Join the Socket.IO room
client.join(`room_${room.id}`);
// Announce the new room to all clients
this.server.emit('room:created', room);
return room;
}
// Mark a message as read
@SubscribeMessage('message:read')
async handleMarkAsRead(
@MessageBody() data: { messageId: number },
@ConnectedSocket() client: Socket,
) {
const userId = client.data.userId;
// Update the read flag
await this.messagesService.markAsRead(data.messageId);
return { success: true };
}
// Mark a room's messages as read
@SubscribeMessage('room:messages:read')
async handleMarkRoomMessagesAsRead(
@MessageBody() data: { roomId: number },
@ConnectedSocket() client: Socket,
) {
const userId = client.data.userId;
// Update the read flag on messages from other users
await this.messagesService.markRoomMessagesAsRead(data.roomId, userId);
return { success: true };
}
// Update the user's status
@SubscribeMessage('user:status')
async handleUpdateStatus(
@MessageBody() data: { status: UserStatus },
@ConnectedSocket() client: Socket,
) {
const userId = client.data.userId;
// Persist the new status
await this.usersService.updateStatus(userId, data.status);
// Broadcast the status change
this.server.emit('user:status', {
userId,
status: data.status,
});
return { success: true };
}
}
API controllers
User controller
Create src/users/users.controller.ts:
import {
Controller,
Post,
Get,
Body,
UseGuards,
Request,
Patch,
} from '@nestjs/common';
import { UsersService } from './users.service';
import { AuthGuard } from '@nestjs/passport';
@Controller('users')
export class UsersController {
constructor(private usersService: UsersService) {}
// Register
@Post('register')
async register(@Body() body: { username: string; email: string; password: string }) {
return await this.usersService.register(body.username, body.email, body.password);
}
// Log in
@Post('login')
async login(@Body() body: { email: string; password: string }) {
return await this.usersService.login(body.email, body.password);
}
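// Get the current user's profile
@UseGuards(AuthGuard('jwt'))
@Get('profile')
async getProfile(@Request() req) {
// Strip the password hash before returning the user
const { password, ...profile } = await this.usersService.findOne(req.user.userId);
return profile;
}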
// List all users
@UseGuards(AuthGuard('jwt'))
@Get()
async getAllUsers() {
return await this.usersService.findAll();
}
// Update the current user's profile
@UseGuards(AuthGuard('jwt'))
@Patch('profile')
async updateProfile(@Request() req, @Body() body) {
return await this.usersService.updateProfile(req.user.userId, body);
}
}
Message controller
Create src/messages/messages.controller.ts:
import {
Controller,
Get,
Post,
Patch,
Delete,
Param,
Query,
Body,
UseGuards,
Request,
} from '@nestjs/common';
import { MessagesService } from './messages.service';
import { AuthGuard } from '@nestjs/passport';
@Controller('messages')
export class MessagesController {
constructor(private messagesService: MessagesService) {}
// Get the messages of a room
@UseGuards(AuthGuard('jwt'))
@Get('room/:roomId')
async getRoomMessages(@Param('roomId') roomId: number, @Query() query: { limit?: number; offset?: number }) {
return await this.messagesService.getRoomMessages(
roomId,
query.limit,
query.offset,
);
}
// Mark a message as read
@UseGuards(AuthGuard('jwt'))
@Patch(':id/read')
async markAsRead(@Param('id') id: number) {
return await this.messagesService.markAsRead(id);
}
// Mark a room's messages as read
@UseGuards(AuthGuard('jwt'))
@Patch('room/:roomId/read')
async markRoomMessagesAsRead(@Param('roomId') roomId: number, @Request() req) {
return await this.messagesService.markRoomMessagesAsRead(roomId, req.user.userId);
}
// Delete a message
@UseGuards(AuthGuard('jwt'))
@Delete(':id')
async remove(@Param('id') id: number) {
return await this.messagesService.remove(id);
}
}
Room controller
Create src/rooms/rooms.controller.ts:
import {
Controller,
Post,
Get,
Put,
Delete,
Body,
Param,
UseGuards,
Request,
} from '@nestjs/common';
import { RoomsService } from './rooms.service';
import { AuthGuard } from '@nestjs/passport';
@Controller('rooms')
export class RoomsController {
constructor(private roomsService: RoomsService) {}
// Create a room
@UseGuards(AuthGuard('jwt'))
@Post()
async create(@Request() req, @Body() body) {
return await this.roomsService.create(req.user.userId, body);
}
// List rooms
@UseGuards(AuthGuard('jwt'))
@Get()
async findAll() {
return await this.roomsService.findAll();
}
// Get a single room
@UseGuards(AuthGuard('jwt'))
@Get(':id')
async findOne(@Param('id') id: number) {
return await this.roomsService.findOne(id);
}
// Join a room
@UseGuards(AuthGuard('jwt'))
@Post(':id/join')
async joinRoom(@Request() req, @Param('id') id: number, @Body() body: { password?: string }) {
return await this.roomsService.joinRoom(req.user.userId, id, body.password);
}
// Leave a room
@UseGuards(AuthGuard('jwt'))
@Post(':id/leave')
async leaveRoom(@Request() req, @Param('id') id: number) {
return await this.roomsService.leaveRoom(req.user.userId, id);
}
// Delete a room
@UseGuards(AuthGuard('jwt'))
@Delete(':id')
async remove(@Request() req, @Param('id') id: number) {
return await this.roomsService.remove(id, req.user.userId);
}
// Create a direct-message room
@UseGuards(AuthGuard('jwt'))
@Post('direct/:userId')
async createDirectRoom(@Request() req, @Param('userId') userId: number) {
// Route parameters arrive as strings, so convert before comparing ids
return await this.roomsService.createDirectRoom(req.user.userId, Number(userId));
}
}
File controller
Create src/files/files.controller.ts:
import {
Controller,
Post,
UseInterceptors,
UploadedFile,
UseGuards,
} from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { AuthGuard } from '@nestjs/passport';
import * as fs from 'fs';
import * as path from 'path';
import { v4 as uuidv4 } from 'uuid';
@Controller('files')
export class FilesController {
private uploadDir = './uploads';
constructor() {
// Make sure the upload directory exists
if (!fs.existsSync(this.uploadDir)) {
fs.mkdirSync(this.uploadDir, { recursive: true });
}
}
// Upload a file
@UseGuards(AuthGuard('jwt'))
@Post('upload')
@UseInterceptors(FileInterceptor('file'))
async uploadFile(@UploadedFile() file: Express.Multer.File) {
if (!file) {
return { error: 'Please select a file' };
}
// Generate a unique file name
const fileName = `${uuidv4()}${path.extname(file.originalname)}`;
const filePath = path.join(this.uploadDir, fileName);
// Save the file without blocking the event loop
await fs.promises.writeFile(filePath, file.buffer);
return {
filename: fileName,
path: `/uploads/${fileName}`,
url: `http://localhost:3000/uploads/${fileName}`,
};
}
}
Main module configuration
Create src/app.module.ts:
import {
Module,
} from '@nestjs/common';
import {
TypeOrmModule,
} from '@nestjs/typeorm';
import {
ConfigModule,
ConfigService,
} from '@nestjs/config';
import { UsersModule } from './users/users.module';
import { MessagesModule } from './messages/messages.module';
import { RoomsModule } from './rooms/rooms.module';
import { FilesModule } from './files/files.module';
import { AuthModule } from './auth/auth.module';
import { ChatGateway } from './gateway/chat.gateway';
import { User } from './users/entities/user.entity';
import { Message } from './messages/entities/message.entity';
import { Room } from './rooms/entities/room.entity';
import { JwtModule } from '@nestjs/jwt';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
}),
TypeOrmModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
type: 'postgres',
host: configService.get('DATABASE_HOST'),
port: parseInt(configService.get('DATABASE_PORT'), 10),
username: configService.get('DATABASE_USERNAME'),
password: configService.get('DATABASE_PASSWORD'),
database: configService.get('DATABASE_NAME'),
entities: [User, Message, Room],
// Convenient during development; switch to migrations in production
synchronize: true,
}),
inject: [ConfigService],
}),
JwtModule.registerAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
secret: configService.get('JWT_SECRET'),
signOptions: { expiresIn: configService.get('JWT_EXPIRES_IN') },
}),
inject: [ConfigService],
}),
UsersModule,
MessagesModule,
RoomsModule,
FilesModule,
AuthModule,
],
providers: [ChatGateway],
})
export class AppModule {}
Frontend integration
Initialize the frontend project
# Create the React project
npm create vite@latest chat-frontend -- --template react
# Enter the project directory
cd chat-frontend
# Install dependencies
npm install socket.io-client axios @reduxjs/toolkit react-redux
Socket.IO connection
Create src/socket.js:
import { io } from 'socket.io-client';
const socket = io('http://localhost:3000', {
autoConnect: false,
path: '/socket.io',
});
export const connectSocket = (token) => {
socket.auth = { token };
socket.connect();
};
export const disconnectSocket = () => {
socket.disconnect();
};
export default socket;
Chat room component
Create src/components/ChatRoom.jsx:
import React, { useState, useEffect, useRef } from 'react';
import socket from '../socket';
const ChatRoom = ({ roomId, userId }) => {
const [messages, setMessages] = useState([]);
const [input, setInput] = useState('');
const messagesEndRef = useRef(null);
useEffect(() => {
// Join the room
socket.emit('room:join', { roomId });
// Listen for new messages
socket.on('message:new', (message) => {
setMessages(prev => [...prev, message]);
});
// Listen for users joining
socket.on('room:user:join', (data) => {
console.log('User joined:', data);
});
// Listen for users leaving
socket.on('room:user:leave', (data) => {
console.log('User left:', data);
});
return () => {
// Leave the room and remove the listeners
socket.emit('room:leave', { roomId });
socket.off('message:new');
socket.off('room:user:join');
socket.off('room:user:leave');
};
}, [roomId]);
const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
};
useEffect(() => {
scrollToBottom();
}, [messages]);
const handleSend = () => {
if (input.trim()) {
socket.emit('message:send', {
roomId,
content: input,
});
setInput('');
}
};
return (
<div className="chat-room">
<div className="messages">
{messages.map((message) => (
<div key={message.id} className={`message ${message.sender.id === userId ? 'own' : ''}`}>
<div className="message-content">{message.content}</div>
<div className="message-sender">{message.sender.username}</div>
</div>
))}
<div ref={messagesEndRef} />
</div>
<div className="input-area">
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyDown={(e) => e.key === 'Enter' && handleSend()}
placeholder="Type a message..."
/>
<button onClick={handleSend}>Send</button>
</div>
</div>
);
};
export default ChatRoom;
Login component
Create src/components/Login.jsx:
import React, { useState } from 'react';
import axios from 'axios';
import { connectSocket } from '../socket';
const Login = ({ onLogin }) => {
const [email, setEmail] = useState('');
const [password, setPassword] = useState('');
const [error, setError] = useState('');
const handleSubmit = async (e) => {
e.preventDefault();
try {
const response = await axios.post('http://localhost:3000/users/login', {
email,
password,
});
const { access_token, user } = response.data;
// Connect to Socket.IO
connectSocket(access_token);
// Store the token
localStorage.setItem('token', access_token);
localStorage.setItem('user', JSON.stringify(user));
// Notify the parent component
onLogin(user);
} catch (err) {
setError(err.response?.data?.message || 'Login failed');
}
};
return (
<div className="login">
<h2>Log in</h2>
{error && <div className="error">{error}</div>}
<form onSubmit={handleSubmit}>
<div>
<label>Email</label>
<input
type="email"
value={email}
onChange={(e) => setEmail(e.target.value)}
required
/>
</div>
<div>
<label>Password</label>
<input
type="password"
value={password}
onChange={(e) => setPassword(e.target.value)}
required
/>
</div>
<button type="submit">Log in</button>
</form>
</div>
);
};
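export default Login;
Run the project
Start the backend server
# Start the development server
npm run start:dev
Start the frontend development server
# Enter the frontend directory
cd chat-frontend
# Start the development server
npm run dev
Test the API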
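Use Postman or another API testing tool to exercise the following endpoints:
User endpoints:
- POST /users/register - register
- POST /users/login - log in
- GET /users/profile - get the current user's profile
- GET /users - list all users
- PATCH /users/profile - update the current user's profile
Room endpoints:
- POST /rooms - create a room
- GET /rooms - list rooms
- GET /rooms/:id - get a single room
- POST /rooms/:id/join - join a room
- POST /rooms/:id/leave - leave a room
- DELETE /rooms/:id - delete a room
- POST /rooms/direct/:userId - create a direct-message room
Message endpoints:
- GET /messages/room/:roomId - get the messages of a room
- PATCH /messages/:id/read - mark a message as read
- PATCH /messages/room/:roomId/read - mark a room's messages as read
- DELETE /messages/:id - delete a message
File endpoints:
- POST /files/upload - upload a file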
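Deployment
Build the project
# Build the backend
npm run build
# Build the frontend
cd chat-frontend
npm run build
Deploy to a server
- Manage the process with PM2:
# Install PM2
npm i -g pm2
# Start the backend directly
npm run start:prod
# Or build it and run it under PM2
npm run build
pm2 start dist/main.js --name real-time-chat
- Deploy with Docker:
Create a Dockerfile: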
FROM node:16-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build
EXPOSE 3000
CMD ["npm", "run", "start:prod"]
Create docker-compose.yml:
version: '3'
services:
  app:
    build: .
    ports:
      - '3000:3000'
    depends_on:
      - db
    environment:
      - DATABASE_HOST=db
      - DATABASE_PORT=5432
      - DATABASE_USERNAME=postgres
      - DATABASE_PASSWORD=postgres
      - DATABASE_NAME=chat_app
  db:
    image: postgres:13
    ports:
      - '5432:5432'
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=chat_app
    volumes:
      - postgres-data:/var/lib/postgresql/data
volumes:
  postgres-data:
Start the Docker containers:
docker-compose up -d
Performance optimization
Backend optimization
- Use Redis for caching:
# Install the Redis client
npm install redis
- Batch messages:
// Message batching
private messageQueue = new Map<string, Message[]>();
async processMessageQueue() {
for (const [roomId, messages] of this.messageQueue) {
// Save the queued messages in one batch
await this.messagesRepository.save(messages);
// Broadcast each message
for (const message of messages) {
this.server.to(`room_${roomId}`).emit('message:new', message);
}
this.messageQueue.delete(roomId);
}
}
- Connection pool management:
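// Connection pool settings (node-postgres names the idle timeout idleTimeoutMillis)
const pool = new Pool({
max: 20,
min: 5,
idleTimeoutMillis: 10000,
});
Frontend optimization
- Load messages page by page: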
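const loadMoreMessages = async () => {
const response = await axios.get(`/messages/room/${roomId}`, {
params: {
limit: 50,
offset: messages.length,
},
});
// The API returns newest first; reverse the page so older messages are prepended in chronological order
setMessages(prev => [...response.data.reverse(), ...prev]);
};
- WebSocket reconnection: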
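// Reconnection: socket.io-client reconnects on its own after a network drop.
// A manual reconnect is only needed when the server closed the connection.
socket.on('disconnect', (reason) => {
if (reason === 'io server disconnect') {
setTimeout(() => {
connectSocket(localStorage.getItem('token'));
}, 5000);
}
});
- Message compression:
// Request compression for this emit (this is already the default)
socket.compress(true).emit('message:send', data);
Security measures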
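- WebSocket authentication: verify WebSocket connections with JWT
- Message encryption: encrypt sensitive messages in transit
- Input validation: validate all user input
- SQL injection protection: use TypeORM's parameterized queries
- XSS protection: escape user input
- CSRF protection: implement CSRF token validation
- Password security: store passwords hashed with bcrypt
- API rate limiting: limit request frequency
- HTTPS: use HTTPS in production
- Regular security audits: check the system for vulnerabilities on a regular basis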
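To make the rate-limiting item concrete, here is a minimal sliding-window limiter. It is only a sketch under stated assumptions: the class name `SlidingWindowRateLimiter` and the injectable `now` clock are hypothetical, and state lives in memory, so it only works for a single server process. In a NestJS project the `@nestjs/throttler` package is the more usual choice.

```typescript
// Sliding-window limiter: allows at most `limit` calls per `windowMs` for each key.
class SlidingWindowRateLimiter {
  private hits = new Map<string, number[]>();

  constructor(
    private readonly limit: number,
    private readonly windowMs: number,
    // Injectable clock so the limiter can be tested without waiting.
    private readonly now: () => number = () => Date.now(),
  ) {}

  // Returns true and records the call when it is allowed; returns false when the key is over its limit.
  allow(key: string): boolean {
    const current = this.now();
    const windowStart = current - this.windowMs;
    // Keep only the timestamps that still fall inside the window.
    const recent = (this.hits.get(key) ?? []).filter((t) => t > windowStart);
    if (recent.length >= this.limit) {
      this.hits.set(key, recent);
      return false;
    }
    recent.push(current);
    this.hits.set(key, recent);
    return true;
  }
}
```

A gateway could call `allow(String(userId))` at the top of the `message:send` handler and reject the event when it returns false.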
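Summary
This tutorial walked through building a complete real-time chat application with NestJS and WebSocket, covering user authentication, message handling, room management, and other core features. After working through it, you should be able to:
- Understand the overall architecture of a real-time chat system
- Use the core features of the NestJS framework
- Implement real-time communication with Socket.IO
- Implement user authentication, message handling, and room management
- Integrate a frontend and choose a deployment strategy
- Apply performance optimizations and security measures to real-time applications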
Q&A
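Question 1: How do you authenticate a WebSocket connection?
Answer: The client passes a JWT in handshake.auth when it connects. The server verifies the token and disconnects any client whose token is missing or invalid.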
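The handshake check can be sketched independently of NestJS. In the code below, `verify` stands in for `JwtService.verify` and is assumed to throw on an invalid or expired token. The `HandshakeLike` type, the function names, and the fallback to an `Authorization: Bearer` header are assumptions added for illustration; the gateway in this tutorial only reads `handshake.auth.token`.

```typescript
// The parts of a Socket.IO handshake this sketch reads.
interface HandshakeLike {
  auth?: { token?: unknown };
  headers?: Record<string, string | undefined>;
}

interface TokenPayload {
  userId: number;
}

type AuthResult =
  | { ok: true; userId: number }
  | { ok: false; reason: string };

// Read the token from handshake.auth first, then fall back to a Bearer header.
function extractToken(handshake: HandshakeLike): string | null {
  const fromAuth = handshake.auth?.token;
  if (typeof fromAuth === 'string' && fromAuth.length > 0) {
    return fromAuth;
  }
  const header = handshake.headers?.authorization;
  if (header && header.startsWith('Bearer ')) {
    return header.slice('Bearer '.length) || null;
  }
  return null;
}

// Decide whether a connection may proceed; never throws.
function authenticateHandshake(
  handshake: HandshakeLike,
  verify: (token: string) => TokenPayload,
): AuthResult {
  const token = extractToken(handshake);
  if (!token) {
    return { ok: false, reason: 'missing token' };
  }
  try {
    const payload = verify(token);
    if (typeof payload.userId !== 'number') {
      return { ok: false, reason: 'malformed payload' };
    }
    return { ok: true, userId: payload.userId };
  } catch {
    return { ok: false, reason: 'invalid token' };
  }
}
```

Inside `handleConnection`, a failed result would lead to `client.disconnect()`, and a successful one would store `userId` on `client.data`.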
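Question 2: How do you implement direct messaging?
Answer: Create a room of type direct and add both users to it automatically, which gives them a one-to-one conversation.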
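A common pitfall with direct rooms is creating a second room when the same two users start a chat in the opposite order. One possible remedy, sketched here, is an order-independent key per user pair. `directRoomKey` and `DirectRoomRegistry` are hypothetical names, and the in-memory map stands in for a database column with a unique index; the tutorial's Room entity does not have such a column.

```typescript
// Build an order-independent key so (1, 2) and (2, 1) map to the same direct room.
function directRoomKey(userIdA: number, userIdB: number): string {
  if (!Number.isInteger(userIdA) || !Number.isInteger(userIdB)) {
    throw new Error('user ids must be integers');
  }
  if (userIdA === userIdB) {
    throw new Error('a direct room needs two different users');
  }
  const low = Math.min(userIdA, userIdB);
  const high = Math.max(userIdA, userIdB);
  return `direct:${low}:${high}`;
}

interface DirectRoom {
  id: number;
  memberIds: [number, number];
}

// In-memory stand-in for a table with a unique index on the key.
class DirectRoomRegistry {
  private rooms = new Map<string, DirectRoom>();
  private nextId = 1;

  // Return the existing room for this pair, or create it on first use.
  findOrCreate(userIdA: number, userIdB: number): DirectRoom {
    const key = directRoomKey(userIdA, userIdB);
    let room = this.rooms.get(key);
    if (!room) {
      room = {
        id: this.nextId++,
        memberIds: [Math.min(userIdA, userIdB), Math.max(userIdA, userIdB)],
      };
      this.rooms.set(key, room);
    }
    return room;
  }
}
```

With a real database, the unique index also protects against two simultaneous requests creating the same room.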
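Question 3: How do you optimize the performance of a real-time chat application?
Answer: On the backend, use Redis caching, message batching, and connection pool management. On the frontend, use paginated message loading, WebSocket reconnection, and message compression.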
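Of these techniques, message batching is the easiest to get subtly wrong, so a small self-contained sketch may help. `RoomBatcher` is a hypothetical helper, not part of NestJS or Socket.IO. It only groups items; saving and broadcasting stay with the caller.

```typescript
// Buffers items per room and releases them in batches.
class RoomBatcher<T> {
  private queues = new Map<number, T[]>();

  constructor(private readonly maxBatchSize: number) {
    if (maxBatchSize < 1) {
      throw new Error('maxBatchSize must be at least 1');
    }
  }

  // Queue one item. Returns the full batch once the room reaches maxBatchSize, otherwise null.
  add(roomId: number, item: T): T[] | null {
    const queue = this.queues.get(roomId) ?? [];
    queue.push(item);
    if (queue.length >= this.maxBatchSize) {
      this.queues.delete(roomId);
      return queue;
    }
    this.queues.set(roomId, queue);
    return null;
  }

  // Remove and return everything still queued, for example from a periodic timer.
  drain(): Array<{ roomId: number; batch: T[] }> {
    const pending: Array<{ roomId: number; batch: T[] }> = [];
    this.queues.forEach((batch, roomId) => {
      if (batch.length > 0) {
        pending.push({ roomId, batch });
      }
    });
    this.queues.clear();
    return pending;
  }
}
```

A caller would save each returned batch with a single repository call and then emit the messages to the room. Calling `drain()` on a short timer keeps quiet rooms from holding messages back. The trade-off is latency: a message may wait up to one timer interval before it is delivered.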
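Question 4: How do you keep a real-time chat application secure?
Answer: Combine WebSocket authentication, message encryption, input validation, SQL injection protection, XSS protection, CSRF protection, password hashing, API rate limiting, HTTPS, and regular security audits.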
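Two of these measures, input validation and XSS protection, fit in a few lines. The sketch below is illustrative only: `validateMessageContent`, `escapeHtml`, and the 2000-character limit are assumptions, not part of the tutorial's code. React already escapes text rendered through JSX, so `escapeHtml` matters mainly where content is written into raw HTML, such as email notifications.

```typescript
const HTML_ESCAPES: Record<string, string> = {
  '&': '&amp;',
  '<': '&lt;',
  '>': '&gt;',
  '"': '&quot;',
  "'": '&#39;',
};

// Escape the characters that are significant in HTML text and attribute values.
function escapeHtml(input: string): string {
  return input.replace(/[&<>"']/g, (ch) => HTML_ESCAPES[ch]);
}

// Validate an incoming message body: must be a non-empty string within the length limit.
// Returns the trimmed content; throws on invalid input.
function validateMessageContent(raw: unknown, maxLength = 2000): string {
  if (typeof raw !== 'string') {
    throw new Error('content must be a string');
  }
  const trimmed = raw.trim();
  if (trimmed.length === 0) {
    throw new Error('content must not be empty');
  }
  if (trimmed.length > maxLength) {
    throw new Error('content is too long');
  }
  return trimmed;
}
```

Validating on the way in and escaping on the way out keeps stored content unmodified and avoids double escaping in clients that escape on their own.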
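Question 5: How do you implement file upload and sharing?
Answer: Create an upload endpoint, handle the upload with multer, save the file to local or cloud storage, and then send the file URL in a message to share it.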
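Before a file is written to disk, it is worth checking its type and size and never reusing the client-supplied name. The following sketch shows one way to do that. The allowed extensions, the 5 MB limit, and the function names are assumptions chosen for illustration; `uniqueId` would come from something like the `uuidv4()` call in the file controller.

```typescript
const ALLOWED_EXTENSIONS = new Set(['.png', '.jpg', '.jpeg', '.gif', '.pdf', '.txt']);
const MAX_FILE_BYTES = 5 * 1024 * 1024;

// Return the lower-cased extension including the dot, or '' when there is none.
// Any directory part of the name is ignored.
function extensionOf(fileName: string): string {
  const base = fileName.split(/[\\/]/).pop() ?? '';
  const dot = base.lastIndexOf('.');
  return dot > 0 ? base.slice(dot).toLowerCase() : '';
}

// Validate an upload and build the name it will be stored under.
// Only the extension of the original name survives; the rest is replaced by uniqueId.
function buildStoredFileName(originalName: string, sizeBytes: number, uniqueId: string): string {
  const ext = extensionOf(originalName);
  if (!ALLOWED_EXTENSIONS.has(ext)) {
    throw new Error(`file type not allowed: ${ext || '(none)'}`);
  }
  if (sizeBytes <= 0 || sizeBytes > MAX_FILE_BYTES) {
    throw new Error('file size out of range');
  }
  if (!/^[A-Za-z0-9-]+$/.test(uniqueId)) {
    throw new Error('unique id may only contain letters, digits, and dashes');
  }
  return `${uniqueId}${ext}`;
}
```

Because the stored name is built from a generated id and a whitelisted extension, a crafted name such as `../../etc/passwd` cannot escape the upload directory.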
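Practice assignments
Implement message recall:
- Add a message recall API
- Add a recall button to the frontend
- Broadcast a recall notification
Implement message editing:
- Add a message editing API
- Build the frontend editing interface
- Broadcast an edit notification
Implement online status display:
- Update user status in real time
- Show online status in the frontend
- Send status change notifications
Implement read receipts:
- Track the read state of messages
- Show the list of users who have read a message
- Send read receipt notifications
Implement multi-language support:
- Add internationalization configuration
- Implement language switching
- Support messages in multiple languages
Completing these assignments will consolidate what you have learned and leave you with a more full-featured real-time chat application.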