第68集:单例模式

学习目标

  1. 理解单例模式的概念和作用
  2. 掌握Python中实现单例模式的多种方法
  3. 学会使用单例模式解决实际问题
  4. 了解单例模式的优缺点和适用场景
  5. 掌握线程安全的单例模式实现

单例模式概念

什么是单例模式

单例模式(Singleton Pattern)是一种创建型设计模式,确保一个类只有一个实例,并提供一个全局访问点来获取该实例。

为什么使用单例模式

  • 资源共享:多个模块共享同一个资源(如数据库连接、配置管理器)
  • 控制访问:限制某些资源的访问权限,确保统一控制
  • 节省开销:避免频繁创建和销毁重量级对象
  • 全局状态:维护全局状态或计数器
  • 配置管理:统一管理应用程序配置

单例模式的核心思想

  1. 私有化构造方法:防止外部直接实例化
  2. 静态访问方法:提供统一的实例获取入口
  3. 实例缓存:在类内部维护唯一实例的引用

单例模式实现方法

方法1:重写__new__方法(最常用)

基本实现

class Singleton:
    """单例类 - 通过重写__new__方法实现"""
    _instance = None  # 类变量,存储唯一实例
    
    def __new__(cls, *args, **kwargs):
        """重写__new__方法控制实例创建"""
        if cls._instance is None:
            # 如果还没有实例,创建一个
            cls._instance = super().__new__(cls)
        # 返回已存在的实例
        return cls._instance
    
    def __init__(self, name=None):
        """初始化方法 - 注意可能被多次调用"""
        # 需要判断是否已经初始化过,避免重复初始化
        if not hasattr(self, '_initialized'):
            self.name = name or "默认单例"
            self._initialized = True
            print(f"Singleton初始化: {self.name}")

# 测试单例模式
singleton1 = Singleton("第一个实例")
singleton2 = Singleton("第二个实例")

print(f"singleton1 ID: {id(singleton1)}")
print(f"singleton2 ID: {id(singleton2)}")
print(f"是否为同一实例: {singleton1 is singleton2}")
print(f"singleton1.name: {singleton1.name}")  # 注意:name会是"第二个实例"
print(f"singleton2.name: {singleton2.name}")

改进版本:防止重复初始化

class ImprovedSingleton:
    """改进的单例类 - 防止重复初始化"""
    _instance = None
    
    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self, config=None):
        """改进的初始化方法"""
        # 使用特殊属性标记是否已初始化
        if not hasattr(self, '_is_initialized'):
            self.config = config or {}
            self._is_initialized = True
            self._setup()
            print(f"ImprovedSingleton初始化完成,配置: {self.config}")
    
    def _setup(self):
        """实际的初始化逻辑"""
        self.start_time = "2024-01-01"
        self.version = "1.0.0"
    
    def update_config(self, new_config):
        """更新配置的方法"""
        self.config.update(new_config)
        print(f"配置已更新: {self.config}")

# 测试改进版本
single1 = ImprovedSingleton({"database": "mysql"})
single2 = ImprovedSingleton({"cache": "redis"})  # 不会重新初始化

print(f"同一实例: {single1 is single2}")
print(f"最终配置: {single1.config}")  # 包含两个配置

方法2:使用装饰器实现单例

单例装饰器

def singleton_decorator(cls):
    """单例装饰器 - 将任何类转换为单例"""
    instances = {}  # 存储各个类的实例
    
    def wrapper(*args, **kwargs):
        # 如果该类还没有实例,创建一个
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    
    return wrapper

# 使用装饰器创建单例类
@singleton_decorator
class DatabaseConnection:
    """数据库连接类 - 使用装饰器实现单例"""
    
    def __init__(self, connection_string="default_db"):
        self.connection_string = connection_string
        self.is_connected = False
        print(f"创建数据库连接: {connection_string}")
    
    def connect(self):
        """连接数据库"""
        self.is_connected = True
        print(f"连接到数据库: {self.connection_string}")
    
    def disconnect(self):
        """断开连接"""
        self.is_connected = False
        print("数据库连接已断开")

@singleton_decorator
class Logger:
    """日志记录器 - 另一个单例类"""
    
    def __init__(self, log_file="app.log"):
        self.log_file = log_file
        self.messages = []
        print(f"创建日志记录器: {log_file}")
    
    def log(self, message):
        """记录日志"""
        timestamp = "2024-01-01 10:00:00"  # 简化时间处理
        log_entry = f"[{timestamp}] {message}"
        self.messages.append(log_entry)
        print(f"记录日志: {log_entry}")

# 测试装饰器单例
print("=== 测试装饰器单例模式 ===")
db1 = DatabaseConnection("mysql://localhost:3306/mydb")
db2 = DatabaseConnection("postgresql://localhost:5432/otherdb")  # 不会创建新实例

print(f"数据库连接是否为同一实例: {db1 is db2}")
print(f"连接字符串: {db1.connection_string}")  # 仍然是第一个字符串

db1.connect()
print(f"db2连接状态: {db2.is_connected}")  # True,因为是同一个实例

# 测试多个单例类
logger1 = Logger("app1.log")
logger2 = Logger("app2.log")

print(f"日志记录器是否为同一实例: {logger1 is logger2}")
print(f"日志文件: {logger1.log_file}")  # app1.log

logger1.log("这是一条测试日志")
print(f"logger2的消息数量: {len(logger2.messages)}")  # 1,共享消息列表

方法3:使用元类实现单例

单例元类

class SingletonMeta(type):
    """单例元类 - 通过元类控制类的实例化"""
    
    def __init__(cls, name, bases, attrs):
        """元类的初始化"""
        super().__init__(name, bases, attrs)
        cls._instance = None  # 为每个类创建独立的实例存储
    
    def __call__(cls, *args, **kwargs):
        """控制类的实例化过程"""
        if cls._instance is None:
            # 创建实例
            cls._instance = super().__call__(*args, **kwargs)
        return cls._instance

class ConfigManager(metaclass=SingletonMeta):
    """配置管理器 - 使用元类实现单例"""
    
    def __init__(self, config_file="config.json"):
        self.config_file = config_file
        self.settings = {}
        self._load_config()
        print(f"配置管理器初始化,加载文件: {config_file}")
    
    def _load_config(self):
        """加载配置文件(模拟)"""
        # 模拟从文件加载配置
        self.settings = {
            "database_url": "sqlite:///app.db",
            "debug": True,
            "max_connections": 100
        }
    
    def get(self, key, default=None):
        """获取配置值"""
        return self.settings.get(key, default)
    
    def set(self, key, value):
        """设置配置值"""
        self.settings[key] = value
        print(f"配置已更新: {key} = {value}")
    
    def reload(self):
        """重新加载配置"""
        self._load_config()
        print("配置已重新加载")

class CacheManager(metaclass=SingletonMeta):
    """缓存管理器 - 另一个使用元类的单例"""
    
    def __init__(self, cache_type="memory"):
        self.cache_type = cache_type
        self._cache = {}
        print(f"缓存管理器初始化,类型: {cache_type}")
    
    def set(self, key, value):
        """设置缓存"""
        self._cache[key] = value
        print(f"缓存设置: {key} = {value}")
    
    def get(self, key):
        """获取缓存"""
        return self._cache.get(key)
    
    def clear(self):
        """清空缓存"""
        self._cache.clear()
        print("缓存已清空")

# 测试元类单例
print("=== 测试元类单例模式 ===")
config1 = ConfigManager("app_config.json")
config2 = ConfigManager("other_config.json")  # 不会重新初始化

print(f"配置管理器是否为同一实例: {config1 is config2}")
print(f"配置文件: {config1.config_file}")  # app_config.json

config1.set("timeout", 30)
print(f"config2中的timeout: {config2.get('timeout')}")  # 30,共享状态

# 测试多个单例类
cache1 = CacheManager("redis")
cache2 = CacheManager("memcached")

print(f"缓存管理器是否为同一实例: {cache1 is cache2}")
print(f"缓存类型: {cache1.cache_type}")  # redis

cache1.set("user_123", {"name": "张三", "age": 25})
print(f"cache2中的用户数据: {cache2.get('user_123')}")  # 共享缓存数据

方法4:使用模块实现单例(Python特有)

模块级别的单例

# config_module.py
"""配置模块 - Python模块天然是单例的"""

# 模块级别的变量 - 天然单例
_config = {
    "database_url": "sqlite:///default.db",
    "debug": False,
    "version": "1.0.0"
}

# 模块级别的函数
def get_config(key, default=None):
    """获取配置"""
    return _config.get(key, default)

def set_config(key, value):
    """设置配置"""
    _config[key] = value
    print(f"配置已更新: {key} = {value}")

def get_all_config():
    """获取所有配置"""
    return _config.copy()

# 初始化代码(模块加载时执行)
print("配置模块已加载,初始配置:", _config)

# 在同一个目录下的使用示例
# main.py
import config_module

print("=== 测试模块级单例 ===")
print("第一次访问配置:", config_module.get_all_config())

config_module.set_config("debug", True)
config_module.set_config("max_users", 1000)

print("第二次访问配置:", config_module.get_all_config())

# 在不同的"导入"中也是同一个实例
import config_module as cm
print(f"不同导入的模块是否相同: {config_module is cm}")
print(f"配置是否共享: {cm.get_config('debug')}")  # True

线程安全的单例模式

多线程环境下的单例问题

import threading
import time

class ThreadUnsafeSingleton:
    """非线程安全的单例类 - 用于演示问题"""
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            # 模拟耗时操作,增加线程冲突概率
            time.sleep(0.01)
            cls._instance = super().__new__(cls)
        return cls._instance

# 测试线程安全问题
def create_singleton(instance_id):
    """创建单例实例的函数"""
    singleton = ThreadUnsafeSingleton()
    print(f"线程{instance_id}创建的实例ID: {id(singleton)}")
    return singleton

print("=== 测试非线程安全单例 ===")
threads = []
instances = []

# 创建多个线程同时创建单例
for i in range(5):
    thread = threading.Thread(target=lambda i=i: instances.append(create_singleton(i)))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

# 检查是否创建了多个实例
unique_ids = set(id(instance) for instance in instances)
print(f"创建了 {len(instances)} 个实例,但有 {len(unique_ids)} 个不同的ID")
if len(unique_ids) > 1:
    print("⚠️  存在线程安全问题!创建了多个实例")

线程安全的单例实现

使用线程锁

import threading
import time

class ThreadSafeSingleton:
    """线程安全的单例类 - 使用锁机制"""
    _instance = None
    _lock = threading.Lock()  # 线程锁
    
    def __new__(cls, *args, **kwargs):
        # 双重检查锁定模式(Double-Checked Locking)
        if cls._instance is None:  # 第一次检查(无锁,提高性能)
            with cls._lock:  # 加锁
                if cls._instance is None:  # 第二次检查(有锁,确保线程安全)
                    time.sleep(0.01)  # 模拟耗时操作
                    cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self, name="ThreadSafeSingleton"):
        if not hasattr(self, '_initialized'):
            self.name = name
            self._initialized = True
            print(f"ThreadSafeSingleton初始化: {name}")

def create_thread_safe_singleton(instance_id):
    """创建线程安全单例的函数"""
    singleton = ThreadSafeSingleton(f"实例{instance_id}")
    print(f"线程{instance_id}创建的实例ID: {id(singleton)}")
    return singleton

print("\n=== 测试线程安全单例 ===")
threads = []
instances = []

# 创建多个线程同时创建单例
for i in range(5):
    thread = threading.Thread(target=lambda i=i: instances.append(create_thread_safe_singleton(i)))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

# 检查是否确保了单例
unique_ids = set(id(instance) for instance in instances)
print(f"创建了 {len(instances)} 个实例,但有 {len(unique_ids)} 个不同的ID")
if len(unique_ids) == 1:
    print("✅ 线程安全!始终只创建一个实例")
else:
    print("❌ 仍然存在线程安全问题")

使用__new__方法的简化线程安全版本

import threading

class SimpleThreadSafeSingleton:
    """简化的线程安全单例"""
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls):
        with cls._lock:  # 简单粗暴的线程安全
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

单例模式应用案例

案例1:应用程序配置管理器

import json
from typing import Dict, Any
import threading

class AppConfig:
    """应用程序配置管理器 - 单例模式的实际应用"""
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls, config_file="app_config.json"):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self, config_file="app_config.json"):
        # 防止重复初始化
        if not hasattr(self, '_initialized'):
            self.config_file = config_file
            self._config = {}
            self._observers = []  # 配置变更观察者
            self._load_config()
            self._initialized = True
            print(f"配置管理器初始化完成,配置文件: {config_file}")
    
    def _load_config(self):
        """加载配置文件"""
        try:
            # 模拟从JSON文件加载配置
            default_config = {
                "database": {
                    "host": "localhost",
                    "port": 5432,
                    "name": "myapp"
                },
                "redis": {
                    "host": "localhost",
                    "port": 6379
                },
                "app": {
                    "debug": True,
                    "port": 8000,
                    "workers": 4
                },
                "logging": {
                    "level": "INFO",
                    "file": "app.log"
                }
            }
            self._config = default_config
            print("配置加载成功")
        except Exception as e:
            print(f"配置加载失败: {e}")
            self._config = {}
    
    def get(self, key_path: str, default=None):
        """获取配置值,支持点分隔的路径"""
        keys = key_path.split('.')
        value = self._config
        
        try:
            for key in keys:
                value = value[key]
            return value
        except (KeyError, TypeError):
            return default
    
    def set(self, key_path: str, value: Any):
        """设置配置值"""
        keys = key_path.split('.')
        config = self._config
        
        # 导航到目标位置的父级
        for key in keys[:-1]:
            if key not in config:
                config[key] = {}
            config = config[key]
        
        # 设置值
        old_value = config.get(keys[-1])
        config[keys[-1]] = value
        
        print(f"配置已更新: {key_path} = {value}")
        
        # 通知观察者
        self._notify_observers(key_path, old_value, value)
    
    def add_observer(self, callback):
        """添加配置变更观察者"""
        self._observers.append(callback)
    
    def _notify_observers(self, key_path: str, old_value, new_value):
        """通知所有观察者配置已变更"""
        for observer in self._observers:
            try:
                observer(key_path, old_value, new_value)
            except Exception as e:
                print(f"观察者回调出错: {e}")
    
    def reload(self):
        """重新加载配置"""
        self._load_config()
        print("配置已重新加载")
    
    def get_all(self) -> Dict[str, Any]:
        """获取所有配置(返回副本)"""
        return json.loads(json.dumps(self._config))  # 深拷贝

# 配置变更观察者示例
def log_config_change(key_path: str, old_value, new_value):
    """记录配置变更的观察者"""
    print(f"🔔 配置变更通知: {key_path}")
    print(f"   旧值: {old_value} -> 新值: {new_value}")

def validate_database_config(key_path: str, old_value, new_value):
    """验证数据库配置的观察者"""
    if key_path.startswith("database"):
        if key_path.endswith("port") and not (1 <= new_value <= 65535):
            print(f"⚠️  警告: 数据库端口 {new_value} 超出有效范围")

# 使用示例
print("=== 配置管理器应用案例 ===")

# 获取配置管理器实例(单例)
config1 = AppConfig()
config2 = AppConfig("other_config.json")  # 不会重新初始化

print(f"是否为同一实例: {config1 is config2}")

# 获取配置值
print(f"数据库主机: {config1.get('database.host')}")
print(f"应用调试模式: {config1.get('app.debug')}")
print(f"日志级别: {config1.get('logging.level')}")
print(f"不存在的配置: {config1.get('nonexistent.key', '默认值')}")

# 添加观察者
config1.add_observer(log_config_change)
config1.add_observer(validate_database_config)

# 更新配置
config1.set("app.debug", False)
config1.set("database.port", 3306)
config1.set("app.new_feature", True)

# 验证单例状态
print(f"\nconfig2中的调试模式: {config2.get('app.debug')}")  # False,证明是同一个实例
print(f"所有配置: {config1.get_all()}")

案例2:数据库连接池管理器

import threading
import time
from typing import List, Optional
from contextlib import contextmanager

class DatabaseConnection:
    """数据库连接类"""
    
    def __init__(self, connection_id: int, db_type: str = "sqlite"):
        self.connection_id = connection_id
        self.db_type = db_type
        self.is_active = True
        self.last_used = time.time()
        print(f"创建数据库连接 {connection_id} ({db_type})")
    
    def execute(self, query: str):
        """执行查询"""
        self.last_used = time.time()
        print(f"连接 {self.connection_id} 执行查询: {query}")
        return f"结果_{self.connection_id}"
    
    def close(self):
        """关闭连接"""
        self.is_active = False
        print(f"连接 {self.connection_id} 已关闭")

class ConnectionPool:
    """数据库连接池 - 单例模式确保全局唯一的连接池"""
    _instance = None
    _lock = threading.Lock()
    
    def __new__(cls, max_connections: int = 10):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self, max_connections: int = 10):
        if not hasattr(self, '_initialized'):
            self.max_connections = max_connections
            self._available_connections: List[DatabaseConnection] = []
            self._active_connections: List[DatabaseConnection] = []
            self._connection_counter = 0
            self._lock = threading.Lock()
            self._initialized = True
            self._initialize_pool()
            print(f"连接池初始化完成,最大连接数: {max_connections}")
    
    def _initialize_pool(self):
        """初始化连接池"""
        for i in range(min(3, self.max_connections)):  # 预创建3个连接
            self._create_connection()
    
    def _create_connection(self) -> DatabaseConnection:
        """创建新连接"""
        self._connection_counter += 1
        conn = DatabaseConnection(self._connection_counter)
        self._available_connections.append(conn)
        return conn
    
    def get_connection(self, timeout: int = 30) -> Optional[DatabaseConnection]:
        """获取数据库连接"""
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            with self._lock:
                # 如果有可用连接,直接返回
                if self._available_connections:
                    conn = self._available_connections.pop(0)
                    self._active_connections.append(conn)
                    print(f"分配连接 {conn.connection_id},活跃连接数: {len(self._active_connections)}")
                    return conn
                
                # 如果没有可用连接但未达到上限,创建新连接
                if len(self._active_connections) < self.max_connections:
                    conn = self._create_connection()
                    self._active_connections.append(conn)
                    print(f"创建并分配连接 {conn.connection_id},活跃连接数: {len(self._active_connections)}")
                    return conn
            
            # 等待一段时间后重试
            time.sleep(0.1)
        
        print("获取连接超时")
        return None
    
    def return_connection(self, conn: DatabaseConnection):
        """归还数据库连接"""
        with self._lock:
            if conn in self._active_connections:
                self._active_connections.remove(conn)
                if conn.is_active:
                    self._available_connections.append(conn)
                    print(f"归还连接 {conn.connection_id},可用连接数: {len(self._available_connections)}")
                else:
                    print(f"连接 {conn.connection_id} 已失效,不重新加入池中")
            else:
                print(f"连接 {conn.connection_id} 不属于此连接池")
    
    def get_pool_stats(self) -> dict:
        """获取连接池统计信息"""
        with self._lock:
            return {
                "max_connections": self.max_connections,
                "available_connections": len(self._available_connections),
                "active_connections": len(self._active_connections),
                "total_created": self._connection_counter
            }
    
    def cleanup_idle_connections(self, idle_timeout: int = 300):
        """清理空闲连接"""
        current_time = time.time()
        with self._lock:
            # 找出空闲超时的连接
            idle_connections = [
                conn for conn in self._available_connections
                if current_time - conn.last_used > idle_timeout
            ]
            
            # 移除空闲连接
            for conn in idle_connections:
                self._available_connections.remove(conn)
                conn.close()
                print(f"清理空闲连接 {conn.connection_id}")

# 上下文管理器,自动获取和归还连接
@contextmanager
def get_db_connection(pool: ConnectionPool):
    """数据库连接上下文管理器"""
    conn = pool.get_connection()
    try:
        yield conn
    finally:
        if conn:
            pool.return_connection(conn)

# 使用示例
print("\n=== 数据库连接池应用案例 ===")

# 获取连接池实例(单例)
pool1 = ConnectionPool(max_connections=5)
pool2 = ConnectionPool(max_connections=10)  # 不会重新初始化

print(f"是否为同一连接池实例: {pool1 is pool2}")
print(f"连接池统计: {pool1.get_pool_stats()}")

# 模拟多线程使用连接池
def worker(worker_id: int, pool: ConnectionPool):
    """工作线程函数"""
    print(f"工作者 {worker_id} 开始工作")
    
    # 使用上下文管理器自动管理连接
    with get_db_connection(pool) as conn:
        if conn:
            result = conn.execute(f"SELECT * FROM table_{worker_id}")
            print(f"工作者 {worker_id} 获得结果: {result}")
            time.sleep(0.5)  # 模拟查询耗时
    
    print(f"工作者 {worker_id} 完成工作")

# 创建多个工作线程
threads = []
for i in range(3):
    thread = threading.Thread(target=worker, args=(i, pool1))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

print(f"\n最终连接池统计: {pool1.get_pool_stats()}")

单例模式的优缺点

优点

  1. 节省资源:避免频繁创建销毁重量级对象
  2. 全局访问:提供统一的访问点,方便管理
  3. 数据共享:多个模块可以共享同一状态
  4. 控制访问:限制实例数量,统一控制资源访问

缺点

  1. 难以测试:单例的全局状态可能导致测试间的相互影响
  2. 隐藏依赖:单例的使用可能隐藏类之间的依赖关系
  3. 线程安全复杂性:在多线程环境下需要特殊处理
  4. 违反单一职责:单例类既负责业务逻辑又负责实例控制
  5. 扩展性差:单例模式不利于继承和多态

常见错误

错误1:忘记防止重复初始化

class BadSingleton:
    """有问题的单例实现 - 会导致重复初始化"""
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self, value):
        # 错误:每次实例化都会执行,导致重复初始化
        self.value = value
        print(f"BadSingleton.__init__ 被调用,value={value}")

# 测试
bad1 = BadSingleton("第一次")
bad2 = BadSingleton("第二次")
print(f"bad1.value: {bad1.value}")  # 第二次
print(f"bad2.value: {bad2.value}")  # 第二次
# 两个问题:1) 重复初始化 2) 值被覆盖

错误2:线程不安全

import threading

class UnsafeCounter:
    """线程不安全的计数器单例"""
    _instance = None
    _count = 0  # 共享状态
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    def increment(self):
        """增加计数 - 线程不安全"""
        # 非原子操作,可能导致竞态条件
        self._count += 1
        return self._count

# 测试线程安全问题
def increment_counter():
    counter = UnsafeCounter()
    for _ in range(1000):
        counter.increment()

threads = [threading.Thread(target=increment_counter) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()

print(f"期望计数: 10000, 实际计数: {UnsafeCounter()._count}")
# 实际计数可能小于10000,因为存在竞态条件

错误3:过度使用单例

# 错误的做法:到处使用单例
class UserService:
    _instance = None
    # ... 单例实现
    
class ProductService:
    _instance = None
    # ... 单例实现
    
class OrderService:
    _instance = None
    # ... 单例实现

# 这样会导致:
# 1. 系统难以测试和维护
# 2. 模块间紧耦合
# 3. 违反单一职责原则

# 正确的做法是:只在真正需要全局唯一性的场景下使用单例

课后练习

  1. 创建一个Logger单例类,支持不同日志级别(DEBUG、INFO、WARNING、ERROR),并能将日志写入文件和控制台。

  2. 设计一个CacheManager单例类,支持内存缓存和过期时间管理,提供get、set、delete等基本缓存操作。

  3. 实现一个SessionManager单例类,用于Web应用的会话管理,支持会话的创建、查找、销毁和过期清理。

  4. 创建一个MetricsCollector单例类,用于收集和统计应用程序的性能指标(如请求次数、响应时间等)。

  5. 设计一个FeatureToggle单例类,用于管理应用程序的功能开关,支持动态启用/禁用功能特性。

总结

单例模式是重要的创建型设计模式:

  • 确保类只有一个实例,并提供全局访问点
  • Python中有多种实现方法:重写__new__、装饰器、元类、模块
  • 需要考虑线程安全问题,特别是在多线程环境下
  • 适用于配置管理、资源管理、日志记录等场景
  • 要注意防止滥用,过度使用会导致代码难以测试和维护
  • 合理使用单例模式可以提高资源利用率和系统一致性
« 上一篇 数据类 下一篇 » 工厂模式