第126集 PostgreSQL数据库连接

一、课程导入

在前面的课程中,我们已经学习了如何使用Python连接SQLite和MySQL数据库。SQLite是轻量级嵌入式数据库,MySQL是流行的开源关系型数据库。今天,我们将学习另一种功能强大的开源关系型数据库——PostgreSQL的连接和操作方法。

PostgreSQL是目前最先进的开源关系型数据库之一,它提供了丰富的功能和强大的扩展性,广泛应用于各种规模的企业应用和数据存储系统中。掌握Python与PostgreSQL的连接技术,将为我们开发更复杂、更高效的数据库应用提供更多选择。

二、PostgreSQL数据库简介

2.1 PostgreSQL的特点

PostgreSQL(简称Postgres)是一个开源的对象-关系型数据库管理系统(ORDBMS),具有以下主要特点:

  1. 高度兼容SQL标准:支持SQL:2016标准的大部分功能,包括高级特性如窗口函数、公共表表达式等
  2. 丰富的数据类型:支持基本数据类型、自定义数据类型、数组、JSON、JSONB、XML、几何类型等
  3. 强大的扩展性:可以自定义函数、操作符、数据类型、索引类型等
  4. 高可靠性:支持事务处理,保证数据的一致性和完整性
  5. 高安全性:提供细粒度的访问控制、SSL加密、行级安全等
  6. 高性能:优化的查询优化器、多种索引类型(B-tree、Hash、GiST、GIN等)
  7. 跨平台:支持Windows、Linux、macOS等多种操作系统
  8. 开源免费:遵循PostgreSQL许可证,可以免费使用和修改

2.2 PostgreSQL的应用场景

PostgreSQL广泛应用于以下场景:

  1. 企业级应用:如客户关系管理(CRM)、企业资源规划(ERP)系统
  2. 大数据处理:与Hadoop、Spark等大数据技术集成
  3. 地理信息系统(GIS):通过PostGIS扩展支持空间数据
  4. Web应用:为各种规模的网站提供数据存储服务
  5. 数据仓库:存储和分析大量业务数据
  6. 金融系统:需要高可靠性和事务支持的金融应用

三、Python连接PostgreSQL的准备工作

3.1 安装PostgreSQL数据库

在使用Python连接PostgreSQL之前,需要先安装PostgreSQL数据库。可以从PostgreSQL官网下载适合自己操作系统的安装包进行安装。

注意:本课程主要讲解Python与PostgreSQL的连接方法,不详细介绍PostgreSQL的安装过程。如果您还没有安装PostgreSQL,可以参考PostgreSQL官方文档或相关教程进行安装。

3.2 安装Python的PostgreSQL驱动

Python需要通过PostgreSQL驱动程序来连接和操作PostgreSQL数据库。最常用的驱动是psycopg2,它是PostgreSQL官方推荐的Python驱动。

安装命令

# 安装psycopg2-binary(推荐,无需编译)
pip install psycopg2-binary

# 或者安装psycopg2(需要编译环境)
pip install psycopg2

四、Python连接PostgreSQL的基本步骤

使用Python连接PostgreSQL数据库通常需要以下步骤:

  1. 导入PostgreSQL驱动模块
  2. 建立数据库连接
  3. 创建游标对象
  4. 执行SQL语句
  5. 处理查询结果
  6. 关闭游标
  7. 关闭数据库连接

4.1 导入PostgreSQL驱动模块

首先需要导入psycopg2模块:

import psycopg2

4.2 建立数据库连接

使用psycopg2.connect()函数建立与PostgreSQL数据库的连接,需要提供以下参数:

  • host:PostgreSQL服务器地址(如localhost或IP地址)
  • user:用户名
  • password:密码
  • database:要连接的数据库名称
  • port:PostgreSQL服务器端口号(默认5432)

示例

# 建立数据库连接
conn = psycopg2.connect(
    host='localhost',     # 服务器地址
    user='postgres',      # 用户名
    password='123456',    # 密码
    database='testdb',    # 数据库名称
    port=5432             # 端口号
)

4.3 创建游标对象

连接建立后,需要创建一个游标对象来执行SQL语句:

# 创建游标对象
cursor = conn.cursor()

4.4 执行SQL语句

使用游标对象的execute()方法执行SQL语句:

# 执行SQL语句
cursor.execute('SELECT * FROM students')

4.5 处理查询结果

对于查询语句(如SELECT),可以使用以下方法获取查询结果:

  • **fetchone()**:获取一条记录
  • **fetchmany(size)**:获取指定数量的记录
  • **fetchall()**:获取所有记录

示例

# 获取所有记录
results = cursor.fetchall()

# 遍历结果
for row in results:
    print(row)

4.6 关闭游标和连接

操作完成后,需要关闭游标和数据库连接,释放资源:

# 关闭游标
cursor.close()

# 关闭连接
conn.close()

五、完整的连接示例

下面是一个完整的Python连接PostgreSQL的示例代码:

import psycopg2

try:
    # 建立数据库连接
    conn = psycopg2.connect(
        host='localhost',
        user='postgres',
        password='123456',
        database='testdb'
    )
    print("✅ 成功连接到PostgreSQL数据库")
    
    # 创建游标对象
    cursor = conn.cursor()
    
    # 执行SQL语句
    cursor.execute('SELECT version()')
    
    # 获取查询结果
    version = cursor.fetchone()
    print(f"PostgreSQL版本: {version[0]}")
    
finally:
    # 关闭游标和连接
    if 'cursor' in locals() and cursor is not None:
        cursor.close()
    if 'conn' in locals() and conn is not None:
        conn.close()
        print("✅ 数据库连接已关闭")

六、SQL执行方法

6.1 执行单条SQL语句

使用execute()方法执行单条SQL语句:

# 创建表
try:
    conn = psycopg2.connect(
        host='localhost', user='postgres', password='123456', database='testdb'
    )
    cursor = conn.cursor()
    
    # 创建students表
    create_table_sql = '''
    CREATE TABLE IF NOT EXISTS students (
        id SERIAL PRIMARY KEY,
        name VARCHAR(50) NOT NULL,
        age INT,
        gender VARCHAR(10),
        email VARCHAR(100),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    '''
    
    cursor.execute(create_table_sql)
    conn.commit()
    print("✅ 创建表成功")
    
except psycopg2.Error as e:
    print(f"❌ 创建表失败: {e}")
    if 'conn' in locals() and conn is not None:
        conn.rollback()

6.2 执行带参数的SQL语句

使用参数化查询可以防止SQL注入攻击,提高安全性。在psycopg2中,使用%s作为占位符:

# 插入数据(参数化查询)
try:
    conn = psycopg2.connect(
        host='localhost', user='postgres', password='123456', database='testdb'
    )
    cursor = conn.cursor()
    
    # 插入数据
    name = "张三"
    age = 18
    gender = "男"
    email = "zhangsan@example.com"
    
    cursor.execute(
        "INSERT INTO students (name, age, gender, email) VALUES (%s, %s, %s, %s)",
        (name, age, gender, email)
    )
    
    # 提交事务
    conn.commit()
    print(f"✅ 插入数据成功,影响 {cursor.rowcount} 条记录")
    print(f"✅ 插入的记录ID: {cursor.lastrowid}")
    
except psycopg2.Error as e:
    print(f"❌ 插入数据失败: {e}")
    if 'conn' in locals() and conn is not None:
        conn.rollback()

6.3 批量执行SQL语句

使用executemany()方法可以批量执行SQL语句,提高效率:

# 批量插入数据
try:
    conn = psycopg2.connect(
        host='localhost', user='postgres', password='123456', database='testdb'
    )
    cursor = conn.cursor()
    
    # 批量插入的数据
    students = [
        ("李四", 19, "男", "lisi@example.com"),
        ("王五", 20, "女", "wangwu@example.com"),
        ("赵六", 21, "男", "zhaoliu@example.com")
    ]
    
    cursor.executemany(
        "INSERT INTO students (name, age, gender, email) VALUES (%s, %s, %s, %s)",
        students
    )
    
    # 提交事务
    conn.commit()
    print(f"✅ 批量插入数据成功,影响 {cursor.rowcount} 条记录")
    
except psycopg2.Error as e:
    print(f"❌ 批量插入数据失败: {e}")
    if 'conn' in locals() and conn is not None:
        conn.rollback()

七、事务处理

PostgreSQL支持事务处理,可以确保一组SQL操作的原子性、一致性、隔离性和持久性(ACID特性)。

7.1 事务的基本概念

  • 原子性(Atomicity):事务中的所有操作要么全部成功,要么全部失败回滚
  • 一致性(Consistency):事务执行前后,数据库的完整性约束没有被破坏
  • 隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务
  • 持久性(Durability):事务一旦提交,对数据库的改变是永久性的

7.2 事务的使用方法

在Python中,可以使用以下方法进行事务处理:

try:
    conn = psycopg2.connect(
        host='localhost', user='postgres', password='123456', database='testdb'
    )
    cursor = conn.cursor()
    
    # 开始事务(默认情况下,PostgreSQL自动开始事务)
    
    # 创建accounts表
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS accounts (
        id SERIAL PRIMARY KEY,
        name VARCHAR(50) NOT NULL,
        balance DECIMAL(10,2) DEFAULT 0.00
    )
    ''')
    
    # 插入测试数据
    cursor.execute("INSERT INTO accounts (name, balance) VALUES (%s, %s)", ("用户A", 1000.00))
    cursor.execute("INSERT INTO accounts (name, balance) VALUES (%s, %s)", ("用户B", 500.00))
    
    # 转账操作
    # 转出账户减少100
    cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE name = %s", ("用户A",))
    
    # 转入账户增加100
    cursor.execute("UPDATE accounts SET balance = balance + 100 WHERE name = %s", ("用户B",))
    
    # 提交事务
    conn.commit()
    print("✅ 事务提交成功")
    
    # 查询转账后余额
    cursor.execute("SELECT name, balance FROM accounts")
    balances = cursor.fetchall()
    print("转账后余额:")
    for account in balances:
        print(f"{account[0]}: {account[1]}")
        
except psycopg2.Error as e:
    print(f"❌ 事务执行失败,已回滚: {e}")
    if 'conn' in locals() and conn is not None:
        conn.rollback()
finally:
    if 'cursor' in locals() and cursor is not None:
        cursor.close()
    if 'conn' in locals() and conn is not None:
        conn.close()

八、错误处理

在连接和操作PostgreSQL数据库时,可能会遇到各种错误,如连接失败、SQL语法错误、权限不足等。我们需要使用try-except语句来捕获和处理这些错误。

8.1 常见的PostgreSQL错误类型

  • OperationalError:连接失败、服务器关闭等操作错误
  • ProgrammingError:SQL语法错误、表不存在等编程错误
  • IntegrityError:违反主键约束、外键约束等完整性错误
  • DataError:数据类型不匹配、数据过长等数据错误
  • InternalError:数据库内部错误
  • InterfaceError:驱动程序接口错误
  • Error:所有PostgreSQL错误的基类

8.2 错误处理示例

import psycopg2

try:
    # 尝试连接数据库
    conn = psycopg2.connect(
        host='localhost',
        user='wrong_user',      # 故意使用错误的用户名
        password='123456',
        database='testdb'
    )
    
except psycopg2.OperationalError as e:
    print(f"❌ 操作错误: {e}")
    print(f"错误代码: {e.pgcode}")
    print(f"错误信息: {e.pgerror}")

except psycopg2.ProgrammingError as e:
    print(f"❌ 编程错误: {e}")
    print(f"错误代码: {e.pgcode}")
    print(f"错误信息: {e.pgerror}")

except psycopg2.Error as e:
    print(f"❌ 其他PostgreSQL错误: {e}")
    print(f"错误代码: {e.pgcode}")
    print(f"错误信息: {e.pgerror}")

九、使用配置文件管理连接参数

在实际开发中,为了方便管理和维护,我们通常会将数据库连接参数存储在配置文件中,而不是直接硬编码在代码中。

9.1 使用ini配置文件

创建一个config.ini文件:

[postgresql]
host = localhost
user = postgres
password = 123456
database = testdb
port = 5432

然后使用Python的configparser模块读取配置文件:

import configparser
import psycopg2

# 读取配置文件
config = configparser.ConfigParser()
config.read('config.ini')

# 获取PostgreSQL配置
pg_config = config['postgresql']

try:
    # 建立数据库连接
    conn = psycopg2.connect(
        host=pg_config['host'],
        user=pg_config['user'],
        password=pg_config['password'],
        database=pg_config['database'],
        port=int(pg_config['port'])
    )
    print("✅ 使用配置文件连接数据库成功")
    
    cursor = conn.cursor()
    cursor.execute('SELECT version()')
    version = cursor.fetchone()
    print(f"PostgreSQL版本: {version[0]}")
    
except Exception as e:
    print(f"❌ 连接失败: {e}")
finally:
    if 'cursor' in locals() and cursor is not None:
        cursor.close()
    if 'conn' in locals() and conn is not None:
        conn.close()

9.2 使用环境变量

也可以将数据库连接参数设置为环境变量:

import os
import psycopg2

# 从环境变量获取配置
host = os.environ.get('PG_HOST', 'localhost')
user = os.environ.get('PG_USER', 'postgres')
password = os.environ.get('PG_PASSWORD', '123456')
database = os.environ.get('PG_DATABASE', 'testdb')
port = int(os.environ.get('PG_PORT', '5432'))

try:
    # 建立数据库连接
    conn = psycopg2.connect(
        host=host,
        user=user,
        password=password,
        database=database,
        port=port
    )
    print("✅ 使用环境变量连接数据库成功")
    
except Exception as e:
    print(f"❌ 连接失败: {e}")
finally:
    if 'conn' in locals() and conn is not None:
        conn.close()

十、PostgreSQL的高级特性

10.1 序列(Sequence)

PostgreSQL使用序列来生成唯一的数值,通常用于主键自增。在前面的例子中,我们使用了SERIAL类型,它实际上是一个自动创建的序列:

-- SERIAL类型等价于
CREATE SEQUENCE students_id_seq;
CREATE TABLE students (
    id INTEGER DEFAULT nextval('students_id_seq') PRIMARY KEY,
    -- 其他字段
);

10.2 JSON数据类型

PostgreSQL支持JSON和JSONB数据类型,可以存储和查询JSON格式的数据:

try:
    conn = psycopg2.connect(
        host='localhost', user='postgres', password='123456', database='testdb'
    )
    cursor = conn.cursor()
    
    # 创建包含JSON字段的表
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS products (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        details JSONB
    )
    ''')
    
    # 插入JSON数据
    product = {
        "price": 99.99,
        "category": "electronics",
        "tags": ["smartphone", "android"],
        "specs": {
            "screen": "6.5 inch",
            "storage": "128GB",
            "ram": "6GB"
        }
    }
    
    cursor.execute(
        "INSERT INTO products (name, details) VALUES (%s, %s)",
        ("智能手机", product)
    )
    
    conn.commit()
    print("✅ 插入JSON数据成功")
    
    # 查询JSON数据
    cursor.execute('SELECT name, details->>"price" AS price FROM products')
    products = cursor.fetchall()
    for product in products:
        print(f"产品: {product[0]}, 价格: {product[1]}")
        
except psycopg2.Error as e:
    print(f"❌ JSON操作失败: {e}")
    if 'conn' in locals() and conn is not None:
        conn.rollback()
finally:
    if 'cursor' in locals() and cursor is not None:
        cursor.close()
    if 'conn' in locals() and conn is not None:
        conn.close()

10.3 数组数据类型

PostgreSQL支持数组数据类型,可以存储同一类型的多个值:

try:
    conn = psycopg2.connect(
        host='localhost', user='postgres', password='123456', database='testdb'
    )
    cursor = conn.cursor()
    
    # 创建包含数组字段的表
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id SERIAL PRIMARY KEY,
        name VARCHAR(50) NOT NULL,
        hobbies TEXT[]
    )
    ''')
    
    # 插入数组数据
    hobbies = ["reading", "swimming", "coding"]
    cursor.execute(
        "INSERT INTO users (name, hobbies) VALUES (%s, %s)",
        ("张三", hobbies)
    )
    
    conn.commit()
    print("✅ 插入数组数据成功")
    
    # 查询数组数据
    cursor.execute('SELECT name, hobbies FROM users')
    users = cursor.fetchall()
    for user in users:
        print(f"用户: {user[0]}, 爱好: {user[1]}")
        print(f"第一个爱好: {user[1][0]}")
        
except psycopg2.Error as e:
    print(f"❌ 数组操作失败: {e}")
    if 'conn' in locals() and conn is not None:
        conn.rollback()
finally:
    if 'cursor' in locals() and cursor is not None:
        cursor.close()
    if 'conn' in locals() and conn is not None:
        conn.close()

十一、最佳实践

  1. 使用参数化查询:防止SQL注入攻击
  2. 及时关闭连接:使用try-finally或上下文管理器确保连接关闭
  3. 使用连接池:对于频繁连接数据库的应用,使用连接池提高性能
  4. 处理事务:对需要保证数据一致性的操作使用事务
  5. 错误处理:捕获和处理可能发生的各种错误
  6. 配置管理:使用配置文件或环境变量管理连接参数
  7. 使用合适的数据类型:选择最适合的数据类型来存储数据
  8. 创建索引:为经常查询的列创建索引,提高查询效率
  9. 使用视图和存储过程:将复杂的查询逻辑封装到视图或存储过程中
  10. 定期备份:定期备份数据库,防止数据丢失

十二、常见问题解答

12.1 连接PostgreSQL失败怎么办?

  • 检查PostgreSQL服务是否已启动
  • 检查连接参数是否正确(host、port、user、password等)
  • 检查防火墙是否允许PostgreSQL的端口(默认5432)
  • 检查用户是否有远程连接权限

12.2 为什么会出现"password authentication failed"错误?

  • 用户名或密码错误
  • 用户没有访问指定数据库的权限
  • 用户没有远程连接权限

12.3 如何解决中文乱码问题?

  • 确保PostgreSQL数据库的字符集是UTF-8
  • 在连接参数中设置client_encoding为utf8
  • 在Python代码中使用UTF-8编码

12.4 如何处理大量数据的插入?

  • 使用批量插入(executemany()方法)
  • 使用COPY语句进行高效导入
  • 调整PostgreSQL的max_wal_size参数

12.5 如何提高查询性能?

  • 为经常查询的列创建索引
  • 优化SQL查询语句
  • 使用EXPLAIN分析查询计划
  • 调整PostgreSQL的内存参数

十三、课程总结

在本集课程中,我们学习了如何使用Python连接PostgreSQL数据库:

  1. PostgreSQL简介:了解了PostgreSQL的特点和应用场景
  2. 准备工作:安装PostgreSQL驱动psycopg2
  3. 连接步骤:导入模块、建立连接、创建游标、执行SQL、处理结果、关闭连接
  4. SQL执行:单条执行、参数化查询、批量执行
  5. 事务处理:确保数据的一致性和完整性
  6. 错误处理:捕获和处理各种数据库错误
  7. 配置管理:使用配置文件或环境变量管理连接参数
  8. 高级特性:JSON数据类型、数组数据类型等
  9. 最佳实践:提高代码质量和性能的建议

通过本课程的学习,我们掌握了Python与PostgreSQL的连接技术,为开发更复杂、更高效的数据库应用提供了更多选择。在后续的课程中,我们将学习ORM(对象关系映射)框架的使用,进一步提升我们的数据库开发能力。

十四、课后练习

  1. 安装PostgreSQL数据库和psycopg2驱动
  2. 创建一个名为company的数据库
  3. 在company数据库中创建employees表(包含id、name、department、position、salary字段)
  4. 使用Python连接到company数据库
  5. 向employees表中插入5条员工记录
  6. 查询所有员工信息并打印
  7. 查询工资大于8000的员工
  8. 更新一条员工记录
  9. 删除一条员工记录
  10. 使用事务处理模拟员工调薪操作

要求:

  • 使用参数化查询防止SQL注入
  • 添加适当的错误处理
  • 使用配置文件管理连接参数
  • 所有操作完成后关闭数据库连接
  • 尝试使用PostgreSQL的高级特性(如JSON或数组数据类型)
« 上一篇 MySQL数据库连接 下一篇 » ORM概念