Python文件操作

学习目标

通过本集的学习,你将能够:

  • 使用Python读取文件内容
  • 使用Python写入文件
  • 掌握文件路径处理
  • 使用上下文管理器安全操作文件
  • 处理不同编码的文件

1. 读取文件

1.1 基本文件读取

# 读取整个文件
with open("example.txt", "r", encoding="utf-8") as f:
    content = f.read()
    print(content)

# 按行读取
with open("example.txt", "r", encoding="utf-8") as f:
    for line in f:
        print(line.rstrip())  # rstrip() 去除换行符

# 读取所有行到列表
with open("example.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()
    print(lines)

1.2 文件打开模式

# 常见模式
# r: 只读(默认)
# w: 只写(覆盖)
# a: 追加
# r+: 读写
# b: 二进制模式
# t: 文本模式(默认)

# 组合模式
# rb: 二进制只读
# wb: 二进制只写
# ab: 二进制追加

1.3 逐块读取大文件

# 大文件逐块读取
with open("large_file.txt", "r", encoding="utf-8") as f:
    while True:
        chunk = f.read(4096)  # 每次读取4KB
        if not chunk:
            break
        print(chunk, end="")

文件读取的ASCII图:

程序 ──→ open() ──→ 文件对象 ──→ read() ──→ 内容
              │
              ├─ r: 只读
              ├─ w: 只写
              └─ a: 追加

2. 写入文件

2.1 写入文本文件

# 写入文件(覆盖)
with open("output.txt", "w", encoding="utf-8") as f:
    f.write("Hello, World!\n")
    f.write("这是第二行\n")

# 追加到文件
with open("output.txt", "a", encoding="utf-8") as f:
    f.write("这是追加的内容\n")

# 写入多行
lines = ["第一行", "第二行", "第三行"]
with open("output.txt", "w", encoding="utf-8") as f:
    for line in lines:
        f.write(line + "\n")
    
    # 或使用 writelines
    f.writelines([line + "\n" for line in lines])

2.2 格式化写入

data = [
    {"name": "张三", "age": 25, "city": "北京"},
    {"name": "李四", "age": 30, "city": "上海"},
    {"name": "王五", "age": 28, "city": "广州"}
]

with open("people.txt", "w", encoding="utf-8") as f:
    # 写入表头
    f.write("姓名\t年龄\t城市\n")
    f.write("-" * 30 + "\n")
    
    # 写入数据
    for person in data:
        f.write(f"{person['name']}\t{person['age']}\t{person['city']}\n")

3. 文件路径处理

3.1 os.path 模块

import os

# 获取当前工作目录
print(os.getcwd())

# 路径拼接
path = os.path.join("folder", "subfolder", "file.txt")
print(path)

# 检查路径是否存在
print(os.path.exists("example.txt"))

# 检查是否是文件
print(os.path.isfile("example.txt"))

# 检查是否是目录
print(os.path.isdir("folder"))

# 获取文件名
print(os.path.basename("/path/to/file.txt"))  # file.txt

# 获取目录名
print(os.path.dirname("/path/to/file.txt"))  # /path/to

# 获取文件扩展名
print(os.path.splitext("file.txt"))  # ('file', '.txt')

# 获取绝对路径
print(os.path.abspath("example.txt"))

3.2 pathlib 模块(推荐)

from pathlib import Path

# 创建Path对象
path = Path("example.txt")

# 路径拼接
path = Path("folder") / "subfolder" / "file.txt"
print(path)

# 检查是否存在
print(path.exists())

# 检查是否是文件
print(path.is_file())

# 检查是否是目录
print(path.is_dir())

# 获取文件名
print(path.name)

# 获取父目录
print(path.parent)

# 获取文件扩展名
print(path.suffix)

# 获取文件名(不含扩展名)
print(path.stem)

# 获取绝对路径
print(path.resolve())

# 创建目录
Path("new_folder").mkdir(exist_ok=True)

# 遍历目录
for file in Path(".").iterdir():
    print(file)

# 按模式匹配文件
for file in Path(".").glob("*.txt"):
    print(file)

4. 上下文管理器

4.1 with 语句

# 使用 with 语句(推荐)
with open("example.txt", "r", encoding="utf-8") as f:
    content = f.read()
    # 文件在此块内自动关闭

# 不使用 with 语句(不推荐)
f = open("example.txt", "r", encoding="utf-8")
try:
    content = f.read()
finally:
    f.close()  # 必须手动关闭

with语句的ASCII图:

with open(...) as f:
    │
    ├─ __enter__() → 返回文件对象
    │
    ├─ 执行代码块
    │
    └─ __exit__() → 关闭文件(即使出错)

4.2 多个文件

# 同时打开多个文件
with open("input.txt", "r", encoding="utf-8") as infile, \
     open("output.txt", "w", encoding="utf-8") as outfile:
    for line in infile:
        outfile.write(line.upper())

5. 二进制文件操作

5.1 读写二进制文件

# 写入二进制文件
data = b"\x48\x65\x6c\x6c\x6f"  # "Hello" 的字节
with open("binary.dat", "wb") as f:
    f.write(data)

# 读取二进制文件
with open("binary.dat", "rb") as f:
    content = f.read()
    print(content)
    print(content.decode("utf-8"))

5.2 复制文件

def copy_file(source, destination):
    """复制文件"""
    with open(source, "rb") as src, open(destination, "wb") as dst:
        while True:
            chunk = src.read(4096)
            if not chunk:
                break
            dst.write(chunk)

copy_file("source.txt", "copy.txt")

6. 处理不同编码

6.1 编码和解码

# 字符串编码为字节
text = "你好,世界"
encoded = text.encode("utf-8")
print(encoded)

# 字节解码为字符串
decoded = encoded.decode("utf-8")
print(decoded)

# 不同编码
text = "你好"
print(text.encode("utf-8"))
print(text.encode("gbk"))

6.2 指定编码打开文件

# 使用 UTF-8 编码
with open("file.txt", "r", encoding="utf-8") as f:
    content = f.read()

# 使用 GBK 编码
with open("file.txt", "r", encoding="gbk") as f:
    content = f.read()

# 处理编码错误
with open("file.txt", "r", encoding="utf-8", errors="replace") as f:
    content = f.read()

7. 实用案例

7.1 案例1:日志文件分析器

# log_analyzer.py

from pathlib import Path
from collections import defaultdict

def analyze_log(log_file):
    """分析日志文件"""
    log_path = Path(log_file)
    if not log_path.exists():
        print(f"文件不存在: {log_file}")
        return
    
    error_count = 0
    warning_count = 0
    info_count = 0
    error_messages = []
    
    with log_path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            
            if "ERROR" in line:
                error_count += 1
                error_messages.append(line)
            elif "WARNING" in line:
                warning_count += 1
            elif "INFO" in line:
                info_count += 1
    
    # 生成报告
    report = f"""
日志分析报告
{'='*40}
文件: {log_file}
INFO: {info_count}
WARNING: {warning_count}
ERROR: {error_count}
"""
    
    if error_messages:
        report += "\n错误信息:\n"
        for i, msg in enumerate(error_messages[:5], 1):
            report += f"{i}. {msg}\n"
        if len(error_messages) > 5:
            report += f"... 还有 {len(error_messages) - 5} 条错误\n"
    
    print(report)
    
    # 保存报告
    report_path = log_path.parent / f"{log_path.stem}_report.txt"
    with report_path.open("w", encoding="utf-8") as f:
        f.write(report)
    print(f"报告已保存到: {report_path}")

# 创建示例日志文件
sample_log = """
2024-01-01 10:00:00 INFO 系统启动
2024-01-01 10:00:01 INFO 加载配置
2024-01-01 10:00:02 WARNING 配置文件缺失,使用默认值
2024-01-01 10:00:03 INFO 初始化完成
2024-01-01 10:00:04 ERROR 无法连接数据库
2024-01-01 10:00:05 ERROR 重试连接失败
2024-01-01 10:00:06 INFO 使用本地缓存
"""

with open("app.log", "w", encoding="utf-8") as f:
    f.write(sample_log)

# 分析日志
analyze_log("app.log")

7.2 案例2:CSV文件处理

# csv_handler.py

import csv
from pathlib import Path

def write_csv(data, filename):
    """写入CSV文件"""
    path = Path(filename)
    with path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)

def read_csv(filename):
    """读取CSV文件"""
    path = Path(filename)
    with path.open("r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        return list(reader)

# 学生数据
students = [
    {"name": "张三", "age": "20", "grade": "A"},
    {"name": "李四", "age": "21", "grade": "B"},
    {"name": "王五", "age": "19", "grade": "A"},
    {"name": "赵六", "age": "22", "grade": "C"}
]

# 写入CSV
write_csv(students, "students.csv")
print("CSV文件已写入")

# 读取CSV
data = read_csv("students.csv")
print("\nCSV文件内容:")
for row in data:
    print(row)

# 手动处理(不使用csv模块)
def manual_csv():
    # 写入
    with open("manual.csv", "w", encoding="utf-8") as f:
        f.write("name,age,grade\n")
        f.write("张三,20,A\n")
        f.write("李四,21,B\n")
    
    # 读取
    with open("manual.csv", "r", encoding="utf-8") as f:
        lines = f.readlines()
        header = lines[0].strip().split(",")
        for line in lines[1:]:
            values = line.strip().split(",")
            print(dict(zip(header, values)))

print("\n手动处理CSV:")
manual_csv()

7.3 案例3:文件备份工具

# file_backup.py

from pathlib import Path
import shutil
from datetime import datetime

def backup_file(source, backup_dir="backups"):
    """备份文件"""
    source_path = Path(source)
    
    if not source_path.exists():
        print(f"源文件不存在: {source}")
        return False
    
    if not source_path.is_file():
        print(f"不是文件: {source}")
        return False
    
    # 创建备份目录
    backup_path = Path(backup_dir)
    backup_path.mkdir(exist_ok=True)
    
    # 生成备份文件名
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_filename = f"{source_path.stem}_{timestamp}{source_path.suffix}"
    backup_file = backup_path / backup_filename
    
    # 复制文件
    shutil.copy2(source_path, backup_file)
    print(f"备份成功: {source} -> {backup_file}")
    return True

def restore_latest_backup(original_file, backup_dir="backups"):
    """恢复最新备份"""
    original_path = Path(original_file)
    backup_path = Path(backup_dir)
    
    if not backup_path.exists():
        print("备份目录不存在")
        return False
    
    # 查找相关备份文件
    backups = list(backup_path.glob(f"{original_path.stem}_*{original_path.suffix}"))
    if not backups:
        print("没有找到备份文件")
        return False
    
    # 按时间排序,取最新的
    latest_backup = max(backups, key=lambda p: p.stat().st_mtime)
    
    # 恢复
    shutil.copy2(latest_backup, original_path)
    print(f"恢复成功: {latest_backup} -> {original_path}")
    return True

def list_backups(original_file, backup_dir="backups"):
    """列出所有备份"""
    original_path = Path(original_file)
    backup_path = Path(backup_dir)
    
    if not backup_path.exists():
        print("备份目录不存在")
        return
    
    backups = list(backup_path.glob(f"{original_path.stem}_*{original_path.suffix}"))
    if not backups:
        print("没有备份文件")
        return
    
    print(f"{original_file} 的备份文件:")
    for backup in sorted(backups, key=lambda p: p.stat().st_mtime, reverse=True):
        size = backup.stat().st_size
        mtime = datetime.fromtimestamp(backup.stat().st_mtime)
        print(f"  {backup.name} - {size} bytes - {mtime}")

# 创建测试文件
with open("important.txt", "w", encoding="utf-8") as f:
    f.write("这是重要文件内容\n")
    f.write("版本1\n")

print("=== 文件备份工具 ===")

# 备份
backup_file("important.txt")

# 修改文件
with open("important.txt", "a", encoding="utf-8") as f:
    f.write("\n版本2的更新\n")

# 再次备份
import time
time.sleep(1)  # 等待1秒,确保时间戳不同
backup_file("important.txt")

# 列出备份
print()
list_backups("important.txt")

# 恢复最新备份
print()
restore_latest_backup("important.txt")

8. 自测问题

  1. 为什么推荐使用 with 语句操作文件?
  2. 文件打开模式 'r', 'w', 'a' 的区别是什么?
  3. pathlib 和 os.path 相比有什么优势?
  4. 如何处理文件编码问题?
  5. 二进制文件和文本文件操作有什么区别?

9. 下集预告

下一集我们将学习Python的异常处理!

参考资料

« 上一篇 Python面向对象进阶 下一篇 » Python异常处理