第68集:单例模式
学习目标
- 理解单例模式的概念和作用
- 掌握Python中实现单例模式的多种方法
- 学会使用单例模式解决实际问题
- 了解单例模式的优缺点和适用场景
- 掌握线程安全的单例模式实现
单例模式概念
什么是单例模式
单例模式(Singleton Pattern)是一种创建型设计模式,确保一个类只有一个实例,并提供一个全局访问点来获取该实例。
为什么使用单例模式
- 资源共享:多个模块共享同一个资源(如数据库连接、配置管理器)
- 控制访问:限制某些资源的访问权限,确保统一控制
- 节省开销:避免频繁创建和销毁重量级对象
- 全局状态:维护全局状态或计数器
- 配置管理:统一管理应用程序配置
单例模式的核心思想
- 私有化构造方法:防止外部直接实例化
- 静态访问方法:提供统一的实例获取入口
- 实例缓存:在类内部维护唯一实例的引用
单例模式实现方法
方法1:重写__new__方法(最常用)
基本实现
class Singleton:
"""单例类 - 通过重写__new__方法实现"""
_instance = None # 类变量,存储唯一实例
def __new__(cls, *args, **kwargs):
"""重写__new__方法控制实例创建"""
if cls._instance is None:
# 如果还没有实例,创建一个
cls._instance = super().__new__(cls)
# 返回已存在的实例
return cls._instance
def __init__(self, name=None):
"""初始化方法 - 注意可能被多次调用"""
# 需要判断是否已经初始化过,避免重复初始化
if not hasattr(self, '_initialized'):
self.name = name or "默认单例"
self._initialized = True
print(f"Singleton初始化: {self.name}")
# 测试单例模式
singleton1 = Singleton("第一个实例")
singleton2 = Singleton("第二个实例")
print(f"singleton1 ID: {id(singleton1)}")
print(f"singleton2 ID: {id(singleton2)}")
print(f"是否为同一实例: {singleton1 is singleton2}")
print(f"singleton1.name: {singleton1.name}") # 注意:name会是"第二个实例"
print(f"singleton2.name: {singleton2.name}")改进版本:防止重复初始化
class ImprovedSingleton:
"""改进的单例类 - 防止重复初始化"""
_instance = None
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, config=None):
"""改进的初始化方法"""
# 使用特殊属性标记是否已初始化
if not hasattr(self, '_is_initialized'):
self.config = config or {}
self._is_initialized = True
self._setup()
print(f"ImprovedSingleton初始化完成,配置: {self.config}")
def _setup(self):
"""实际的初始化逻辑"""
self.start_time = "2024-01-01"
self.version = "1.0.0"
def update_config(self, new_config):
"""更新配置的方法"""
self.config.update(new_config)
print(f"配置已更新: {self.config}")
# 测试改进版本
single1 = ImprovedSingleton({"database": "mysql"})
single2 = ImprovedSingleton({"cache": "redis"}) # 不会重新初始化
print(f"同一实例: {single1 is single2}")
print(f"最终配置: {single1.config}") # 包含两个配置方法2:使用装饰器实现单例
单例装饰器
def singleton_decorator(cls):
"""单例装饰器 - 将任何类转换为单例"""
instances = {} # 存储各个类的实例
def wrapper(*args, **kwargs):
# 如果该类还没有实例,创建一个
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return wrapper
# 使用装饰器创建单例类
@singleton_decorator
class DatabaseConnection:
"""数据库连接类 - 使用装饰器实现单例"""
def __init__(self, connection_string="default_db"):
self.connection_string = connection_string
self.is_connected = False
print(f"创建数据库连接: {connection_string}")
def connect(self):
"""连接数据库"""
self.is_connected = True
print(f"连接到数据库: {self.connection_string}")
def disconnect(self):
"""断开连接"""
self.is_connected = False
print("数据库连接已断开")
@singleton_decorator
class Logger:
"""日志记录器 - 另一个单例类"""
def __init__(self, log_file="app.log"):
self.log_file = log_file
self.messages = []
print(f"创建日志记录器: {log_file}")
def log(self, message):
"""记录日志"""
timestamp = "2024-01-01 10:00:00" # 简化时间处理
log_entry = f"[{timestamp}] {message}"
self.messages.append(log_entry)
print(f"记录日志: {log_entry}")
# 测试装饰器单例
print("=== 测试装饰器单例模式 ===")
db1 = DatabaseConnection("mysql://localhost:3306/mydb")
db2 = DatabaseConnection("postgresql://localhost:5432/otherdb") # 不会创建新实例
print(f"数据库连接是否为同一实例: {db1 is db2}")
print(f"连接字符串: {db1.connection_string}") # 仍然是第一个字符串
db1.connect()
print(f"db2连接状态: {db2.is_connected}") # True,因为是同一个实例
# 测试多个单例类
logger1 = Logger("app1.log")
logger2 = Logger("app2.log")
print(f"日志记录器是否为同一实例: {logger1 is logger2}")
print(f"日志文件: {logger1.log_file}") # app1.log
logger1.log("这是一条测试日志")
print(f"logger2的消息数量: {len(logger2.messages)}") # 1,共享消息列表方法3:使用元类实现单例
单例元类
class SingletonMeta(type):
"""单例元类 - 通过元类控制类的实例化"""
def __init__(cls, name, bases, attrs):
"""元类的初始化"""
super().__init__(name, bases, attrs)
cls._instance = None # 为每个类创建独立的实例存储
def __call__(cls, *args, **kwargs):
"""控制类的实例化过程"""
if cls._instance is None:
# 创建实例
cls._instance = super().__call__(*args, **kwargs)
return cls._instance
class ConfigManager(metaclass=SingletonMeta):
"""配置管理器 - 使用元类实现单例"""
def __init__(self, config_file="config.json"):
self.config_file = config_file
self.settings = {}
self._load_config()
print(f"配置管理器初始化,加载文件: {config_file}")
def _load_config(self):
"""加载配置文件(模拟)"""
# 模拟从文件加载配置
self.settings = {
"database_url": "sqlite:///app.db",
"debug": True,
"max_connections": 100
}
def get(self, key, default=None):
"""获取配置值"""
return self.settings.get(key, default)
def set(self, key, value):
"""设置配置值"""
self.settings[key] = value
print(f"配置已更新: {key} = {value}")
def reload(self):
"""重新加载配置"""
self._load_config()
print("配置已重新加载")
class CacheManager(metaclass=SingletonMeta):
"""缓存管理器 - 另一个使用元类的单例"""
def __init__(self, cache_type="memory"):
self.cache_type = cache_type
self._cache = {}
print(f"缓存管理器初始化,类型: {cache_type}")
def set(self, key, value):
"""设置缓存"""
self._cache[key] = value
print(f"缓存设置: {key} = {value}")
def get(self, key):
"""获取缓存"""
return self._cache.get(key)
def clear(self):
"""清空缓存"""
self._cache.clear()
print("缓存已清空")
# 测试元类单例
print("=== 测试元类单例模式 ===")
config1 = ConfigManager("app_config.json")
config2 = ConfigManager("other_config.json") # 不会重新初始化
print(f"配置管理器是否为同一实例: {config1 is config2}")
print(f"配置文件: {config1.config_file}") # app_config.json
config1.set("timeout", 30)
print(f"config2中的timeout: {config2.get('timeout')}") # 30,共享状态
# 测试多个单例类
cache1 = CacheManager("redis")
cache2 = CacheManager("memcached")
print(f"缓存管理器是否为同一实例: {cache1 is cache2}")
print(f"缓存类型: {cache1.cache_type}") # redis
cache1.set("user_123", {"name": "张三", "age": 25})
print(f"cache2中的用户数据: {cache2.get('user_123')}") # 共享缓存数据方法4:使用模块实现单例(Python特有)
模块级别的单例
# config_module.py
"""配置模块 - Python模块天然是单例的"""
# 模块级别的变量 - 天然单例
_config = {
"database_url": "sqlite:///default.db",
"debug": False,
"version": "1.0.0"
}
# 模块级别的函数
def get_config(key, default=None):
"""获取配置"""
return _config.get(key, default)
def set_config(key, value):
"""设置配置"""
_config[key] = value
print(f"配置已更新: {key} = {value}")
def get_all_config():
"""获取所有配置"""
return _config.copy()
# 初始化代码(模块加载时执行)
print("配置模块已加载,初始配置:", _config)
# 在同一个目录下的使用示例
# main.py
import config_module
print("=== 测试模块级单例 ===")
print("第一次访问配置:", config_module.get_all_config())
config_module.set_config("debug", True)
config_module.set_config("max_users", 1000)
print("第二次访问配置:", config_module.get_all_config())
# 在不同的"导入"中也是同一个实例
import config_module as cm
print(f"不同导入的模块是否相同: {config_module is cm}")
print(f"配置是否共享: {cm.get_config('debug')}") # True线程安全的单例模式
多线程环境下的单例问题
import threading
import time
class ThreadUnsafeSingleton:
"""非线程安全的单例类 - 用于演示问题"""
_instance = None
def __new__(cls):
if cls._instance is None:
# 模拟耗时操作,增加线程冲突概率
time.sleep(0.01)
cls._instance = super().__new__(cls)
return cls._instance
# 测试线程安全问题
def create_singleton(instance_id):
"""创建单例实例的函数"""
singleton = ThreadUnsafeSingleton()
print(f"线程{instance_id}创建的实例ID: {id(singleton)}")
return singleton
print("=== 测试非线程安全单例 ===")
threads = []
instances = []
# 创建多个线程同时创建单例
for i in range(5):
thread = threading.Thread(target=lambda i=i: instances.append(create_singleton(i)))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
# 检查是否创建了多个实例
unique_ids = set(id(instance) for instance in instances)
print(f"创建了 {len(instances)} 个实例,但有 {len(unique_ids)} 个不同的ID")
if len(unique_ids) > 1:
print("⚠️ 存在线程安全问题!创建了多个实例")线程安全的单例实现
使用线程锁
import threading
import time
class ThreadSafeSingleton:
"""线程安全的单例类 - 使用锁机制"""
_instance = None
_lock = threading.Lock() # 线程锁
def __new__(cls, *args, **kwargs):
# 双重检查锁定模式(Double-Checked Locking)
if cls._instance is None: # 第一次检查(无锁,提高性能)
with cls._lock: # 加锁
if cls._instance is None: # 第二次检查(有锁,确保线程安全)
time.sleep(0.01) # 模拟耗时操作
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, name="ThreadSafeSingleton"):
if not hasattr(self, '_initialized'):
self.name = name
self._initialized = True
print(f"ThreadSafeSingleton初始化: {name}")
def create_thread_safe_singleton(instance_id):
"""创建线程安全单例的函数"""
singleton = ThreadSafeSingleton(f"实例{instance_id}")
print(f"线程{instance_id}创建的实例ID: {id(singleton)}")
return singleton
print("\n=== 测试线程安全单例 ===")
threads = []
instances = []
# 创建多个线程同时创建单例
for i in range(5):
thread = threading.Thread(target=lambda i=i: instances.append(create_thread_safe_singleton(i)))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
# 检查是否确保了单例
unique_ids = set(id(instance) for instance in instances)
print(f"创建了 {len(instances)} 个实例,但有 {len(unique_ids)} 个不同的ID")
if len(unique_ids) == 1:
print("✅ 线程安全!始终只创建一个实例")
else:
print("❌ 仍然存在线程安全问题")使用__new__方法的简化线程安全版本
import threading
class SimpleThreadSafeSingleton:
"""简化的线程安全单例"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock: # 简单粗暴的线程安全
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance单例模式应用案例
案例1:应用程序配置管理器
import json
from typing import Dict, Any
import threading
class AppConfig:
"""应用程序配置管理器 - 单例模式的实际应用"""
_instance = None
_lock = threading.Lock()
def __new__(cls, config_file="app_config.json"):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, config_file="app_config.json"):
# 防止重复初始化
if not hasattr(self, '_initialized'):
self.config_file = config_file
self._config = {}
self._observers = [] # 配置变更观察者
self._load_config()
self._initialized = True
print(f"配置管理器初始化完成,配置文件: {config_file}")
def _load_config(self):
"""加载配置文件"""
try:
# 模拟从JSON文件加载配置
default_config = {
"database": {
"host": "localhost",
"port": 5432,
"name": "myapp"
},
"redis": {
"host": "localhost",
"port": 6379
},
"app": {
"debug": True,
"port": 8000,
"workers": 4
},
"logging": {
"level": "INFO",
"file": "app.log"
}
}
self._config = default_config
print("配置加载成功")
except Exception as e:
print(f"配置加载失败: {e}")
self._config = {}
def get(self, key_path: str, default=None):
"""获取配置值,支持点分隔的路径"""
keys = key_path.split('.')
value = self._config
try:
for key in keys:
value = value[key]
return value
except (KeyError, TypeError):
return default
def set(self, key_path: str, value: Any):
"""设置配置值"""
keys = key_path.split('.')
config = self._config
# 导航到目标位置的父级
for key in keys[:-1]:
if key not in config:
config[key] = {}
config = config[key]
# 设置值
old_value = config.get(keys[-1])
config[keys[-1]] = value
print(f"配置已更新: {key_path} = {value}")
# 通知观察者
self._notify_observers(key_path, old_value, value)
def add_observer(self, callback):
"""添加配置变更观察者"""
self._observers.append(callback)
def _notify_observers(self, key_path: str, old_value, new_value):
"""通知所有观察者配置已变更"""
for observer in self._observers:
try:
observer(key_path, old_value, new_value)
except Exception as e:
print(f"观察者回调出错: {e}")
def reload(self):
"""重新加载配置"""
self._load_config()
print("配置已重新加载")
def get_all(self) -> Dict[str, Any]:
"""获取所有配置(返回副本)"""
return json.loads(json.dumps(self._config)) # 深拷贝
# 配置变更观察者示例
def log_config_change(key_path: str, old_value, new_value):
"""记录配置变更的观察者"""
print(f"🔔 配置变更通知: {key_path}")
print(f" 旧值: {old_value} -> 新值: {new_value}")
def validate_database_config(key_path: str, old_value, new_value):
"""验证数据库配置的观察者"""
if key_path.startswith("database"):
if key_path.endswith("port") and not (1 <= new_value <= 65535):
print(f"⚠️ 警告: 数据库端口 {new_value} 超出有效范围")
# 使用示例
print("=== 配置管理器应用案例 ===")
# 获取配置管理器实例(单例)
config1 = AppConfig()
config2 = AppConfig("other_config.json") # 不会重新初始化
print(f"是否为同一实例: {config1 is config2}")
# 获取配置值
print(f"数据库主机: {config1.get('database.host')}")
print(f"应用调试模式: {config1.get('app.debug')}")
print(f"日志级别: {config1.get('logging.level')}")
print(f"不存在的配置: {config1.get('nonexistent.key', '默认值')}")
# 添加观察者
config1.add_observer(log_config_change)
config1.add_observer(validate_database_config)
# 更新配置
config1.set("app.debug", False)
config1.set("database.port", 3306)
config1.set("app.new_feature", True)
# 验证单例状态
print(f"\nconfig2中的调试模式: {config2.get('app.debug')}") # False,证明是同一个实例
print(f"所有配置: {config1.get_all()}")案例2:数据库连接池管理器
import threading
import time
from typing import List, Optional
from contextlib import contextmanager
class DatabaseConnection:
"""数据库连接类"""
def __init__(self, connection_id: int, db_type: str = "sqlite"):
self.connection_id = connection_id
self.db_type = db_type
self.is_active = True
self.last_used = time.time()
print(f"创建数据库连接 {connection_id} ({db_type})")
def execute(self, query: str):
"""执行查询"""
self.last_used = time.time()
print(f"连接 {self.connection_id} 执行查询: {query}")
return f"结果_{self.connection_id}"
def close(self):
"""关闭连接"""
self.is_active = False
print(f"连接 {self.connection_id} 已关闭")
class ConnectionPool:
"""数据库连接池 - 单例模式确保全局唯一的连接池"""
_instance = None
_lock = threading.Lock()
def __new__(cls, max_connections: int = 10):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, max_connections: int = 10):
if not hasattr(self, '_initialized'):
self.max_connections = max_connections
self._available_connections: List[DatabaseConnection] = []
self._active_connections: List[DatabaseConnection] = []
self._connection_counter = 0
self._lock = threading.Lock()
self._initialized = True
self._initialize_pool()
print(f"连接池初始化完成,最大连接数: {max_connections}")
def _initialize_pool(self):
"""初始化连接池"""
for i in range(min(3, self.max_connections)): # 预创建3个连接
self._create_connection()
def _create_connection(self) -> DatabaseConnection:
"""创建新连接"""
self._connection_counter += 1
conn = DatabaseConnection(self._connection_counter)
self._available_connections.append(conn)
return conn
def get_connection(self, timeout: int = 30) -> Optional[DatabaseConnection]:
"""获取数据库连接"""
start_time = time.time()
while time.time() - start_time < timeout:
with self._lock:
# 如果有可用连接,直接返回
if self._available_connections:
conn = self._available_connections.pop(0)
self._active_connections.append(conn)
print(f"分配连接 {conn.connection_id},活跃连接数: {len(self._active_connections)}")
return conn
# 如果没有可用连接但未达到上限,创建新连接
if len(self._active_connections) < self.max_connections:
conn = self._create_connection()
self._active_connections.append(conn)
print(f"创建并分配连接 {conn.connection_id},活跃连接数: {len(self._active_connections)}")
return conn
# 等待一段时间后重试
time.sleep(0.1)
print("获取连接超时")
return None
def return_connection(self, conn: DatabaseConnection):
"""归还数据库连接"""
with self._lock:
if conn in self._active_connections:
self._active_connections.remove(conn)
if conn.is_active:
self._available_connections.append(conn)
print(f"归还连接 {conn.connection_id},可用连接数: {len(self._available_connections)}")
else:
print(f"连接 {conn.connection_id} 已失效,不重新加入池中")
else:
print(f"连接 {conn.connection_id} 不属于此连接池")
def get_pool_stats(self) -> dict:
"""获取连接池统计信息"""
with self._lock:
return {
"max_connections": self.max_connections,
"available_connections": len(self._available_connections),
"active_connections": len(self._active_connections),
"total_created": self._connection_counter
}
def cleanup_idle_connections(self, idle_timeout: int = 300):
"""清理空闲连接"""
current_time = time.time()
with self._lock:
# 找出空闲超时的连接
idle_connections = [
conn for conn in self._available_connections
if current_time - conn.last_used > idle_timeout
]
# 移除空闲连接
for conn in idle_connections:
self._available_connections.remove(conn)
conn.close()
print(f"清理空闲连接 {conn.connection_id}")
# 上下文管理器,自动获取和归还连接
@contextmanager
def get_db_connection(pool: ConnectionPool):
"""数据库连接上下文管理器"""
conn = pool.get_connection()
try:
yield conn
finally:
if conn:
pool.return_connection(conn)
# 使用示例
print("\n=== 数据库连接池应用案例 ===")
# 获取连接池实例(单例)
pool1 = ConnectionPool(max_connections=5)
pool2 = ConnectionPool(max_connections=10) # 不会重新初始化
print(f"是否为同一连接池实例: {pool1 is pool2}")
print(f"连接池统计: {pool1.get_pool_stats()}")
# 模拟多线程使用连接池
def worker(worker_id: int, pool: ConnectionPool):
"""工作线程函数"""
print(f"工作者 {worker_id} 开始工作")
# 使用上下文管理器自动管理连接
with get_db_connection(pool) as conn:
if conn:
result = conn.execute(f"SELECT * FROM table_{worker_id}")
print(f"工作者 {worker_id} 获得结果: {result}")
time.sleep(0.5) # 模拟查询耗时
print(f"工作者 {worker_id} 完成工作")
# 创建多个工作线程
threads = []
for i in range(3):
thread = threading.Thread(target=worker, args=(i, pool1))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
print(f"\n最终连接池统计: {pool1.get_pool_stats()}")单例模式的优缺点
优点
- 节省资源:避免频繁创建销毁重量级对象
- 全局访问:提供统一的访问点,方便管理
- 数据共享:多个模块可以共享同一状态
- 控制访问:限制实例数量,统一控制资源访问
缺点
- 难以测试:单例的全局状态可能导致测试间的相互影响
- 隐藏依赖:单例的使用可能隐藏类之间的依赖关系
- 线程安全复杂性:在多线程环境下需要特殊处理
- 违反单一职责:单例类既负责业务逻辑又负责实例控制
- 扩展性差:单例模式不利于继承和多态
常见错误
错误1:忘记防止重复初始化
class BadSingleton:
"""有问题的单例实现 - 会导致重复初始化"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, value):
# 错误:每次实例化都会执行,导致重复初始化
self.value = value
print(f"BadSingleton.__init__ 被调用,value={value}")
# 测试
bad1 = BadSingleton("第一次")
bad2 = BadSingleton("第二次")
print(f"bad1.value: {bad1.value}") # 第二次
print(f"bad2.value: {bad2.value}") # 第二次
# 两个问题:1) 重复初始化 2) 值被覆盖错误2:线程不安全
import threading
class UnsafeCounter:
"""线程不安全的计数器单例"""
_instance = None
_count = 0 # 共享状态
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def increment(self):
"""增加计数 - 线程不安全"""
# 非原子操作,可能导致竞态条件
self._count += 1
return self._count
# 测试线程安全问题
def increment_counter():
counter = UnsafeCounter()
for _ in range(1000):
counter.increment()
threads = [threading.Thread(target=increment_counter) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(f"期望计数: 10000, 实际计数: {UnsafeCounter()._count}")
# 实际计数可能小于10000,因为存在竞态条件错误3:过度使用单例
# 错误的做法:到处使用单例
class UserService:
_instance = None
# ... 单例实现
class ProductService:
_instance = None
# ... 单例实现
class OrderService:
_instance = None
# ... 单例实现
# 这样会导致:
# 1. 系统难以测试和维护
# 2. 模块间紧耦合
# 3. 违反单一职责原则
# 正确的做法是:只在真正需要全局唯一性的场景下使用单例课后练习
创建一个
Logger单例类,支持不同日志级别(DEBUG、INFO、WARNING、ERROR),并能将日志写入文件和控制台。设计一个
CacheManager单例类,支持内存缓存和过期时间管理,提供get、set、delete等基本缓存操作。实现一个
SessionManager单例类,用于Web应用的会话管理,支持会话的创建、查找、销毁和过期清理。创建一个
MetricsCollector单例类,用于收集和统计应用程序的性能指标(如请求次数、响应时间等)。设计一个
FeatureToggle单例类,用于管理应用程序的功能开关,支持动态启用/禁用功能特性。
总结
单例模式是重要的创建型设计模式:
- 确保类只有一个实例,并提供全局访问点
- Python中有多种实现方法:重写__new__、装饰器、元类、模块
- 需要考虑线程安全问题,特别是在多线程环境下
- 适用于配置管理、资源管理、日志记录等场景
- 要注意防止滥用,过度使用会导致代码难以测试和维护
- 合理使用单例模式可以提高资源利用率和系统一致性