第128集_SQLAlchemy基础
1. SQLAlchemy简介
SQLAlchemy是Python中最流行的ORM(对象关系映射)框架之一,它提供了强大的数据库操作能力,同时支持多种数据库后端(如SQLite、MySQL、PostgreSQL等)。SQLAlchemy的设计理念是"数据库无关性",允许开发者使用相同的代码操作不同类型的数据库。
SQLAlchemy的主要特点
- 分层架构:分为Core(核心)和ORM(对象关系映射)两层
- SQL表达式语言:提供了一种灵活的方式来构建SQL查询
- ORM支持:将Python类映射到数据库表
- 事务管理:完整的ACID事务支持
- 连接池:自动管理数据库连接
- 跨数据库兼容:支持多种主流数据库
2. SQLAlchemy安装
使用pip安装SQLAlchemy:
pip install sqlalchemy如果需要连接特定数据库,还需要安装对应的数据库驱动:
- MySQL:
pip install mysql-connector-python或pip install pymysql - PostgreSQL:
pip install psycopg2或pip install psycopg2-binary - SQLite: 无需额外安装(Python标准库支持)
3. SQLAlchemy核心架构
SQLAlchemy采用分层架构设计,主要包括以下几个核心组件:
3.1 Engine(引擎)
Engine是SQLAlchemy的核心组件,负责管理数据库连接池和SQL执行。它是与数据库交互的入口点。
from sqlalchemy import create_engine
# 创建SQLite引擎(内存数据库)
engine = create_engine('sqlite:///:memory:')
# 创建MySQL引擎
engine = create_engine('mysql+mysqlconnector://username:password@localhost:3306/database')
# 创建PostgreSQL引擎
engine = create_engine('postgresql+psycopg2://username:password@localhost:5432/database')3.2 Connection(连接)
Connection代表与数据库的实际连接,用于执行SQL语句。
with engine.connect() as conn:
result = conn.execute("SELECT * FROM users")
for row in result:
print(row)3.3 Transaction(事务)
Transaction管理数据库事务,确保ACID属性。
with engine.connect() as conn:
trans = conn.begin()
try:
conn.execute("INSERT INTO users (name) VALUES ('Alice')")
conn.execute("INSERT INTO users (name) VALUES ('Bob')")
trans.commit()
except:
trans.rollback()
raise3.4 Metadata(元数据)
Metadata用于定义数据库表结构,包括表、列、约束等。
from sqlalchemy import MetaData, Table, Column, Integer, String
metadata = MetaData()
users = Table(
'users',
metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('email', String(100))
)
# 创建表
metadata.create_all(engine)4. SQLAlchemy Core API
SQLAlchemy Core提供了SQL表达式语言,允许开发者以Pythonic的方式构建SQL查询。
4.1 创建表
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float
# 创建引擎
engine = create_engine('sqlite:///sqlalchemy_demo.db')
# 创建元数据
metadata = MetaData()
# 定义表
products = Table(
'products',
metadata,
Column('id', Integer, primary_key=True),
Column('name', String(100), nullable=False),
Column('price', Float, nullable=False),
Column('category', String(50))
)
# 创建表
metadata.create_all(engine)4.2 插入数据
from sqlalchemy import insert
# 插入单条数据
with engine.connect() as conn:
stmt = insert(products).values(
name='iPhone 14',
price=999.99,
category='Electronics'
)
result = conn.execute(stmt)
conn.commit()
print(f"插入了 {result.rowcount} 条数据,ID: {result.inserted_primary_key[0]}")
# 插入多条数据
with engine.connect() as conn:
stmt = insert(products).values([
{'name': 'MacBook Pro', 'price': 1999.99, 'category': 'Electronics'},
{'name': 'Nike Shoes', 'price': 129.99, 'category': 'Clothing'},
{'name': 'Coffee Mug', 'price': 14.99, 'category': 'Home'}
])
result = conn.execute(stmt)
conn.commit()
print(f"插入了 {result.rowcount} 条数据")4.3 查询数据
from sqlalchemy import select
# 查询所有数据
with engine.connect() as conn:
stmt = select(products)
result = conn.execute(stmt)
for row in result:
print(f"ID: {row.id}, Name: {row.name}, Price: ${row.price}, Category: {row.category}")
# 条件查询
with engine.connect() as conn:
stmt = select(products).where(products.c.price > 100)
result = conn.execute(stmt)
for row in result:
print(f"ID: {row.id}, Name: {row.name}, Price: ${row.price}, Category: {row.category}")
# 排序查询
with engine.connect() as conn:
stmt = select(products).order_by(products.c.price.desc())
result = conn.execute(stmt)
for row in result:
print(f"ID: {row.id}, Name: {row.name}, Price: ${row.price}, Category: {row.category}")
# 聚合查询
with engine.connect() as conn:
stmt = select(products.c.category, func.count(products.c.id).label('count'))
stmt = stmt.group_by(products.c.category)
result = conn.execute(stmt)
for row in result:
print(f"Category: {row.category}, Count: {row.count}")4.4 更新数据
from sqlalchemy import update
with engine.connect() as conn:
stmt = update(products).where(products.c.name == 'iPhone 14').values(price=899.99)
result = conn.execute(stmt)
conn.commit()
print(f"更新了 {result.rowcount} 条数据")4.5 删除数据
from sqlalchemy import delete
with engine.connect() as conn:
stmt = delete(products).where(products.c.name == 'Coffee Mug')
result = conn.execute(stmt)
conn.commit()
print(f"删除了 {result.rowcount} 条数据")5. SQLAlchemy ORM API
SQLAlchemy ORM允许开发者将Python类映射到数据库表,使用面向对象的方式操作数据库。
5.1 定义模型
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import declarative_base
# 创建引擎
engine = create_engine('sqlite:///sqlalchemy_orm_demo.db')
# 创建基类
Base = declarative_base()
# 定义模型类
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
age = Column(Integer)
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"
# 创建表
Base.metadata.create_all(engine)5.2 创建会话
会话是ORM操作的核心,用于管理对象的生命周期。
from sqlalchemy.orm import sessionmaker
# 创建会话工厂
Session = sessionmaker(bind=engine)
# 创建会话
session = Session()5.3 插入数据
# 创建用户对象
user1 = User(username='alice', email='alice@example.com', age=25)
user2 = User(username='bob', email='bob@example.com', age=30)
# 添加到会话
session.add(user1)
session.add(user2)
# 提交事务
session.commit()
# 添加多个用户
users = [
User(username='charlie', email='charlie@example.com', age=35),
User(username='david', email='david@example.com', age=40)
]
session.add_all(users)
session.commit()5.4 查询数据
# 查询所有用户
users = session.query(User).all()
for user in users:
print(user)
# 条件查询
users = session.query(User).filter(User.age > 30).all()
for user in users:
print(user)
# 排序查询
users = session.query(User).order_by(User.age.desc()).all()
for user in users:
print(user)
# 限制查询
users = session.query(User).limit(2).all()
for user in users:
print(user)
# 单条查询
user = session.query(User).filter(User.username == 'alice').first()
print(user)
# 统计查询
count = session.query(User).count()
print(f"总用户数: {count}")5.5 更新数据
# 查询用户
user = session.query(User).filter(User.username == 'alice').first()
# 更新数据
user.age = 26
# 提交事务
session.commit()
print(user)
# 批量更新
session.query(User).filter(User.age < 30).update({'age': User.age + 1})
session.commit()5.6 删除数据
# 查询用户
user = session.query(User).filter(User.username == 'bob').first()
# 删除用户
session.delete(user)
# 提交事务
session.commit()
# 批量删除
session.query(User).filter(User.age > 35).delete()
session.commit()6. 事务管理
SQLAlchemy提供了完整的事务支持,确保数据操作的原子性、一致性、隔离性和持久性。
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
# 使用with语句自动管理事务
with Session() as session:
try:
# 创建用户
user1 = User(username='transaction_user1', email='user1@example.com', age=25)
user2 = User(username='transaction_user2', email='user2@example.com', age=30)
# 添加到会话
session.add(user1)
session.add(user2)
# 提交事务
session.commit()
print("事务提交成功")
except Exception as e:
# 回滚事务
session.rollback()
print(f"事务失败,已回滚: {e}")7. 错误处理
SQLAlchemy提供了多种异常类型,用于处理不同类型的数据库错误。
from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError
# 处理唯一约束错误
try:
user = User(username='alice', email='alice@example.com', age=25)
session.add(user)
session.commit()
except IntegrityError as e:
session.rollback()
print(f"唯一约束错误: {e}")
# 处理连接错误
try:
# 尝试连接不存在的数据库
engine = create_engine('mysql+mysqlconnector://wrong_user:wrong_pass@localhost:3306/wrong_db')
conn = engine.connect()
except OperationalError as e:
print(f"连接错误: {e}")
# 处理SQL语法错误
try:
with engine.connect() as conn:
conn.execute("SELECT * FROM non_existent_table")
except ProgrammingError as e:
print(f"SQL语法错误: {e}")8. SQLAlchemy最佳实践
- 使用会话管理器:总是使用
sessionmaker创建会话 - 自动提交和回滚:使用
with语句自动管理事务 - 批量操作:使用
add_all()进行批量插入 - 索引优化:为经常查询的列添加索引
- 延迟加载:根据需要使用延迟加载或预加载
- 错误处理:总是捕获并处理数据库异常
- 连接池配置:根据应用需求配置连接池
9. 总结
本集我们学习了SQLAlchemy的基础知识,包括:
- SQLAlchemy的核心架构
- Core API的使用(创建表、CRUD操作)
- ORM API的使用(定义模型、会话管理、CRUD操作)
- 事务管理和错误处理
SQLAlchemy是一个功能强大的ORM框架,它提供了灵活的数据库操作方式,适合各种规模的应用开发。在下一集中,我们将学习SQLAlchemy的高级用法,包括关系映射、高级查询等。