Python文件操作
学习目标
通过本集的学习,你将能够:
- 使用Python读取文件内容
- 使用Python写入文件
- 掌握文件路径处理
- 使用上下文管理器安全操作文件
- 处理不同编码的文件
1. 读取文件
1.1 基本文件读取
# 读取整个文件
with open("example.txt", "r", encoding="utf-8") as f:
content = f.read()
print(content)
# 按行读取
with open("example.txt", "r", encoding="utf-8") as f:
for line in f:
print(line.rstrip()) # rstrip() 去除行尾的换行符及其他空白字符
# 读取所有行到列表
with open("example.txt", "r", encoding="utf-8") as f:
lines = f.readlines()
print(lines)
1.2 文件打开模式
# 常见模式
# r: 只读(默认)
# w: 只写(覆盖)
# a: 追加
# r+: 读写
# b: 二进制模式
# t: 文本模式(默认)
# 组合模式
# rb: 二进制只读
# wb: 二进制只写
# ab: 二进制追加
1.3 逐块读取大文件
# 大文件逐块读取
with open("large_file.txt", "r", encoding="utf-8") as f:
while True:
chunk = f.read(4096) # 文本模式下每次最多读取4096个字符(不是字节)
if not chunk:
break
print(chunk, end="")
文件读取的ASCII图:
程序 ──→ open() ──→ 文件对象 ──→ read() ──→ 内容
│
├─ r: 只读
├─ w: 只写
└─ a: 追加
2. 写入文件
2.1 写入文本文件
# 写入文件(覆盖)
with open("output.txt", "w", encoding="utf-8") as f:
f.write("Hello, World!\n")
f.write("这是第二行\n")
# 追加到文件
with open("output.txt", "a", encoding="utf-8") as f:
f.write("这是追加的内容\n")
# 写入多行
lines = ["第一行", "第二行", "第三行"]
with open("output.txt", "w", encoding="utf-8") as f:
for line in lines:
f.write(line + "\n")
# 或使用 writelines(与上面的 for 循环二选一;两者都执行会把每行写入两次)
f.writelines([line + "\n" for line in lines])
2.2 格式化写入
data = [
{"name": "张三", "age": 25, "city": "北京"},
{"name": "李四", "age": 30, "city": "上海"},
{"name": "王五", "age": 28, "city": "广州"}
]
with open("people.txt", "w", encoding="utf-8") as f:
# 写入表头
f.write("姓名\t年龄\t城市\n")
f.write("-" * 30 + "\n")
# 写入数据
for person in data:
f.write(f"{person['name']}\t{person['age']}\t{person['city']}\n")
3. 文件路径处理
3.1 os.path 模块
import os
# 获取当前工作目录
print(os.getcwd())
# 路径拼接
path = os.path.join("folder", "subfolder", "file.txt")
print(path)
# 检查路径是否存在
print(os.path.exists("example.txt"))
# 检查是否是文件
print(os.path.isfile("example.txt"))
# 检查是否是目录
print(os.path.isdir("folder"))
# 获取文件名
print(os.path.basename("/path/to/file.txt")) # file.txt
# 获取目录名
print(os.path.dirname("/path/to/file.txt")) # /path/to
# 获取文件扩展名
print(os.path.splitext("file.txt")) # ('file', '.txt')
# 获取绝对路径
print(os.path.abspath("example.txt"))
3.2 pathlib 模块(推荐)
from pathlib import Path
# 创建Path对象
path = Path("example.txt")
# 路径拼接
path = Path("folder") / "subfolder" / "file.txt"
print(path)
# 检查是否存在
print(path.exists())
# 检查是否是文件
print(path.is_file())
# 检查是否是目录
print(path.is_dir())
# 获取文件名
print(path.name)
# 获取父目录
print(path.parent)
# 获取文件扩展名
print(path.suffix)
# 获取文件名(不含扩展名)
print(path.stem)
# 获取绝对路径
print(path.resolve())
# 创建目录
Path("new_folder").mkdir(exist_ok=True)
# 遍历目录
for file in Path(".").iterdir():
print(file)
# 按模式匹配文件
for file in Path(".").glob("*.txt"):
print(file)
4. 上下文管理器
4.1 with 语句
# 使用 with 语句(推荐)
with open("example.txt", "r", encoding="utf-8") as f:
content = f.read()
# 离开 with 代码块后,文件会自动关闭
# 不使用 with 语句(不推荐)
f = open("example.txt", "r", encoding="utf-8")
try:
content = f.read()
finally:
f.close() # 必须手动关闭
with语句的ASCII图:
with open(...) as f:
│
├─ __enter__() → 返回文件对象
│
├─ 执行代码块
│
└─ __exit__() → 关闭文件(即使出错)
4.2 多个文件
# 同时打开多个文件
with open("input.txt", "r", encoding="utf-8") as infile, \
open("output.txt", "w", encoding="utf-8") as outfile:
for line in infile:
outfile.write(line.upper())
5. 二进制文件操作
5.1 读写二进制文件
# 写入二进制文件
data = b"\x48\x65\x6c\x6c\x6f" # "Hello" 的字节
with open("binary.dat", "wb") as f:
f.write(data)
# 读取二进制文件
with open("binary.dat", "rb") as f:
content = f.read()
print(content)
print(content.decode("utf-8"))
5.2 复制文件
def copy_file(source, destination):
    """Copy the file at ``source`` to ``destination`` byte for byte.

    Both files are opened in binary mode and the data is moved in pieces of
    at most 4096 bytes, so the whole file is never held in memory at once.
    Opening ``destination`` with mode ``"wb"`` truncates an existing file.
    """
    with open(source, "rb") as reader, open(destination, "wb") as writer:
        # iter() with a sentinel keeps calling read() until it returns b"" (EOF).
        for piece in iter(lambda: reader.read(4096), b""):
            writer.write(piece)
copy_file("source.txt", "copy.txt")
6. 处理不同编码
6.1 编码和解码
# 字符串编码为字节
text = "你好,世界"
encoded = text.encode("utf-8")
print(encoded)
# 字节解码为字符串
decoded = encoded.decode("utf-8")
print(decoded)
# 不同编码
text = "你好"
print(text.encode("utf-8"))
print(text.encode("gbk"))
6.2 指定编码打开文件
# 使用 UTF-8 编码
with open("file.txt", "r", encoding="utf-8") as f:
content = f.read()
# 使用 GBK 编码
with open("file.txt", "r", encoding="gbk") as f:
content = f.read()
# 处理编码错误
with open("file.txt", "r", encoding="utf-8", errors="replace") as f:
content = f.read()
7. 实用案例
7.1 案例1:日志文件分析器
# log_analyzer.py
from pathlib import Path
from collections import defaultdict
def analyze_log(log_file):
    """Count log levels in ``log_file``, print a report and save it to disk.

    Each non-blank line is classified by the first of "ERROR", "WARNING",
    "INFO" (checked in that order) that occurs anywhere in the line; lines
    containing none of them are ignored.  The report lists the three counts
    and the first five ERROR lines, is printed to stdout, and is written to
    ``<stem>_report.txt`` in the same directory as the log file.

    If the path does not exist, a message is printed and nothing is written.
    """
    log_path = Path(log_file)
    if not log_path.exists():
        print(f"文件不存在: {log_file}")
        return

    # Order matters: a line mentioning several levels counts for the first hit.
    levels = ("ERROR", "WARNING", "INFO")
    tally = dict.fromkeys(levels, 0)
    failures = []
    with log_path.open("r", encoding="utf-8") as stream:
        for raw in stream:
            entry = raw.strip()
            if not entry:
                continue
            for level in levels:
                if level in entry:
                    tally[level] += 1
                    if level == "ERROR":
                        failures.append(entry)
                    break

    # Build the report; the leading and trailing "" give the surrounding newlines.
    report = "\n".join(
        [
            "",
            "日志分析报告",
            "=" * 40,
            f"文件: {log_file}",
            f"INFO: {tally['INFO']}",
            f"WARNING: {tally['WARNING']}",
            f"ERROR: {tally['ERROR']}",
            "",
        ]
    )
    if failures:
        details = ["\n错误信息:\n"]
        details.extend(
            f"{number}. {text}\n" for number, text in enumerate(failures[:5], 1)
        )
        hidden = len(failures) - 5
        if hidden > 0:
            details.append(f"... 还有 {hidden} 条错误\n")
        report += "".join(details)
    print(report)

    # Save the report next to the analysed log file.
    report_path = log_path.parent / f"{log_path.stem}_report.txt"
    with report_path.open("w", encoding="utf-8") as sink:
        sink.write(report)
    print(f"报告已保存到: {report_path}")
# 创建示例日志文件
sample_log = """
2024-01-01 10:00:00 INFO 系统启动
2024-01-01 10:00:01 INFO 加载配置
2024-01-01 10:00:02 WARNING 配置文件缺失,使用默认值
2024-01-01 10:00:03 INFO 初始化完成
2024-01-01 10:00:04 ERROR 无法连接数据库
2024-01-01 10:00:05 ERROR 重试连接失败
2024-01-01 10:00:06 INFO 使用本地缓存
"""
with open("app.log", "w", encoding="utf-8") as f:
f.write(sample_log)
# 分析日志
analyze_log("app.log")
7.2 案例2:CSV文件处理
# csv_handler.py
import csv
from pathlib import Path
def write_csv(data, filename):
    """Write dict rows to ``filename`` as a UTF-8 CSV file with a header row.

    Args:
        data: Iterable of dicts, one per row.  The column names and their
            order are taken from the keys of the first row.  Any iterable is
            accepted (it is materialised once), not only lists.
        filename: Destination path; an existing file is overwritten.

    When ``data`` is empty there are no known columns, so an empty file is
    written instead of failing with IndexError on ``data[0]``.
    """
    rows = list(data)
    path = Path(filename)
    # newline="" lets the csv module control line endings itself.
    with path.open("w", encoding="utf-8", newline="") as f:
        if not rows:
            return
        writer = csv.DictWriter(f, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
def read_csv(filename):
    """Read a UTF-8 CSV file and return its rows as a list of dicts.

    The first line of the file is used as the header; every following row
    becomes a dict keyed by those column names (all values are strings).

    The file is opened with ``newline=""`` as the csv module requires, so
    line breaks embedded in quoted fields are returned unchanged instead of
    being rewritten by universal-newline translation.
    """
    path = Path(filename)
    with path.open("r", encoding="utf-8", newline="") as f:
        return list(csv.DictReader(f))
# 学生数据
students = [
{"name": "张三", "age": "20", "grade": "A"},
{"name": "李四", "age": "21", "grade": "B"},
{"name": "王五", "age": "19", "grade": "A"},
{"name": "赵六", "age": "22", "grade": "C"}
]
# 写入CSV
write_csv(students, "students.csv")
print("CSV文件已写入")
# 读取CSV
data = read_csv("students.csv")
print("\nCSV文件内容:")
for row in data:
print(row)
# 手动处理(不使用csv模块)
def manual_csv():
    """Write and re-read ``manual.csv`` by hand, without the csv module.

    Writes a header plus two fixed rows to ``manual.csv`` in the current
    directory, then reads the file back, splits every line on commas and
    prints one ``{column: value}`` dict per data row.
    """
    # Write phase.
    with open("manual.csv", "w", encoding="utf-8") as sink:
        sink.writelines(("name,age,grade\n", "张三,20,A\n", "李四,21,B\n"))

    # Read phase: first line is the header, the rest are records.
    with open("manual.csv", "r", encoding="utf-8") as source:
        header_line, *record_lines = source.readlines()
    columns = header_line.strip().split(",")
    for record in record_lines:
        print(dict(zip(columns, record.strip().split(","))))
print("\n手动处理CSV:")
manual_csv()
7.3 案例3:文件备份工具
# file_backup.py
from pathlib import Path
import shutil
from datetime import datetime
def backup_file(source, backup_dir="backups"):
    """Copy ``source`` into ``backup_dir`` under a timestamped name.

    The copy is named ``<stem>_<YYYYmmdd_HHMMSS><suffix>``.  If a file with
    that name already exists (two backups within the same second), ``_<n>``
    is appended to the timestamp so the earlier backup is not overwritten.

    Args:
        source: Path of the file to back up.
        backup_dir: Directory that receives the copy.  It is created,
            together with any missing parent directories, when absent.

    Returns:
        True when the copy was made; False when ``source`` does not exist or
        is not a regular file (a message is printed in either case).
    """
    source_path = Path(source)
    if not source_path.exists():
        print(f"源文件不存在: {source}")
        return False
    if not source_path.is_file():
        print(f"不是文件: {source}")
        return False

    # parents=True so a nested backup directory such as "a/b/backups" works.
    backup_path = Path(backup_dir)
    backup_path.mkdir(parents=True, exist_ok=True)

    # Build the backup name; the timestamp only has one-second resolution,
    # so fall back to a numeric suffix when the name is already taken.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_name = f"{source_path.stem}_{timestamp}"
    target = backup_path / f"{base_name}{source_path.suffix}"
    attempt = 1
    while target.exists():
        target = backup_path / f"{base_name}_{attempt}{source_path.suffix}"
        attempt += 1

    # copy2 also copies file metadata such as the modification time.
    shutil.copy2(source_path, target)
    print(f"备份成功: {source} -> {target}")
    return True
def restore_latest_backup(original_file, backup_dir="backups"):
    """Overwrite ``original_file`` with its most recently modified backup.

    Backups are the files in ``backup_dir`` named
    ``<stem>_<anything><suffix>``, where stem and suffix come from
    ``original_file``.  The one with the largest modification time wins.

    Returns:
        True after a successful restore; False when ``backup_dir`` does not
        exist or holds no matching backup (a message is printed either way).
    """
    import glob  # local import: only needed to escape the glob pattern

    original_path = Path(original_file)
    backup_path = Path(backup_dir)
    if not backup_path.exists():
        print("备份目录不存在")
        return False

    # Escape stem and suffix so glob metacharacters in the file name
    # (e.g. "data[1].txt") are matched literally instead of as patterns.
    pattern = (
        f"{glob.escape(original_path.stem)}_*{glob.escape(original_path.suffix)}"
    )
    backups = list(backup_path.glob(pattern))
    if not backups:
        print("没有找到备份文件")
        return False

    # Pick the newest backup by modification time.
    latest_backup = max(backups, key=lambda p: p.stat().st_mtime)
    shutil.copy2(latest_backup, original_path)
    print(f"恢复成功: {latest_backup} -> {original_path}")
    return True
def list_backups(original_file, backup_dir="backups"):
    """Print every backup of ``original_file`` found in ``backup_dir``.

    Backups are the files named ``<stem>_<anything><suffix>``.  They are
    printed newest first (by modification time), one per line, with their
    name, size in bytes and modification time.  A message is printed instead
    when the directory does not exist or contains no matching backup.
    """
    import glob  # local import: only needed to escape the glob pattern

    original_path = Path(original_file)
    backup_path = Path(backup_dir)
    if not backup_path.exists():
        print("备份目录不存在")
        return

    # Escape stem and suffix so glob metacharacters in the file name
    # (e.g. "data[1].txt") are matched literally instead of as patterns.
    pattern = (
        f"{glob.escape(original_path.stem)}_*{glob.escape(original_path.suffix)}"
    )
    # stat() each backup once and reuse the result for sorting and printing.
    entries = [(backup, backup.stat()) for backup in backup_path.glob(pattern)]
    if not entries:
        print("没有备份文件")
        return

    print(f"{original_file} 的备份文件:")
    entries.sort(key=lambda entry: entry[1].st_mtime, reverse=True)
    for backup, info in entries:
        mtime = datetime.fromtimestamp(info.st_mtime)
        print(f" {backup.name} - {info.st_size} bytes - {mtime}")
# 创建测试文件
with open("important.txt", "w", encoding="utf-8") as f:
f.write("这是重要文件内容\n")
f.write("版本1\n")
print("=== 文件备份工具 ===")
# 备份
backup_file("important.txt")
# 修改文件
with open("important.txt", "a", encoding="utf-8") as f:
f.write("\n版本2的更新\n")
# 再次备份
import time
time.sleep(1) # 等待1秒,确保时间戳不同
backup_file("important.txt")
# 列出备份
print()
list_backups("important.txt")
# 恢复最新备份
print()
restore_latest_backup("important.txt")
8. 自测问题
- 为什么推荐使用 with 语句操作文件?
- 文件打开模式 'r', 'w', 'a' 的区别是什么?
- pathlib 和 os.path 相比有什么优势?
- 如何处理文件编码问题?
- 二进制文件和文本文件操作有什么区别?
9. 下集预告
下一集我们将学习Python的异常处理!
参考资料
- Python官方文档: https://docs.python.org/3/tutorial/inputoutput.html
- 《Python编程:从入门到实践》