第20集:文件操作与异常处理 - Python数据持久化与错误处理

📚 本集学习目标

  • 掌握Python文件的基本操作(打开、读取、写入、关闭)
  • 学会处理不同类型的文件(文本文件、CSV、JSON等)
  • 理解Python异常处理机制
  • 掌握try-except语句的使用
  • 学会自定义异常和错误处理策略
  • 了解上下文管理器with语句的使用

🎯 1. 文件操作基础

1.1 文件打开与关闭

# 基本文件操作
f = open('example.txt', 'r', encoding='utf-8')  # 打开文件
content = f.read()                                # 读取内容
f.close()                                        # 关闭文件

# 更好的方式 - 使用with语句(推荐)
with open('example.txt', 'r', encoding='utf-8') as f:
    content = f.read()
    # 文件会自动关闭,即使发生异常

1.2 文件打开模式

# 文件模式说明
'''
r   : 只读模式(默认)
w   : 只写模式(会覆盖已有内容)
a   : 追加模式(在文件末尾添加内容)
r+  : 读写模式
w+  : 读写模式(会覆盖已有内容)
a+  : 追加读写模式
b   : 二进制模式(如 rb, wb, ab)
t   : 文本模式(默认,如 rt, wt, at)
'''

# 示例
with open('data.txt', 'r', encoding='utf-8') as f:      # 读取文本
    content = f.read()

with open('data.txt', 'w', encoding='utf-8') as f:      # 写入文本
    f.write('Hello World')

with open('data.bin', 'rb') as f:                       # 读取二进制
    data = f.read()

with open('data.bin', 'wb') as f:                       # 写入二进制
    f.write(b'\x48\x65\x6c\x6c\x6f')  # "Hello"的bytes

📖 2. 文件读取操作

2.1 读取整个文件

# 方法1:read() - 读取全部内容
with open('example.txt', 'r', encoding='utf-8') as f:
    content = f.read()
    print(content)

# 方法2:read(size) - 读取指定字节数
with open('example.txt', 'r', encoding='utf-8') as f:
    first_100 = f.read(100)
    print(f"前100个字符: {first_100}")

2.2 逐行读取

# 方法1:readlines() - 返回所有行的列表
with open('example.txt', 'r', encoding='utf-8') as f:
    lines = f.readlines()
    for i, line in enumerate(lines, 1):
        print(f"第{i}行: {line.rstrip()}")

# 方法2:直接遍历文件对象(推荐,内存效率高)
with open('example.txt', 'r', encoding='utf-8') as f:
    for line_num, line in enumerate(f, 1):
        print(f"第{line_num}行: {line.strip()}")

# 方法3:readline() - 逐行读取
with open('example.txt', 'r', encoding='utf-8') as f:
    line1 = f.readline()
    line2 = f.readline()
    print(f"第一行: {line1.strip()}")
    print(f"第二行: {line2.strip()}")

2.3 文件指针操作

with open('example.txt', 'r', encoding='utf-8') as f:
    # 获取当前指针位置
    print(f"当前指针位置: {f.tell()}")
    
    # 读取一些内容
    content = f.read(10)
    print(f"读取内容: {content}")
    print(f"读取后指针位置: {f.tell()}")
    
    # 移动指针到开头
    f.seek(0)
    print(f"移动到开头后指针位置: {f.tell()}")
    
    # 再次读取
    content2 = f.read(10)
    print(f"再次读取: {content2}")

✍️ 3. 文件写入操作

3.1 基本写入

# 写入字符串
with open('output.txt', 'w', encoding='utf-8') as f:
    f.write('Hello, World!\n')
    f.write('这是第二行\n')
    f.write('Python文件操作\n')

# 写入列表
lines = ['第一行\n', '第二行\n', '第三行\n']
with open('output.txt', 'w', encoding='utf-8') as f:
    f.writelines(lines)

3.2 追加写入

# 追加模式
with open('output.txt', 'a', encoding='utf-8') as f:
    f.write('这是追加的内容\n')
    f.write(f'追加时间: {datetime.datetime.now()}\n')

3.3 格式化写入

import datetime

data = [
    {'name': '张三', 'age': 25, 'city': '北京'},
    {'name': '李四', 'age': 30, 'city': '上海'},
    {'name': '王五', 'age': 28, 'city': '广州'}
]

with open('people.txt', 'w', encoding='utf-8') as f:
    # 写入表头
    f.write('姓名\t年龄\t城市\n')
    f.write('-' * 30 + '\n')
    
    # 写入数据
    for person in data:
        f.write(f"{person['name']}\t{person['age']}\t{person['city']}\n")

📊 4. 文件格式处理

4.1 CSV文件处理

import csv

# 写入CSV文件
data = [
    ['姓名', '年龄', '城市'],
    ['张三', '25', '北京'],
    ['李四', '30', '上海'],
    ['王五', '28', '广州']
]

with open('data.csv', 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerows(data)

# 读取CSV文件
with open('data.csv', 'r', encoding='utf-8') as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

4.2 JSON文件处理

import json

# 写入JSON文件
data = {
    'name': '张三',
    'age': 25,
    'hobbies': ['读书', '游泳', '编程'],
    'address': {
        'city': '北京',
        'district': '朝阳区'
    }
}

with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# 读取JSON文件
with open('data.json', 'r', encoding='utf-8') as f:
    loaded_data = json.load(f)
    print(f"姓名: {loaded_data['name']}")
    print(f"爱好: {', '.join(loaded_data['hobbies'])}")

4.3 二进制文件处理

# 保存二进制数据
data = b'Hello, Binary World!'
with open('binary.bin', 'wb') as f:
    f.write(data)

# 读取二进制数据
with open('binary.bin', 'rb') as f:
    loaded_data = f.read()
    print(f"二进制数据: {loaded_data}")
    print(f"转换为字符串: {loaded_data.decode('utf-8')}")

⚠️ 5. 异常处理基础

5.1 基本异常处理

# 基本try-except结构
try:
    result = 10 / 0
except ZeroDivisionError:
    print("除数不能为零!")

# 捕获多个异常
try:
    num = int(input("请输入一个数字: "))
    result = 100 / num
    print(f"结果: {result}")
except ValueError:
    print("请输入有效的数字!")
except ZeroDivisionError:
    print("除数不能为零!")
except Exception as e:
    print(f"发生未知错误: {e}")

5.2 完整的异常处理结构

try:
    # 尝试执行的代码
    with open('nonexistent.txt', 'r', encoding='utf-8') as f:
        content = f.read()
        print(content)
        
except FileNotFoundError:
    print("文件不存在!")
except PermissionError:
    print("没有权限访问文件!")
except Exception as e:
    print(f"发生错误: {type(e).__name__}: {e}")
else:
    # 没有异常时执行
    print("文件读取成功!")
finally:
    # 无论是否有异常都会执行
    print("操作完成!")

🛡️ 6. 文件操作异常处理

6.1 常见文件异常

def safe_file_read(filename):
    """安全的文件读取函数"""
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            content = f.read()
            return content
            
    except FileNotFoundError:
        print(f"错误:文件 '{filename}' 不存在")
        return None
    except PermissionError:
        print(f"错误:没有权限访问文件 '{filename}'")
        return None
    except UnicodeDecodeError:
        print(f"错误:文件 '{filename}' 编码格式不正确")
        return None
    except Exception as e:
        print(f"错误:读取文件时发生未知错误 - {type(e).__name__}: {e}")
        return None

# 使用示例
content = safe_file_read('example.txt')
if content:
    print("文件内容读取成功!")

6.2 文件操作安全模式

def safe_file_operations():
    """安全的文件操作示例"""
    files_to_process = ['file1.txt', 'file2.txt', 'file3.txt']
    results = {}
    
    for filename in files_to_process:
        try:
            # 尝试读取文件
            with open(filename, 'r', encoding='utf-8') as f:
                content = f.read()
                results[filename] = len(content)
                
        except FileNotFoundError:
            print(f"警告:文件 '{filename}' 不存在,跳过处理")
            results[filename] = None
        except Exception as e:
            print(f"错误:处理文件 '{filename}' 时出错 - {e}")
            results[filename] = None
    
    return results

# 统计结果
results = safe_file_operations()
for filename, length in results.items():
    if length is not None:
        print(f"{filename}: {length} 个字符")
    else:
        print(f"{filename}: 处理失败")

🔧 7. 上下文管理器

7.1 with语句的工作原理

# 自定义上下文管理器
class FileManager:
    """文件管理器上下文管理器"""
    
    def __init__(self, filename, mode):
        self.filename = filename
        self.mode = mode
        self.file = None
    
    def __enter__(self):
        """进入上下文时调用"""
        print(f"打开文件: {self.filename}")
        self.file = open(self.filename, self.mode, encoding='utf-8')
        return self.file
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """退出上下文时调用"""
        print(f"关闭文件: {self.filename}")
        if self.file:
            self.file.close()
        
        # 如果有异常,返回False会重新抛出异常,返回True会抑制异常
        if exc_type is not None:
            print(f"发生异常: {exc_type.__name__}: {exc_val}")
            return False  # 重新抛出异常
        return True  # 抑制异常

# 使用自定义上下文管理器
try:
    with FileManager('example.txt', 'r') as f:
        content = f.read()
        print(f"文件内容: {content[:50]}...")
        # raise ValueError("测试异常")  # 可以取消注释测试异常处理
except Exception as e:
    print(f"捕获到异常: {e}")

7.2 contextlib模块

from contextlib import contextmanager

@contextmanager
def file_manager(filename, mode):
    """使用contextmanager装饰器创建上下文管理器"""
    print(f"打开文件: {filename}")
    f = open(filename, mode, encoding='utf-8')
    try:
        yield f  # 产生文件对象
    finally:
        print(f"关闭文件: {filename}")
        f.close()

# 使用
with file_manager('example.txt', 'r') as f:
    content = f.read()
    print(f"读取了 {len(content)} 个字符")

🎯 8. 实际应用案例

8.1 日志记录器

import datetime
import os

class Logger:
    """简单的日志记录器"""
    
    def __init__(self, log_file='app.log'):
        self.log_file = log_file
        self._ensure_log_directory()
    
    def _ensure_log_directory(self):
        """确保日志目录存在"""
        log_dir = os.path.dirname(self.log_file)
        if log_dir and not os.path.exists(log_dir):
            os.makedirs(log_dir)
    
    def _write_log(self, level, message):
        """写入日志"""
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        log_entry = f"[{timestamp}] {level}: {message}\n"
        
        try:
            with open(self.log_file, 'a', encoding='utf-8') as f:
                f.write(log_entry)
        except Exception as e:
            print(f"写入日志失败: {e}")
    
    def info(self, message):
        """记录信息日志"""
        self._write_log("INFO", message)
    
    def warning(self, message):
        """记录警告日志"""
        self._write_log("WARNING", message)
    
    def error(self, message):
        """记录错误日志"""
        self._write_log("ERROR", message)

# 使用示例
logger = Logger("logs/app.log")
logger.info("程序启动")
logger.warning("这是一个警告信息")
logger.error("发生了错误")

print("日志记录完成,请查看 logs/app.log 文件")

8.2 配置文件管理器

import json
import os

class ConfigManager:
    """配置文件管理器"""
    
    def __init__(self, config_file='config.json'):
        self.config_file = config_file
        self.config = self._load_config()
    
    def _load_config(self):
        """加载配置文件"""
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                print(f"加载配置文件失败: {e}")
                return self._get_default_config()
        else:
            return self._get_default_config()
    
    def _get_default_config(self):
        """获取默认配置"""
        return {
            "app_name": "Python应用",
            "version": "1.0.0",
            "debug": False,
            "database": {
                "host": "localhost",
                "port": 3306,
                "username": "root"
            },
            "logging": {
                "level": "INFO",
                "file": "app.log"
            }
        }
    
    def save_config(self):
        """保存配置文件"""
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(self.config, f, ensure_ascii=False, indent=2)
            print("配置文件保存成功")
            return True
        except Exception as e:
            print(f"保存配置文件失败: {e}")
            return False
    
    def get(self, key, default=None):
        """获取配置值"""
        keys = key.split('.')
        value = self.config
        try:
            for k in keys:
                value = value[k]
            return value
        except (KeyError, TypeError):
            return default
    
    def set(self, key, value):
        """设置配置值"""
        keys = key.split('.')
        config = self.config
        for k in keys[:-1]:
            if k not in config:
                config[k] = {}
            config = config[k]
        config[keys[-1]] = value

# 使用示例
config = ConfigManager()
print(f"应用名称: {config.get('app_name')}")
print(f"数据库主机: {config.get('database.host')}")

config.set('app_name', '我的Python应用')
config.set('database.port', 5432)
config.save_config()

🏆 9. 高级文件操作技巧

9.1 文件临时处理

import tempfile
import shutil

# 创建临时文件
with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8', delete=False) as temp_file:
    temp_file.write("这是临时文件内容\n")
    temp_file.write("处理完成后会被删除\n")
    temp_filename = temp_file.name

print(f"临时文件路径: {temp_filename}")

# 使用临时文件
try:
    with open(temp_filename, 'r', encoding='utf-8') as f:
        content = f.read()
        print(f"临时文件内容: {content}")
finally:
    # 清理临时文件
    os.unlink(temp_filename)
    print("临时文件已删除")

# 创建临时目录
with tempfile.TemporaryDirectory() as temp_dir:
    print(f"临时目录: {temp_dir}")
    temp_file_path = os.path.join(temp_dir, 'test.txt')
    with open(temp_file_path, 'w') as f:
        f.write("临时目录中的文件")
    
    # 在临时目录中进行操作
    print(f"临时目录内容: {os.listdir(temp_dir)}")

# 临时目录会自动删除

9.2 文件备份与恢复

import shutil
import datetime

def backup_file(filename, backup_dir='backups'):
    """备份文件"""
    if not os.path.exists(filename):
        raise FileNotFoundError(f"源文件 '{filename}' 不存在")
    
    # 创建备份目录
    os.makedirs(backup_dir, exist_ok=True)
    
    # 生成备份文件名
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    base_name = os.path.basename(filename)
    backup_name = f"{timestamp}_{base_name}"
    backup_path = os.path.join(backup_dir, backup_name)
    
    # 复制文件
    shutil.copy2(filename, backup_path)
    print(f"文件已备份到: {backup_path}")
    return backup_path

def restore_file(backup_path, target_path):
    """恢复文件"""
    if not os.path.exists(backup_path):
        raise FileNotFoundError(f"备份文件 '{backup_path}' 不存在")
    
    shutil.copy2(backup_path, target_path)
    print(f"文件已恢复到: {target_path}")

# 使用示例
try:
    # 创建测试文件
    with open('important_data.txt', 'w', encoding='utf-8') as f:
        f.write("重要数据内容\n")
        f.write("请妥善备份\n")
    
    # 备份文件
    backup_path = backup_file('important_data.txt')
    
    # 修改原文件
    with open('important_data.txt', 'a', encoding='utf-8') as f:
        f.write("这是新增的内容\n")
    
    # 恢复备份
    restore_file(backup_path, 'important_data.txt_restored')
    
except Exception as e:
    print(f"操作失败: {e}")

🔍 10. 文件系统操作

10.1 目录操作

import os
from pathlib import Path

# 使用os模块
# 创建目录
os.makedirs('data/subdir', exist_ok=True)

# 检查路径是否存在
if os.path.exists('data'):
    print("data目录存在")

# 列出目录内容
for item in os.listdir('data'):
    item_path = os.path.join('data', item)
    if os.path.isfile(item_path):
        print(f"文件: {item}")
    elif os.path.isdir(item_path):
        print(f"目录: {item}")

# 使用pathlib模块(推荐)
path = Path('data')

# 创建目录
path.mkdir(exist_ok=True)

# 遍历目录
for item in path.rglob('*'):  # 递归遍历
    if item.is_file():
        print(f"文件: {item.relative_to(path)}")
    elif item.is_dir():
        print(f"目录: {item.relative_to(path)}")

10.2 文件信息获取

import os
import stat

def get_file_info(filename):
    """获取文件详细信息"""
    try:
        stat_info = os.stat(filename)
        
        info = {
            '文件名': filename,
            '大小': f"{stat_info.st_size} 字节",
            '创建时间': datetime.datetime.fromtimestamp(stat_info.st_ctime),
            '修改时间': datetime.datetime.fromtimestamp(stat_info.st_mtime),
            '访问时间': datetime.datetime.fromtimestamp(stat_info.st_atime),
            '是否为文件': os.path.isfile(filename),
            '是否为目录': os.path.isdir(filename),
            '权限': oct(stat_info.st_mode)[-3:]
        }
        
        return info
    except FileNotFoundError:
        return None

# 使用示例
info = get_file_info('example.txt')
if info:
    for key, value in info.items():
        print(f"{key}: {value}")

⚠️ 11. 常见错误和最佳实践

11.1 常见文件操作错误

# ❌ 错误示例
try:
    f = open('file.txt', 'r')
    content = f.read()
    # 忘记关闭文件
    # 如果这里发生异常,文件不会被关闭
    result = 1 / 0
    f.close()  # 这行不会执行
except:
    print("发生错误")

# ✅ 正确示例
try:
    with open('file.txt', 'r', encoding='utf-8') as f:
        content = f.read()
        result = 1 / 0  # 文件会自动关闭
except ZeroDivisionError:
    print("除零错误")
except FileNotFoundError:
    print("文件不存在")
except Exception as e:
    print(f"其他错误: {e}")

11.2 最佳实践

# 1. 始终指定编码
with open('file.txt', 'r', encoding='utf-8') as f:  # ✅
    pass

with open('file.txt', 'r') as f:  # ❌ 在不同系统可能有编码问题
    pass

# 2. 使用with语句
with open('file.txt', 'w', encoding='utf-8') as f:  # ✅
    f.write('content')

# 3. 检查文件路径
filename = 'data/file.txt'
if os.path.exists(filename):  # ✅
    with open(filename, 'r') as f:
        pass

# 4. 处理大文件要分块读取
def read_large_file(filename, chunk_size=1024):
    """分块读取大文件"""
    with open(filename, 'r', encoding='utf-8') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

# 5. 写入重要数据前先备份
def safe_write(filename, content):
    """安全写入文件"""
    backup_name = f"{filename}.bak"
    if os.path.exists(filename):
        shutil.copy2(filename, backup_name)
    
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
    except Exception:
        if os.path.exists(backup_name):
            shutil.move(backup_name, filename)
        raise
    finally:
        if os.path.exists(backup_name):
            os.remove(backup_name)

📝 12. 本集总结

12.1 关键概念

文件操作:打开、读取、写入、关闭文件的基本方法
异常处理:try-except-else-finally结构的使用
上下文管理器:with语句和自动资源管理
文件格式:文本、CSV、JSON、二进制文件的处理
错误处理:常见文件异常的处理方法

12.2 学习成果

通过本集学习,你已经掌握了:

  • 文件的基本读写操作
  • 不同文件格式的处理方法
  • Python异常处理机制
  • 上下文管理器的使用
  • 实际项目中的文件处理最佳实践

12.3 下集预告

下一集我们将学习面向对象编程基础,掌握Python的类和对象概念,开启面向对象编程的大门。


🎯 课后练习

基础练习

  1. 编写一个简单的文本编辑器,支持读取、编辑、保存文件
  2. 创建一个CSV文件处理器,可以读取和写入CSV数据
  3. 实现一个JSON配置文件管理器

进阶练习

  1. 编写一个日志记录器,支持不同级别的日志记录
  2. 创建一个文件备份工具,可以自动备份指定文件
  3. 实现一个简单的数据库,使用JSON文件存储数据

挑战练习

  1. 编写一个文件同步工具,可以同步两个目录的内容
  2. 创建一个文件压缩和解压缩工具
  3. 实现一个支持断点续传的文件下载器

记住:文件操作和数据持久化是编程的核心技能,异常处理是程序健壮性的保障!掌握这些技能让你的Python程序更加完善! 🐍✨

« 上一篇 综合练习猜数字游戏 下一篇 » 字符串基础操作