第20集:文件操作与异常处理 - Python数据持久化与错误处理
📚 本集学习目标
- 掌握Python文件的基本操作(打开、读取、写入、关闭)
- 学会处理不同类型的文件(文本文件、CSV、JSON等)
- 理解Python异常处理机制
- 掌握try-except语句的使用
- 学会自定义异常和错误处理策略
- 了解上下文管理器with语句的使用
🎯 1. 文件操作基础
1.1 文件打开与关闭
# 基本文件操作
f = open('example.txt', 'r', encoding='utf-8') # 打开文件
content = f.read() # 读取内容
f.close() # 关闭文件
# 更好的方式 - 使用with语句(推荐)
with open('example.txt', 'r', encoding='utf-8') as f:
content = f.read()
# 文件会自动关闭,即使发生异常1.2 文件打开模式
# 文件模式说明
'''
r : 只读模式(默认)
w : 只写模式(会覆盖已有内容)
a : 追加模式(在文件末尾添加内容)
r+ : 读写模式
w+ : 读写模式(会覆盖已有内容)
a+ : 追加读写模式
b : 二进制模式(如 rb, wb, ab)
t : 文本模式(默认,如 rt, wt, at)
'''
# 示例
with open('data.txt', 'r', encoding='utf-8') as f: # 读取文本
content = f.read()
with open('data.txt', 'w', encoding='utf-8') as f: # 写入文本
f.write('Hello World')
with open('data.bin', 'rb') as f: # 读取二进制
data = f.read()
with open('data.bin', 'wb') as f: # 写入二进制
f.write(b'\x48\x65\x6c\x6c\x6f') # "Hello"的bytes📖 2. 文件读取操作
2.1 读取整个文件
# 方法1:read() - 读取全部内容
with open('example.txt', 'r', encoding='utf-8') as f:
content = f.read()
print(content)
# 方法2:read(size) - 读取指定字节数
with open('example.txt', 'r', encoding='utf-8') as f:
first_100 = f.read(100)
print(f"前100个字符: {first_100}")2.2 逐行读取
# 方法1:readlines() - 返回所有行的列表
with open('example.txt', 'r', encoding='utf-8') as f:
lines = f.readlines()
for i, line in enumerate(lines, 1):
print(f"第{i}行: {line.rstrip()}")
# 方法2:直接遍历文件对象(推荐,内存效率高)
with open('example.txt', 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
print(f"第{line_num}行: {line.strip()}")
# 方法3:readline() - 逐行读取
with open('example.txt', 'r', encoding='utf-8') as f:
line1 = f.readline()
line2 = f.readline()
print(f"第一行: {line1.strip()}")
print(f"第二行: {line2.strip()}")2.3 文件指针操作
with open('example.txt', 'r', encoding='utf-8') as f:
# 获取当前指针位置
print(f"当前指针位置: {f.tell()}")
# 读取一些内容
content = f.read(10)
print(f"读取内容: {content}")
print(f"读取后指针位置: {f.tell()}")
# 移动指针到开头
f.seek(0)
print(f"移动到开头后指针位置: {f.tell()}")
# 再次读取
content2 = f.read(10)
print(f"再次读取: {content2}")✍️ 3. 文件写入操作
3.1 基本写入
# 写入字符串
with open('output.txt', 'w', encoding='utf-8') as f:
f.write('Hello, World!\n')
f.write('这是第二行\n')
f.write('Python文件操作\n')
# 写入列表
lines = ['第一行\n', '第二行\n', '第三行\n']
with open('output.txt', 'w', encoding='utf-8') as f:
f.writelines(lines)3.2 追加写入
# 追加模式
with open('output.txt', 'a', encoding='utf-8') as f:
f.write('这是追加的内容\n')
f.write(f'追加时间: {datetime.datetime.now()}\n')3.3 格式化写入
import datetime
data = [
{'name': '张三', 'age': 25, 'city': '北京'},
{'name': '李四', 'age': 30, 'city': '上海'},
{'name': '王五', 'age': 28, 'city': '广州'}
]
with open('people.txt', 'w', encoding='utf-8') as f:
# 写入表头
f.write('姓名\t年龄\t城市\n')
f.write('-' * 30 + '\n')
# 写入数据
for person in data:
f.write(f"{person['name']}\t{person['age']}\t{person['city']}\n")📊 4. 文件格式处理
4.1 CSV文件处理
import csv
# 写入CSV文件
data = [
['姓名', '年龄', '城市'],
['张三', '25', '北京'],
['李四', '30', '上海'],
['王五', '28', '广州']
]
with open('data.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerows(data)
# 读取CSV文件
with open('data.csv', 'r', encoding='utf-8') as f:
reader = csv.reader(f)
for row in reader:
print(row)4.2 JSON文件处理
import json
# 写入JSON文件
data = {
'name': '张三',
'age': 25,
'hobbies': ['读书', '游泳', '编程'],
'address': {
'city': '北京',
'district': '朝阳区'
}
}
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# 读取JSON文件
with open('data.json', 'r', encoding='utf-8') as f:
loaded_data = json.load(f)
print(f"姓名: {loaded_data['name']}")
print(f"爱好: {', '.join(loaded_data['hobbies'])}")4.3 二进制文件处理
# 保存二进制数据
data = b'Hello, Binary World!'
with open('binary.bin', 'wb') as f:
f.write(data)
# 读取二进制数据
with open('binary.bin', 'rb') as f:
loaded_data = f.read()
print(f"二进制数据: {loaded_data}")
print(f"转换为字符串: {loaded_data.decode('utf-8')}")⚠️ 5. 异常处理基础
5.1 基本异常处理
# 基本try-except结构
try:
result = 10 / 0
except ZeroDivisionError:
print("除数不能为零!")
# 捕获多个异常
try:
num = int(input("请输入一个数字: "))
result = 100 / num
print(f"结果: {result}")
except ValueError:
print("请输入有效的数字!")
except ZeroDivisionError:
print("除数不能为零!")
except Exception as e:
print(f"发生未知错误: {e}")5.2 完整的异常处理结构
try:
# 尝试执行的代码
with open('nonexistent.txt', 'r', encoding='utf-8') as f:
content = f.read()
print(content)
except FileNotFoundError:
print("文件不存在!")
except PermissionError:
print("没有权限访问文件!")
except Exception as e:
print(f"发生错误: {type(e).__name__}: {e}")
else:
# 没有异常时执行
print("文件读取成功!")
finally:
# 无论是否有异常都会执行
print("操作完成!")🛡️ 6. 文件操作异常处理
6.1 常见文件异常
def safe_file_read(filename):
"""安全的文件读取函数"""
try:
with open(filename, 'r', encoding='utf-8') as f:
content = f.read()
return content
except FileNotFoundError:
print(f"错误:文件 '{filename}' 不存在")
return None
except PermissionError:
print(f"错误:没有权限访问文件 '{filename}'")
return None
except UnicodeDecodeError:
print(f"错误:文件 '{filename}' 编码格式不正确")
return None
except Exception as e:
print(f"错误:读取文件时发生未知错误 - {type(e).__name__}: {e}")
return None
# 使用示例
content = safe_file_read('example.txt')
if content:
print("文件内容读取成功!")6.2 文件操作安全模式
def safe_file_operations():
"""安全的文件操作示例"""
files_to_process = ['file1.txt', 'file2.txt', 'file3.txt']
results = {}
for filename in files_to_process:
try:
# 尝试读取文件
with open(filename, 'r', encoding='utf-8') as f:
content = f.read()
results[filename] = len(content)
except FileNotFoundError:
print(f"警告:文件 '{filename}' 不存在,跳过处理")
results[filename] = None
except Exception as e:
print(f"错误:处理文件 '{filename}' 时出错 - {e}")
results[filename] = None
return results
# 统计结果
results = safe_file_operations()
for filename, length in results.items():
if length is not None:
print(f"{filename}: {length} 个字符")
else:
print(f"{filename}: 处理失败")🔧 7. 上下文管理器
7.1 with语句的工作原理
# 自定义上下文管理器
class FileManager:
"""文件管理器上下文管理器"""
def __init__(self, filename, mode):
self.filename = filename
self.mode = mode
self.file = None
def __enter__(self):
"""进入上下文时调用"""
print(f"打开文件: {self.filename}")
self.file = open(self.filename, self.mode, encoding='utf-8')
return self.file
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文时调用"""
print(f"关闭文件: {self.filename}")
if self.file:
self.file.close()
# 如果有异常,返回False会重新抛出异常,返回True会抑制异常
if exc_type is not None:
print(f"发生异常: {exc_type.__name__}: {exc_val}")
return False # 重新抛出异常
return True # 抑制异常
# 使用自定义上下文管理器
try:
with FileManager('example.txt', 'r') as f:
content = f.read()
print(f"文件内容: {content[:50]}...")
# raise ValueError("测试异常") # 可以取消注释测试异常处理
except Exception as e:
print(f"捕获到异常: {e}")7.2 contextlib模块
from contextlib import contextmanager
@contextmanager
def file_manager(filename, mode):
"""使用contextmanager装饰器创建上下文管理器"""
print(f"打开文件: {filename}")
f = open(filename, mode, encoding='utf-8')
try:
yield f # 产生文件对象
finally:
print(f"关闭文件: {filename}")
f.close()
# 使用
with file_manager('example.txt', 'r') as f:
content = f.read()
print(f"读取了 {len(content)} 个字符")🎯 8. 实际应用案例
8.1 日志记录器
import datetime
import os
class Logger:
"""简单的日志记录器"""
def __init__(self, log_file='app.log'):
self.log_file = log_file
self._ensure_log_directory()
def _ensure_log_directory(self):
"""确保日志目录存在"""
log_dir = os.path.dirname(self.log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)
def _write_log(self, level, message):
"""写入日志"""
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[{timestamp}] {level}: {message}\n"
try:
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(log_entry)
except Exception as e:
print(f"写入日志失败: {e}")
def info(self, message):
"""记录信息日志"""
self._write_log("INFO", message)
def warning(self, message):
"""记录警告日志"""
self._write_log("WARNING", message)
def error(self, message):
"""记录错误日志"""
self._write_log("ERROR", message)
# 使用示例
logger = Logger("logs/app.log")
logger.info("程序启动")
logger.warning("这是一个警告信息")
logger.error("发生了错误")
print("日志记录完成,请查看 logs/app.log 文件")8.2 配置文件管理器
import json
import os
class ConfigManager:
"""配置文件管理器"""
def __init__(self, config_file='config.json'):
self.config_file = config_file
self.config = self._load_config()
def _load_config(self):
"""加载配置文件"""
if os.path.exists(self.config_file):
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"加载配置文件失败: {e}")
return self._get_default_config()
else:
return self._get_default_config()
def _get_default_config(self):
"""获取默认配置"""
return {
"app_name": "Python应用",
"version": "1.0.0",
"debug": False,
"database": {
"host": "localhost",
"port": 3306,
"username": "root"
},
"logging": {
"level": "INFO",
"file": "app.log"
}
}
def save_config(self):
"""保存配置文件"""
try:
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(self.config, f, ensure_ascii=False, indent=2)
print("配置文件保存成功")
return True
except Exception as e:
print(f"保存配置文件失败: {e}")
return False
def get(self, key, default=None):
"""获取配置值"""
keys = key.split('.')
value = self.config
try:
for k in keys:
value = value[k]
return value
except (KeyError, TypeError):
return default
def set(self, key, value):
"""设置配置值"""
keys = key.split('.')
config = self.config
for k in keys[:-1]:
if k not in config:
config[k] = {}
config = config[k]
config[keys[-1]] = value
# 使用示例
config = ConfigManager()
print(f"应用名称: {config.get('app_name')}")
print(f"数据库主机: {config.get('database.host')}")
config.set('app_name', '我的Python应用')
config.set('database.port', 5432)
config.save_config()🏆 9. 高级文件操作技巧
9.1 文件临时处理
import tempfile
import shutil
# 创建临时文件
with tempfile.NamedTemporaryFile(mode='w', encoding='utf-8', delete=False) as temp_file:
temp_file.write("这是临时文件内容\n")
temp_file.write("处理完成后会被删除\n")
temp_filename = temp_file.name
print(f"临时文件路径: {temp_filename}")
# 使用临时文件
try:
with open(temp_filename, 'r', encoding='utf-8') as f:
content = f.read()
print(f"临时文件内容: {content}")
finally:
# 清理临时文件
os.unlink(temp_filename)
print("临时文件已删除")
# 创建临时目录
with tempfile.TemporaryDirectory() as temp_dir:
print(f"临时目录: {temp_dir}")
temp_file_path = os.path.join(temp_dir, 'test.txt')
with open(temp_file_path, 'w') as f:
f.write("临时目录中的文件")
# 在临时目录中进行操作
print(f"临时目录内容: {os.listdir(temp_dir)}")
# 临时目录会自动删除9.2 文件备份与恢复
import shutil
import datetime
def backup_file(filename, backup_dir='backups'):
"""备份文件"""
if not os.path.exists(filename):
raise FileNotFoundError(f"源文件 '{filename}' 不存在")
# 创建备份目录
os.makedirs(backup_dir, exist_ok=True)
# 生成备份文件名
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
base_name = os.path.basename(filename)
backup_name = f"{timestamp}_{base_name}"
backup_path = os.path.join(backup_dir, backup_name)
# 复制文件
shutil.copy2(filename, backup_path)
print(f"文件已备份到: {backup_path}")
return backup_path
def restore_file(backup_path, target_path):
"""恢复文件"""
if not os.path.exists(backup_path):
raise FileNotFoundError(f"备份文件 '{backup_path}' 不存在")
shutil.copy2(backup_path, target_path)
print(f"文件已恢复到: {target_path}")
# 使用示例
try:
# 创建测试文件
with open('important_data.txt', 'w', encoding='utf-8') as f:
f.write("重要数据内容\n")
f.write("请妥善备份\n")
# 备份文件
backup_path = backup_file('important_data.txt')
# 修改原文件
with open('important_data.txt', 'a', encoding='utf-8') as f:
f.write("这是新增的内容\n")
# 恢复备份
restore_file(backup_path, 'important_data.txt_restored')
except Exception as e:
print(f"操作失败: {e}")🔍 10. 文件系统操作
10.1 目录操作
import os
from pathlib import Path
# 使用os模块
# 创建目录
os.makedirs('data/subdir', exist_ok=True)
# 检查路径是否存在
if os.path.exists('data'):
print("data目录存在")
# 列出目录内容
for item in os.listdir('data'):
item_path = os.path.join('data', item)
if os.path.isfile(item_path):
print(f"文件: {item}")
elif os.path.isdir(item_path):
print(f"目录: {item}")
# 使用pathlib模块(推荐)
path = Path('data')
# 创建目录
path.mkdir(exist_ok=True)
# 遍历目录
for item in path.rglob('*'): # 递归遍历
if item.is_file():
print(f"文件: {item.relative_to(path)}")
elif item.is_dir():
print(f"目录: {item.relative_to(path)}")10.2 文件信息获取
import os
import stat
def get_file_info(filename):
"""获取文件详细信息"""
try:
stat_info = os.stat(filename)
info = {
'文件名': filename,
'大小': f"{stat_info.st_size} 字节",
'创建时间': datetime.datetime.fromtimestamp(stat_info.st_ctime),
'修改时间': datetime.datetime.fromtimestamp(stat_info.st_mtime),
'访问时间': datetime.datetime.fromtimestamp(stat_info.st_atime),
'是否为文件': os.path.isfile(filename),
'是否为目录': os.path.isdir(filename),
'权限': oct(stat_info.st_mode)[-3:]
}
return info
except FileNotFoundError:
return None
# 使用示例
info = get_file_info('example.txt')
if info:
for key, value in info.items():
print(f"{key}: {value}")⚠️ 11. 常见错误和最佳实践
11.1 常见文件操作错误
# ❌ 错误示例
try:
f = open('file.txt', 'r')
content = f.read()
# 忘记关闭文件
# 如果这里发生异常,文件不会被关闭
result = 1 / 0
f.close() # 这行不会执行
except:
print("发生错误")
# ✅ 正确示例
try:
with open('file.txt', 'r', encoding='utf-8') as f:
content = f.read()
result = 1 / 0 # 文件会自动关闭
except ZeroDivisionError:
print("除零错误")
except FileNotFoundError:
print("文件不存在")
except Exception as e:
print(f"其他错误: {e}")11.2 最佳实践
# 1. 始终指定编码
with open('file.txt', 'r', encoding='utf-8') as f: # ✅
pass
with open('file.txt', 'r') as f: # ❌ 在不同系统可能有编码问题
pass
# 2. 使用with语句
with open('file.txt', 'w', encoding='utf-8') as f: # ✅
f.write('content')
# 3. 检查文件路径
filename = 'data/file.txt'
if os.path.exists(filename): # ✅
with open(filename, 'r') as f:
pass
# 4. 处理大文件要分块读取
def read_large_file(filename, chunk_size=1024):
"""分块读取大文件"""
with open(filename, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
# 5. 写入重要数据前先备份
def safe_write(filename, content):
"""安全写入文件"""
backup_name = f"{filename}.bak"
if os.path.exists(filename):
shutil.copy2(filename, backup_name)
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write(content)
except Exception:
if os.path.exists(backup_name):
shutil.move(backup_name, filename)
raise
finally:
if os.path.exists(backup_name):
os.remove(backup_name)📝 12. 本集总结
12.1 关键概念
✅ 文件操作:打开、读取、写入、关闭文件的基本方法
✅ 异常处理:try-except-else-finally结构的使用
✅ 上下文管理器:with语句和自动资源管理
✅ 文件格式:文本、CSV、JSON、二进制文件的处理
✅ 错误处理:常见文件异常的处理方法
12.2 学习成果
通过本集学习,你已经掌握了:
- 文件的基本读写操作
- 不同文件格式的处理方法
- Python异常处理机制
- 上下文管理器的使用
- 实际项目中的文件处理最佳实践
12.3 下集预告
下一集我们将学习面向对象编程基础,掌握Python的类和对象概念,开启面向对象编程的大门。
🎯 课后练习
基础练习
- 编写一个简单的文本编辑器,支持读取、编辑、保存文件
- 创建一个CSV文件处理器,可以读取和写入CSV数据
- 实现一个JSON配置文件管理器
进阶练习
- 编写一个日志记录器,支持不同级别的日志记录
- 创建一个文件备份工具,可以自动备份指定文件
- 实现一个简单的数据库,使用JSON文件存储数据
挑战练习
- 编写一个文件同步工具,可以同步两个目录的内容
- 创建一个文件压缩和解压缩工具
- 实现一个支持断点续传的文件下载器
记住:文件操作和数据持久化是编程的核心技能,异常处理是程序健壮性的保障!掌握这些技能让你的Python程序更加完善! 🐍✨