NestJS微服务实战教程
学习目标
- 掌握微服务架构的基本概念和设计原则
- 学会在NestJS中创建和配置微服务
- 理解不同类型的微服务通信方式
- 掌握服务发现和负载均衡的实现方法
- 学会处理微服务的容错和故障恢复
- 了解微服务的部署和监控策略
- 掌握微服务架构的最佳实践
核心概念
微服务架构简介
微服务架构是一种将应用程序设计为一组松耦合服务的方法。每个服务都围绕特定的业务功能构建,可以独立部署和扩展。微服务架构具有以下优势:
- 灵活性:各服务可以独立开发、部署和扩展
- 可维护性:每个服务都相对较小,易于理解和维护
- 技术多样性:不同服务可以使用不同的技术栈
- 弹性:单个服务故障不会影响整个系统
- 可扩展性:可以根据需要单独扩展特定服务
微服务核心概念
- 服务:独立部署的功能单元
- 服务间通信:服务之间的交互方式(同步/异步)
- 服务发现:自动检测和注册服务实例
- 负载均衡:在多个服务实例之间分配请求
- 容错处理:处理服务故障的机制
- 配置管理:集中管理服务配置
- 监控和日志:跟踪服务状态和性能
项目初始化
创建微服务项目
# 创建NestJS项目
npm i -g @nestjs/cli
nest new microservices-practical
# 进入项目目录
cd microservices-practical
# 安装微服务相关依赖
npm install @nestjs/microservices
# 安装通信所需依赖
npm install redis amqplib kafkajs
# 安装其他依赖
npm install @nestjs/config微服务类型
NestJS支持多种微服务传输方式:
- TCP:基于TCP协议的同步通信
- Redis:基于Redis的发布/订阅模式
- MQTT:轻量级消息协议
- RabbitMQ:基于AMQP协议的消息队列
- Kafka:高吞吐量的分布式消息系统
- NATS:高性能消息系统
实现微服务
服务端实现
创建用户服务
创建 src/user-service/user.service.ts:
import {
Injectable,
} from '@nestjs/common';
@Injectable()
export class UserService {
private users = [
{ id: 1, name: '张三', email: 'zhangsan@example.com' },
{ id: 2, name: '李四', email: 'lisi@example.com' },
{ id: 3, name: '王五', email: 'wangwu@example.com' },
];
findAll() {
return this.users;
}
findOne(id: number) {
return this.users.find(user => user.id === id);
}
create(user: any) {
const newUser = {
id: this.users.length + 1,
...user,
};
this.users.push(newUser);
return newUser;
}
update(id: number, user: any) {
const index = this.users.findIndex(u => u.id === id);
if (index !== -1) {
this.users[index] = {
...this.users[index],
...user,
};
return this.users[index];
}
return null;
}
delete(id: number) {
const index = this.users.findIndex(u => u.id === id);
if (index !== -1) {
return this.users.splice(index, 1)[0];
}
return null;
}
}创建 src/user-service/user.controller.ts:
import {
Controller,
Get,
Post,
Put,
Delete,
Param,
Body,
} from '@nestjs/common';
import {
MessagePattern,
Payload,
} from '@nestjs/microservices';
import { UserService } from './user.service';
@Controller()
export class UserController {
constructor(private userService: UserService) {}
@MessagePattern('getUsers')
getUsers() {
return this.userService.findAll();
}
@MessagePattern('getUserById')
getUserById(@Payload() data: { id: number }) {
return this.userService.findOne(data.id);
}
@MessagePattern('createUser')
createUser(@Payload() user: any) {
return this.userService.create(user);
}
@MessagePattern('updateUser')
updateUser(@Payload() data: { id: number; user: any }) {
return this.userService.update(data.id, data.user);
}
@MessagePattern('deleteUser')
deleteUser(@Payload() data: { id: number }) {
return this.userService.delete(data.id);
}
}创建 src/user-service/user.module.ts:
import {
Module,
} from '@nestjs/common';
import { UserService } from './user.service';
import { UserController } from './user.controller';
@Module({
providers: [UserService],
controllers: [UserController],
})
export class UserModule {}创建产品服务
创建 src/product-service/product.service.ts:
import {
Injectable,
} from '@nestjs/common';
@Injectable()
export class ProductService {
private products = [
{ id: 1, name: '产品A', price: 100, stock: 10 },
{ id: 2, name: '产品B', price: 200, stock: 20 },
{ id: 3, name: '产品C', price: 300, stock: 30 },
];
findAll() {
return this.products;
}
findOne(id: number) {
return this.products.find(product => product.id === id);
}
create(product: any) {
const newProduct = {
id: this.products.length + 1,
...product,
};
this.products.push(newProduct);
return newProduct;
}
update(id: number, product: any) {
const index = this.products.findIndex(p => p.id === id);
if (index !== -1) {
this.products[index] = {
...this.products[index],
...product,
};
return this.products[index];
}
return null;
}
delete(id: number) {
const index = this.products.findIndex(p => p.id === id);
if (index !== -1) {
return this.products.splice(index, 1)[0];
}
return null;
}
}创建 src/product-service/product.controller.ts:
import {
Controller,
} from '@nestjs/common';
import {
MessagePattern,
Payload,
} from '@nestjs/microservices';
import { ProductService } from './product.service';
@Controller()
export class ProductController {
constructor(private productService: ProductService) {}
@MessagePattern('getProducts')
getProducts() {
return this.productService.findAll();
}
@MessagePattern('getProductById')
getProductById(@Payload() data: { id: number }) {
return this.productService.findOne(data.id);
}
@MessagePattern('createProduct')
createProduct(@Payload() product: any) {
return this.productService.create(product);
}
@MessagePattern('updateProduct')
updateProduct(@Payload() data: { id: number; product: any }) {
return this.productService.update(data.id, data.product);
}
@MessagePattern('deleteProduct')
deleteProduct(@Payload() data: { id: number }) {
return this.productService.delete(data.id);
}
}创建 src/product-service/product.module.ts:
import {
Module,
} from '@nestjs/common';
import { ProductService } from './product.service';
import { ProductController } from './product.controller';
@Module({
providers: [ProductService],
controllers: [ProductController],
})
export class ProductModule {}启动微服务
启动用户服务
创建 src/user-main.ts:
import {
NestFactory,
} from '@nestjs/core';
import {
Transport,
MicroserviceOptions,
} from '@nestjs/microservices';
import { UserModule } from './user-service/user.module';
import { ConfigService } from '@nestjs/config';
async function bootstrap() {
const app = await NestFactory.create(UserModule);
const configService = app.get(ConfigService);
const microservice = app.connectMicroservice<MicroserviceOptions>({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3001,
},
});
await app.startAllMicroservices();
console.log('User microservice is running');
}
bootstrap();启动产品服务
创建 src/product-main.ts:
import {
NestFactory,
} from '@nestjs/core';
import {
Transport,
MicroserviceOptions,
} from '@nestjs/microservices';
import { ProductModule } from './product-service/product.module';
async function bootstrap() {
const app = await NestFactory.create(ProductModule);
const microservice = app.connectMicroservice<MicroserviceOptions>({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3002,
},
});
await app.startAllMicroservices();
console.log('Product microservice is running');
}
bootstrap();服务间通信
同步通信
TCP通信
创建 src/gateway/gateway.controller.ts:
import {
Controller,
Get,
Post,
Put,
Delete,
Param,
Body,
} from '@nestjs/common';
import {
Client,
ClientProxy,
Transport,
MessagePattern,
SubscribeMessage,
} from '@nestjs/microservices';
@Controller()
export class GatewayController {
@Client({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3001,
},
})
private userClient: ClientProxy;
@Client({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3002,
},
})
private productClient: ClientProxy;
@Get('/users')
async getUsers() {
return this.userClient.send('getUsers', {});
}
@Get('/users/:id')
async getUserById(@Param('id') id: number) {
return this.userClient.send('getUserById', { id });
}
@Post('/users')
async createUser(@Body() user: any) {
return this.userClient.send('createUser', user);
}
@Put('/users/:id')
async updateUser(@Param('id') id: number, @Body() user: any) {
return this.userClient.send('updateUser', { id, user });
}
@Delete('/users/:id')
async deleteUser(@Param('id') id: number) {
return this.userClient.send('deleteUser', { id });
}
@Get('/products')
async getProducts() {
return this.productClient.send('getProducts', {});
}
@Get('/products/:id')
async getProductById(@Param('id') id: number) {
return this.productClient.send('getProductById', { id });
}
@Post('/products')
async createProduct(@Body() product: any) {
return this.productClient.send('createProduct', product);
}
@Put('/products/:id')
async updateProduct(@Param('id') id: number, @Body() product: any) {
return this.productClient.send('updateProduct', { id, product });
}
@Delete('/products/:id')
async deleteProduct(@Param('id') id: number) {
return this.productClient.send('deleteProduct', { id });
}
}异步通信
Redis发布/订阅
修改 src/user-main.ts 使用Redis:
import {
NestFactory,
} from '@nestjs/core';
import {
Transport,
MicroserviceOptions,
} from '@nestjs/microservices';
import { UserModule } from './user-service/user.module';
async function bootstrap() {
const app = await NestFactory.create(UserModule);
const microservice = app.connectMicroservice<MicroserviceOptions>({
transport: Transport.REDIS,
options: {
url: 'redis://localhost:6379',
},
});
await app.startAllMicroservices();
console.log('User microservice is running with Redis');
}
bootstrap();修改 src/product-main.ts 使用Redis:
import {
NestFactory,
} from '@nestjs/core';
import {
Transport,
MicroserviceOptions,
} from '@nestjs/microservices';
import { ProductModule } from './product-service/product.module';
async function bootstrap() {
const app = await NestFactory.create(ProductModule);
const microservice = app.connectMicroservice<MicroserviceOptions>({
transport: Transport.REDIS,
options: {
url: 'redis://localhost:6379',
},
});
await app.startAllMicroservices();
console.log('Product microservice is running with Redis');
}
bootstrap();修改 src/gateway/gateway.controller.ts 使用Redis:
import {
Controller,
Get,
Post,
Put,
Delete,
Param,
Body,
} from '@nestjs/common';
import {
Client,
ClientProxy,
Transport,
} from '@nestjs/microservices';
@Controller()
export class GatewayController {
@Client({
transport: Transport.REDIS,
options: {
url: 'redis://localhost:6379',
},
})
private userClient: ClientProxy;
@Client({
transport: Transport.REDIS,
options: {
url: 'redis://localhost:6379',
},
})
private productClient: ClientProxy;
// 方法与之前相同
}RabbitMQ通信
修改 src/user-main.ts 使用RabbitMQ:
import {
NestFactory,
} from '@nestjs/core';
import {
Transport,
MicroserviceOptions,
} from '@nestjs/microservices';
import { UserModule } from './user-service/user.module';
async function bootstrap() {
const app = await NestFactory.create(UserModule);
const microservice = app.connectMicroservice<MicroserviceOptions>({
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'user_queue',
queueOptions: {
durable: false,
},
},
});
await app.startAllMicroservices();
console.log('User microservice is running with RabbitMQ');
}
bootstrap();修改 src/product-main.ts 使用RabbitMQ:
import {
NestFactory,
} from '@nestjs/core';
import {
Transport,
MicroserviceOptions,
} from '@nestjs/microservices';
import { ProductModule } from './product-service/product.module';
async function bootstrap() {
const app = await NestFactory.create(ProductModule);
const microservice = app.connectMicroservice<MicroserviceOptions>({
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'product_queue',
queueOptions: {
durable: false,
},
},
});
await app.startAllMicroservices();
console.log('Product microservice is running with RabbitMQ');
}
bootstrap();服务发现
Consul服务发现
安装Consul依赖:
npm install consul创建 src/common/services/consul.service.ts:
import {
Injectable,
OnModuleInit,
} from '@nestjs/common';
import * as Consul from 'consul';
@Injectable()
export class ConsulService implements OnModuleInit {
private consul: Consul.Consul;
constructor() {
this.consul = new Consul({
host: 'localhost',
port: 8500,
});
}
async onModuleInit() {
console.log('Consul service initialized');
}
async registerService(serviceName: string, serviceId: string, address: string, port: number) {
try {
await this.consul.agent.service.register({
id: serviceId,
name: serviceName,
address,
port,
check: {
http: `http://${address}:${port}/health`,
interval: '10s',
timeout: '5s',
},
});
console.log(`Service ${serviceName} registered with Consul`);
} catch (error) {
console.error('Error registering service with Consul:', error);
}
}
async deregisterService(serviceId: string) {
try {
await this.consul.agent.service.deregister(serviceId);
console.log(`Service ${serviceId} deregistered from Consul`);
} catch (error) {
console.error('Error deregistering service with Consul:', error);
}
}
async discoverService(serviceName: string) {
try {
const services = await this.consul.agent.services();
const serviceInstances = [];
for (const serviceId in services) {
if (services[serviceId].Service === serviceName) {
serviceInstances.push({
id: serviceId,
address: services[serviceId].Address,
port: services[serviceId].Port,
});
}
}
return serviceInstances;
} catch (error) {
console.error('Error discovering service with Consul:', error);
return [];
}
}
}修改 src/user-main.ts 注册服务:
import {
NestFactory,
} from '@nestjs/core';
import {
Transport,
MicroserviceOptions,
} from '@nestjs/microservices';
import { UserModule } from './user-service/user.module';
import { ConsulService } from './common/services/consul.service';
async function bootstrap() {
const app = await NestFactory.create(UserModule);
const consulService = app.get(ConsulService);
// 注册服务
await consulService.registerService(
'user-service',
`user-service-${Math.random().toString(36).substr(2, 9)}`,
'localhost',
3001
);
const microservice = app.connectMicroservice<MicroserviceOptions>({
transport: Transport.TCP,
options: {
host: 'localhost',
port: 3001,
},
});
await app.startAllMicroservices();
console.log('User microservice is running');
}
bootstrap();负载均衡
客户端负载均衡
创建 src/common/services/load-balancer.service.ts:
import {
Injectable,
} from '@nestjs/common';
import { ConsulService } from './consul.service';
@Injectable()
export class LoadBalancerService {
constructor(private consulService: ConsulService) {}
private currentIndex = new Map<string, number>();
// 轮询负载均衡
async roundRobin(serviceName: string) {
const instances = await this.consulService.discoverService(serviceName);
if (instances.length === 0) {
throw new Error(`No instances found for service ${serviceName}`);
}
let index = this.currentIndex.get(serviceName) || 0;
const instance = instances[index];
index = (index + 1) % instances.length;
this.currentIndex.set(serviceName, index);
return instance;
}
// 随机负载均衡
async random(serviceName: string) {
const instances = await this.consulService.discoverService(serviceName);
if (instances.length === 0) {
throw new Error(`No instances found for service ${serviceName}`);
}
const randomIndex = Math.floor(Math.random() * instances.length);
return instances[randomIndex];
}
// 最少连接负载均衡(简化版)
async leastConnection(serviceName: string) {
const instances = await this.consulService.discoverService(serviceName);
if (instances.length === 0) {
throw new Error(`No instances found for service ${serviceName}`);
}
// 简化实现,实际应该跟踪每个实例的连接数
return instances[0];
}
}容错处理
重试机制
创建 src/common/decorators/retry.decorator.ts:
import {
Injectable,
NestInterceptor,
ExecutionContext,
CallHandler,
Retryable,
} from '@nestjs/common';
import { Observable, throwError, timer } from 'rxjs';
import { catchError, retryWhen, take, mergeMap } from 'rxjs/operators';
@Injectable()
export class RetryInterceptor implements NestInterceptor {
constructor(private readonly maxRetries = 3, private readonly delay = 1000) {}
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next.handle().pipe(
retryWhen(errors => {
return errors.pipe(
mergeMap((error, index) => {
if (index >= this.maxRetries) {
return throwError(error);
}
return timer(this.delay * Math.pow(2, index)); // 指数退避
}),
take(this.maxRetries + 1)
);
})
);
}
}熔断器模式
创建 src/common/services/circuit-breaker.service.ts:
import {
Injectable,
} from '@nestjs/common';
interface CircuitBreakerState {
state: 'CLOSED' | 'OPEN' | 'HALF_OPEN';
failureCount: number;
lastFailureTime: number;
resetTimeout: number;
failureThreshold: number;
}
@Injectable()
export class CircuitBreakerService {
private circuits = new Map<string, CircuitBreakerState>();
constructor() {
// 初始化默认配置
}
async execute<T>(serviceName: string, action: () => Promise<T>): Promise<T> {
const circuit = this.getOrCreateCircuit(serviceName);
if (circuit.state === 'OPEN') {
if (Date.now() - circuit.lastFailureTime > circuit.resetTimeout) {
circuit.state = 'HALF_OPEN';
} else {
throw new Error(`Circuit breaker is OPEN for service ${serviceName}`);
}
}
try {
const result = await action();
this.onSuccess(serviceName);
return result;
} catch (error) {
this.onFailure(serviceName);
throw error;
}
}
private getOrCreateCircuit(serviceName: string): CircuitBreakerState {
if (!this.circuits.has(serviceName)) {
this.circuits.set(serviceName, {
state: 'CLOSED',
failureCount: 0,
lastFailureTime: 0,
resetTimeout: 30000, // 30秒
failureThreshold: 3, // 失败3次后打开
});
}
return this.circuits.get(serviceName);
}
private onSuccess(serviceName: string) {
const circuit = this.getOrCreateCircuit(serviceName);
circuit.failureCount = 0;
circuit.state = 'CLOSED';
}
private onFailure(serviceName: string) {
const circuit = this.getOrCreateCircuit(serviceName);
circuit.failureCount++;
circuit.lastFailureTime = Date.now();
if (circuit.failureCount >= circuit.failureThreshold) {
circuit.state = 'OPEN';
}
}
}部署与监控
Docker部署
创建 Dockerfile:
FROM node:16-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
EXPOSE 3001 3002 3000
CMD ["npm", "run", "start:gateway"]创建 docker-compose.yml:
version: '3'
services:
consul:
image: consul:latest
ports:
- '8500:8500'
- '8600:8600/udp'
redis:
image: redis:latest
ports:
- '6379:6379'
rabbitmq:
image: rabbitmq:3-management
ports:
- '5672:5672'
- '15672:15672'
user-service:
build: .
command: npm run start:user
depends_on:
- consul
- redis
environment:
- CONSUL_HOST=consul
product-service:
build: .
command: npm run start:product
depends_on:
- consul
- redis
environment:
- CONSUL_HOST=consul
gateway:
build: .
command: npm run start:gateway
ports:
- '3000:3000'
depends_on:
- consul
- user-service
- product-service
environment:
- CONSUL_HOST=consul监控
Prometheus监控
安装Prometheus依赖:
npm install prom-client创建 src/common/services/metrics.service.ts:
import {
Injectable,
OnModuleInit,
} from '@nestjs/common';
import * as prometheus from 'prom-client';
@Injectable()
export class MetricsService implements OnModuleInit {
private counter: prometheus.Counter;
private histogram: prometheus.Histogram;
async onModuleInit() {
// 清除默认指标
prometheus.register.clear();
// 创建自定义指标
this.counter = new prometheus.Counter({
name: 'http_requests_total',
help: 'Total number of HTTP requests',
labelNames: ['method', 'endpoint', 'status'],
});
this.histogram = new prometheus.Histogram({
name: 'http_request_duration_seconds',
help: 'HTTP request duration in seconds',
labelNames: ['method', 'endpoint'],
buckets: [0.1, 0.5, 1, 2, 5],
});
// 注册默认指标
prometheus.collectDefaultMetrics();
}
incrementCounter(method: string, endpoint: string, status: number) {
this.counter.labels(method, endpoint, status.toString()).inc();
}
observeRequestDuration(method: string, endpoint: string, duration: number) {
this.histogram.labels(method, endpoint).observe(duration);
}
async getMetrics() {
return prometheus.register.metrics();
}
}最佳实践
微服务设计原则
- 单一职责:每个服务只负责一个特定的业务功能
- 松耦合:服务之间通过明确定义的接口通信
- 高内聚:相关功能应该放在同一个服务中
- API优先:设计清晰的API接口
- 数据隔离:每个服务管理自己的数据
- 容错设计:处理服务故障的机制
- 可观测性:完善的监控和日志
- 自动化部署:CI/CD流程
服务间通信最佳实践
选择合适的通信方式:
- 同步通信:适用于需要即时响应的场景
- 异步通信:适用于高吞吐量、松耦合的场景
使用消息队列:
- 提高系统可靠性
- 削峰填谷
- 解耦服务
实现重试机制:
- 处理临时故障
- 避免级联失败
设置合理的超时:
- 防止请求无限期等待
- 及时释放资源
服务发现和负载均衡
使用专业的服务发现工具:
- Consul
- Etcd
- ZooKeeper
实现健康检查:
- 定期检查服务状态
- 自动移除不健康的实例
选择合适的负载均衡策略:
- 轮询
- 随机
- 最少连接
- IP哈希
容错和故障恢复
实现熔断器模式:
- 防止级联失败
- 快速失败
实现断路器:
- 监控服务健康状态
- 自动切换故障服务
实现限流:
- 防止系统过载
- 保护关键服务
实现降级:
- 在服务故障时提供备选方案
- 保证核心功能可用
总结
本教程详细讲解了如何在NestJS中实现微服务架构,包括:
- 微服务架构的基本概念和设计原则
- 在NestJS中创建和配置微服务
- 不同类型的微服务通信方式(TCP、Redis、RabbitMQ)
- 服务发现和负载均衡的实现方法
- 微服务的容错和故障恢复机制
- 微服务的部署和监控策略
- 微服务架构的最佳实践
通过本教程的学习,你应该能够:
- 理解微服务架构的核心概念和优势
- 掌握在NestJS中开发微服务的方法
- 实现服务间的有效通信
- 处理微服务的容错和故障恢复
- 设计和部署可靠的微服务系统
互动问答
问题1:微服务架构相比单体架构有哪些优势?
答案:微服务架构相比单体架构的优势包括:灵活性高、可维护性强、技术多样性、弹性好、可扩展性强等。
问题2:NestJS支持哪些微服务传输方式?
答案:NestJS支持多种微服务传输方式,包括TCP、Redis、MQTT、RabbitMQ、Kafka、NATS等。
问题3:什么是服务发现?为什么需要服务发现?
答案:服务发现是自动检测和注册服务实例的机制。在微服务架构中,服务实例可能动态增减,需要服务发现来自动管理这些变化。
问题4:什么是熔断器模式?它有什么作用?
答案:熔断器模式是一种容错机制,当服务故障达到一定阈值时,熔断器会打开,防止请求继续发送到故障服务,从而避免级联失败。
问题5:如何监控微服务架构?
答案:可以使用Prometheus、Grafana等工具监控微服务的健康状态、性能指标和日志,实现全面的可观测性。
实践作业
实现完整的微服务系统:
- 添加订单服务
- 实现服务间的复杂交互
优化服务发现:
- 集成Etcd作为服务发现工具
- 实现服务健康检查
增强容错能力:
- 实现完整的熔断器模式
- 添加限流和降级机制
完善监控系统:
- 集成Grafana可视化监控
- 实现告警机制
部署到云平台:
- 部署到Kubernetes
- 实现自动扩缩容
通过完成这些作业,你将能够进一步巩固所学知识,开发出更加可靠和高性能的微服务系统。