第128集_SQLAlchemy基础

1. SQLAlchemy简介

SQLAlchemy是Python中最流行的ORM(对象关系映射)框架之一,它提供了强大的数据库操作能力,同时支持多种数据库后端(如SQLite、MySQL、PostgreSQL等)。SQLAlchemy的设计理念是"数据库无关性",允许开发者使用相同的代码操作不同类型的数据库。

SQLAlchemy的主要特点

  • 分层架构:分为Core(核心)和ORM(对象关系映射)两层
  • SQL表达式语言:提供了一种灵活的方式来构建SQL查询
  • ORM支持:将Python类映射到数据库表
  • 事务管理:完整的ACID事务支持
  • 连接池:自动管理数据库连接
  • 跨数据库兼容:支持多种主流数据库

2. SQLAlchemy安装

使用pip安装SQLAlchemy:

pip install sqlalchemy

如果需要连接特定数据库,还需要安装对应的数据库驱动:

  • MySQL: pip install mysql-connector-pythonpip install pymysql
  • PostgreSQL: pip install psycopg2pip install psycopg2-binary
  • SQLite: 无需额外安装(Python标准库支持)

3. SQLAlchemy核心架构

SQLAlchemy采用分层架构设计,主要包括以下几个核心组件:

3.1 Engine(引擎)

Engine是SQLAlchemy的核心组件,负责管理数据库连接池和SQL执行。它是与数据库交互的入口点。

from sqlalchemy import create_engine

# 创建SQLite引擎(内存数据库)
engine = create_engine('sqlite:///:memory:')

# 创建MySQL引擎
engine = create_engine('mysql+mysqlconnector://username:password@localhost:3306/database')

# 创建PostgreSQL引擎
engine = create_engine('postgresql+psycopg2://username:password@localhost:5432/database')

3.2 Connection(连接)

Connection代表与数据库的实际连接,用于执行SQL语句。

with engine.connect() as conn:
    result = conn.execute("SELECT * FROM users")
    for row in result:
        print(row)

3.3 Transaction(事务)

Transaction管理数据库事务,确保ACID属性。

with engine.connect() as conn:
    trans = conn.begin()
    try:
        conn.execute("INSERT INTO users (name) VALUES ('Alice')")
        conn.execute("INSERT INTO users (name) VALUES ('Bob')")
        trans.commit()
    except:
        trans.rollback()
        raise

3.4 Metadata(元数据)

Metadata用于定义数据库表结构,包括表、列、约束等。

from sqlalchemy import MetaData, Table, Column, Integer, String

metadata = MetaData()

users = Table(
    'users',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('email', String(100))
)

# 创建表
metadata.create_all(engine)

4. SQLAlchemy Core API

SQLAlchemy Core提供了SQL表达式语言,允许开发者以Pythonic的方式构建SQL查询。

4.1 创建表

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float

# 创建引擎
engine = create_engine('sqlite:///sqlalchemy_demo.db')

# 创建元数据
metadata = MetaData()

# 定义表
products = Table(
    'products',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(100), nullable=False),
    Column('price', Float, nullable=False),
    Column('category', String(50))
)

# 创建表
metadata.create_all(engine)

4.2 插入数据

from sqlalchemy import insert

# 插入单条数据
with engine.connect() as conn:
    stmt = insert(products).values(
        name='iPhone 14',
        price=999.99,
        category='Electronics'
    )
    result = conn.execute(stmt)
    conn.commit()
    print(f"插入了 {result.rowcount} 条数据,ID: {result.inserted_primary_key[0]}")

# 插入多条数据
with engine.connect() as conn:
    stmt = insert(products).values([
        {'name': 'MacBook Pro', 'price': 1999.99, 'category': 'Electronics'},
        {'name': 'Nike Shoes', 'price': 129.99, 'category': 'Clothing'},
        {'name': 'Coffee Mug', 'price': 14.99, 'category': 'Home'}
    ])
    result = conn.execute(stmt)
    conn.commit()
    print(f"插入了 {result.rowcount} 条数据")

4.3 查询数据

from sqlalchemy import select

# 查询所有数据
with engine.connect() as conn:
    stmt = select(products)
    result = conn.execute(stmt)
    for row in result:
        print(f"ID: {row.id}, Name: {row.name}, Price: ${row.price}, Category: {row.category}")

# 条件查询
with engine.connect() as conn:
    stmt = select(products).where(products.c.price > 100)
    result = conn.execute(stmt)
    for row in result:
        print(f"ID: {row.id}, Name: {row.name}, Price: ${row.price}, Category: {row.category}")

# 排序查询
with engine.connect() as conn:
    stmt = select(products).order_by(products.c.price.desc())
    result = conn.execute(stmt)
    for row in result:
        print(f"ID: {row.id}, Name: {row.name}, Price: ${row.price}, Category: {row.category}")

# 聚合查询
with engine.connect() as conn:
    stmt = select(products.c.category, func.count(products.c.id).label('count'))
    stmt = stmt.group_by(products.c.category)
    result = conn.execute(stmt)
    for row in result:
        print(f"Category: {row.category}, Count: {row.count}")

4.4 更新数据

from sqlalchemy import update

with engine.connect() as conn:
    stmt = update(products).where(products.c.name == 'iPhone 14').values(price=899.99)
    result = conn.execute(stmt)
    conn.commit()
    print(f"更新了 {result.rowcount} 条数据")

4.5 删除数据

from sqlalchemy import delete

with engine.connect() as conn:
    stmt = delete(products).where(products.c.name == 'Coffee Mug')
    result = conn.execute(stmt)
    conn.commit()
    print(f"删除了 {result.rowcount} 条数据")

5. SQLAlchemy ORM API

SQLAlchemy ORM允许开发者将Python类映射到数据库表,使用面向对象的方式操作数据库。

5.1 定义模型

from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import declarative_base

# 创建引擎
engine = create_engine('sqlite:///sqlalchemy_orm_demo.db')

# 创建基类
Base = declarative_base()

# 定义模型类
class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    age = Column(Integer)
    
    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"

# 创建表
Base.metadata.create_all(engine)

5.2 创建会话

会话是ORM操作的核心,用于管理对象的生命周期。

from sqlalchemy.orm import sessionmaker

# 创建会话工厂
Session = sessionmaker(bind=engine)

# 创建会话
session = Session()

5.3 插入数据

# 创建用户对象
user1 = User(username='alice', email='alice@example.com', age=25)
user2 = User(username='bob', email='bob@example.com', age=30)

# 添加到会话
session.add(user1)
session.add(user2)

# 提交事务
session.commit()

# 添加多个用户
users = [
    User(username='charlie', email='charlie@example.com', age=35),
    User(username='david', email='david@example.com', age=40)
]
session.add_all(users)
session.commit()

5.4 查询数据

# 查询所有用户
users = session.query(User).all()
for user in users:
    print(user)

# 条件查询
users = session.query(User).filter(User.age > 30).all()
for user in users:
    print(user)

# 排序查询
users = session.query(User).order_by(User.age.desc()).all()
for user in users:
    print(user)

# 限制查询
users = session.query(User).limit(2).all()
for user in users:
    print(user)

# 单条查询
user = session.query(User).filter(User.username == 'alice').first()
print(user)

# 统计查询
count = session.query(User).count()
print(f"总用户数: {count}")

5.5 更新数据

# 查询用户
user = session.query(User).filter(User.username == 'alice').first()

# 更新数据
user.age = 26

# 提交事务
session.commit()
print(user)

# 批量更新
session.query(User).filter(User.age < 30).update({'age': User.age + 1})
session.commit()

5.6 删除数据

# 查询用户
user = session.query(User).filter(User.username == 'bob').first()

# 删除用户
session.delete(user)

# 提交事务
session.commit()

# 批量删除
session.query(User).filter(User.age > 35).delete()
session.commit()

6. 事务管理

SQLAlchemy提供了完整的事务支持,确保数据操作的原子性、一致性、隔离性和持久性。

from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)

# 使用with语句自动管理事务
with Session() as session:
    try:
        # 创建用户
        user1 = User(username='transaction_user1', email='user1@example.com', age=25)
        user2 = User(username='transaction_user2', email='user2@example.com', age=30)
        
        # 添加到会话
        session.add(user1)
        session.add(user2)
        
        # 提交事务
        session.commit()
        print("事务提交成功")
    except Exception as e:
        # 回滚事务
        session.rollback()
        print(f"事务失败,已回滚: {e}")

7. 错误处理

SQLAlchemy提供了多种异常类型,用于处理不同类型的数据库错误。

from sqlalchemy.exc import IntegrityError, OperationalError, ProgrammingError

# 处理唯一约束错误
try:
    user = User(username='alice', email='alice@example.com', age=25)
    session.add(user)
    session.commit()
except IntegrityError as e:
    session.rollback()
    print(f"唯一约束错误: {e}")

# 处理连接错误
try:
    # 尝试连接不存在的数据库
    engine = create_engine('mysql+mysqlconnector://wrong_user:wrong_pass@localhost:3306/wrong_db')
    conn = engine.connect()
except OperationalError as e:
    print(f"连接错误: {e}")

# 处理SQL语法错误
try:
    with engine.connect() as conn:
        conn.execute("SELECT * FROM non_existent_table")
except ProgrammingError as e:
    print(f"SQL语法错误: {e}")

8. SQLAlchemy最佳实践

  1. 使用会话管理器:总是使用sessionmaker创建会话
  2. 自动提交和回滚:使用with语句自动管理事务
  3. 批量操作:使用add_all()进行批量插入
  4. 索引优化:为经常查询的列添加索引
  5. 延迟加载:根据需要使用延迟加载或预加载
  6. 错误处理:总是捕获并处理数据库异常
  7. 连接池配置:根据应用需求配置连接池

9. 总结

本集我们学习了SQLAlchemy的基础知识,包括:

  • SQLAlchemy的核心架构
  • Core API的使用(创建表、CRUD操作)
  • ORM API的使用(定义模型、会话管理、CRUD操作)
  • 事务管理和错误处理

SQLAlchemy是一个功能强大的ORM框架,它提供了灵活的数据库操作方式,适合各种规模的应用开发。在下一集中,我们将学习SQLAlchemy的高级用法,包括关系映射、高级查询等。

« 上一篇 ORM概念 下一篇 » SQLAlchemy高级用法