第162集_文件批量处理
一、文件批量处理概述
文件批量处理是Python自动化中最常见的应用场景之一,它可以帮助我们快速、高效地处理大量文件,避免重复的手动操作。在本集中,我们将学习如何使用Python进行各种文件批量处理操作。
1. 文件批量处理的常见需求
- 批量重命名文件
- 批量移动或复制文件
- 批量转换文件格式
- 批量处理文件内容
- 批量压缩或解压缩文件
- 批量创建或删除文件/目录
2. 常用Python模块
- os:提供操作系统相关功能,如文件路径操作、文件重命名等
- shutil:提供高级文件操作,如复制、移动、删除等
- glob:用于匹配文件路径
- pathlib:提供面向对象的路径操作
- fnmatch:用于文件名匹配
二、文件路径操作
在进行文件批量处理之前,我们需要先了解如何操作文件路径。
1. 使用os模块
import os
# 获取当前工作目录
current_dir = os.getcwd()
print(f"当前工作目录: {current_dir}")
# 拼接路径
file_path = os.path.join(current_dir, 'data', 'file.txt')
print(f"拼接后的路径: {file_path}")
# 获取目录名和文件名
dir_name = os.path.dirname(file_path)
file_name = os.path.basename(file_path)
print(f"目录名: {dir_name}")
print(f"文件名: {file_name}")
# 获取文件名和扩展名
name, ext = os.path.splitext(file_name)
print(f"文件名(不含扩展名): {name}")
print(f"扩展名: {ext}")
# 检查路径是否存在
if os.path.exists(file_path):
print(f"路径存在: {file_path}")
else:
print(f"路径不存在: {file_path}")
# 检查是否是文件或目录
print(f"是否是文件: {os.path.isfile(file_path)}")
print(f"是否是目录: {os.path.isdir(dir_name)}")2. 使用pathlib模块(推荐)
from pathlib import Path
# 创建Path对象
file_path = Path('data', 'file.txt')
# 获取绝对路径
abs_path = file_path.absolute()
print(f"绝对路径: {abs_path}")
# 获取目录和文件名
dir_path = file_path.parent
file_name = file_path.name
print(f"目录路径: {dir_path}")
print(f"文件名: {file_name}")
# 获取文件名和扩展名
stem = file_path.stem
suffix = file_path.suffix
print(f"文件名(不含扩展名): {stem}")
print(f"扩展名: {suffix}")
# 检查路径是否存在
if file_path.exists():
print(f"路径存在: {file_path}")
# 检查是否是文件或目录
print(f"是否是文件: {file_path.is_file()}")
print(f"是否是目录: {file_path.is_dir()}")三、文件批量重命名
1. 按序号批量重命名
import os
from pathlib import Path
def batch_rename_files(directory, prefix, extension):
"""
按序号批量重命名文件
参数:
directory: 目标目录
prefix: 新文件名前缀
extension: 文件扩展名(包含点号,如'.txt')
"""
# 转换为Path对象
dir_path = Path(directory)
# 检查目录是否存在
if not dir_path.exists() or not dir_path.is_dir():
print(f"目录不存在或不是目录: {directory}")
return
# 获取指定扩展名的文件
files = list(dir_path.glob(f"*{extension}"))
files.sort() # 按文件名排序
print(f"找到 {len(files)} 个 {extension} 文件")
# 批量重命名
for i, file_path in enumerate(files, 1):
# 构建新文件名
new_name = f"{prefix}_{i:03d}{extension}"
new_path = dir_path / new_name
# 重命名文件
file_path.rename(new_path)
print(f"重命名: {file_path.name} -> {new_name}")
print("批量重命名完成!")
# 使用示例
if __name__ == "__main__":
batch_rename_files("C:\\Users\\User\\Photos", "vacation", ".jpg")2. 按规则替换文件名
from pathlib import Path
def batch_rename_replace(directory, old_str, new_str, extension="*"):
"""
按规则替换文件名中的字符串
参数:
directory: 目标目录
old_str: 要替换的字符串
new_str: 替换后的字符串
extension: 文件扩展名(默认为所有文件)
"""
dir_path = Path(directory)
if not dir_path.exists() or not dir_path.is_dir():
print(f"目录不存在或不是目录: {directory}")
return
# 获取文件
pattern = f"*{extension}" if extension.startswith(".") else f"*.{extension}"
files = list(dir_path.glob(pattern))
print(f"找到 {len(files)} 个匹配文件")
# 批量重命名
renamed_count = 0
for file_path in files:
if old_str in file_path.name:
# 替换字符串
new_name = file_path.name.replace(old_str, new_str)
new_path = dir_path / new_name
# 重命名文件
file_path.rename(new_path)
print(f"重命名: {file_path.name} -> {new_name}")
renamed_count += 1
print(f"批量重命名完成! 共重命名了 {renamed_count} 个文件")
# 使用示例
if __name__ == "__main__":
batch_rename_replace("C:\\Users\\User\\Documents", "old", "new", ".txt")四、文件批量移动和复制
1. 批量移动文件
import os
import shutil
from pathlib import Path
def batch_move_files(source_dir, dest_dir, extension="*"):
"""
批量移动文件到指定目录
参数:
source_dir: 源目录
dest_dir: 目标目录
extension: 文件扩展名(默认为所有文件)
"""
source_path = Path(source_dir)
dest_path = Path(dest_dir)
# 检查源目录
if not source_path.exists() or not source_path.is_dir():
print(f"源目录不存在或不是目录: {source_dir}")
return
# 创建目标目录(如果不存在)
dest_path.mkdir(parents=True, exist_ok=True)
# 获取文件
pattern = f"*{extension}" if extension.startswith(".") else f"*.{extension}"
files = list(source_path.glob(pattern))
print(f"找到 {len(files)} 个匹配文件")
# 批量移动
moved_count = 0
for file_path in files:
# 目标文件路径
dest_file_path = dest_path / file_path.name
# 检查目标文件是否已存在
if dest_file_path.exists():
print(f"目标文件已存在,跳过: {dest_file_path.name}")
continue
# 移动文件
shutil.move(str(file_path), str(dest_file_path))
print(f"移动: {file_path.name} -> {dest_dir}")
moved_count += 1
print(f"批量移动完成! 共移动了 {moved_count} 个文件")
# 使用示例
if __name__ == "__main__":
batch_move_files("C:\\Users\\User\\Downloads", "C:\\Users\\User\\Documents\\Reports", ".pdf")2. 批量复制文件
from pathlib import Path
import shutil
def batch_copy_files(source_dir, dest_dir, extension="*"):
"""
批量复制文件到指定目录
参数:
source_dir: 源目录
dest_dir: 目标目录
extension: 文件扩展名(默认为所有文件)
"""
source_path = Path(source_dir)
dest_path = Path(dest_dir)
# 检查源目录
if not source_path.exists() or not source_path.is_dir():
print(f"源目录不存在或不是目录: {source_dir}")
return
# 创建目标目录(如果不存在)
dest_path.mkdir(parents=True, exist_ok=True)
# 获取文件
pattern = f"*{extension}" if extension.startswith(".") else f"*.{extension}"
files = list(source_path.glob(pattern))
print(f"找到 {len(files)} 个匹配文件")
# 批量复制
copied_count = 0
for file_path in files:
# 目标文件路径
dest_file_path = dest_path / file_path.name
# 复制文件(保留元数据)
shutil.copy2(str(file_path), str(dest_file_path))
print(f"复制: {file_path.name} -> {dest_dir}")
copied_count += 1
print(f"批量复制完成! 共复制了 {copied_count} 个文件")
# 使用示例
if __name__ == "__main__":
batch_copy_files("C:\\Users\\User\\Photos", "D:\\Backup\\Photos", ".jpg")五、文件批量内容处理
1. 批量替换文件内容
from pathlib import Path
def batch_replace_content(directory, old_str, new_str, extension=".txt"):
"""
批量替换文件内容
参数:
directory: 目标目录
old_str: 要替换的字符串
new_str: 替换后的字符串
extension: 文件扩展名(默认为.txt)
"""
dir_path = Path(directory)
if not dir_path.exists() or not dir_path.is_dir():
print(f"目录不存在或不是目录: {directory}")
return
# 获取文件
files = list(dir_path.glob(f"*{extension}"))
print(f"找到 {len(files)} 个 {extension} 文件")
# 批量替换内容
processed_count = 0
for file_path in files:
try:
# 读取文件内容
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 检查是否需要替换
if old_str not in content:
continue
# 替换内容
new_content = content.replace(old_str, new_str)
# 写回文件
with open(file_path, 'w', encoding='utf-8') as f:
f.write(new_content)
print(f"处理完成: {file_path.name}")
processed_count += 1
except Exception as e:
print(f"处理失败 {file_path.name}: {e}")
print(f"批量替换完成! 共处理了 {processed_count} 个文件")
# 使用示例
if __name__ == "__main__":
batch_replace_content("C:\\Users\\User\\Documents", "old_company", "new_company", ".txt")2. 批量提取文件内容
import re
from pathlib import Path
def batch_extract_content(directory, pattern, output_file, extension=".txt"):
"""
批量提取文件内容并保存到一个文件
参数:
directory: 目标目录
pattern: 正则表达式模式
output_file: 输出文件路径
extension: 文件扩展名(默认为.txt)
"""
dir_path = Path(directory)
output_path = Path(output_file)
if not dir_path.exists() or not dir_path.is_dir():
print(f"目录不存在或不是目录: {directory}")
return
# 获取文件
files = list(dir_path.glob(f"*{extension}"))
print(f"找到 {len(files)} 个 {extension} 文件")
# 编译正则表达式
regex = re.compile(pattern)
# 创建输出文件
with open(output_path, 'w', encoding='utf-8') as out_f:
for file_path in files:
try:
# 读取文件内容
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 提取匹配内容
matches = regex.findall(content)
if matches:
out_f.write(f"=== {file_path.name} ===\n")
for match in matches:
out_f.write(f"{match}\n")
out_f.write("\n")
print(f"提取完成: {file_path.name}")
except Exception as e:
print(f"提取失败 {file_path.name}: {e}")
print(f"批量提取完成! 结果保存到: {output_file}")
# 使用示例
if __name__ == "__main__":
# 提取所有邮箱地址
pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
batch_extract_content("C:\\Users\\User\\Documents", pattern, "extracted_emails.txt")六、文件批量压缩和解压缩
1. 批量压缩文件
import zipfile
from pathlib import Path
def batch_compress_files(directory, zip_name, extension="*"):
"""
批量压缩文件到ZIP文件
参数:
directory: 目标目录
zip_name: ZIP文件名(包含路径)
extension: 文件扩展名(默认为所有文件)
"""
dir_path = Path(directory)
zip_path = Path(zip_name)
if not dir_path.exists() or not dir_path.is_dir():
print(f"目录不存在或不是目录: {directory}")
return
# 获取文件
pattern = f"*{extension}" if extension.startswith(".") else f"*.{extension}"
files = list(dir_path.glob(pattern))
print(f"找到 {len(files)} 个匹配文件")
# 创建ZIP文件
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file_path in files:
# 将文件添加到ZIP中,保留相对路径
arcname = file_path.relative_to(dir_path.parent)
zipf.write(file_path, arcname)
print(f"添加到ZIP: {file_path.name}")
print(f"批量压缩完成! ZIP文件: {zip_path}")
print(f"ZIP文件大小: {zip_path.stat().st_size / 1024:.2f} KB")
# 使用示例
if __name__ == "__main__":
batch_compress_files("C:\\Users\\User\\Photos", "D:\\Backup\\photos_backup.zip", ".jpg")2. 批量解压缩文件
import zipfile
import os
from pathlib import Path
def batch_extract_zip(zip_files, dest_dir):
"""
批量解压缩ZIP文件
参数:
zip_files: ZIP文件列表或包含ZIP文件的目录
dest_dir: 目标目录
"""
dest_path = Path(dest_dir)
# 创建目标目录
dest_path.mkdir(parents=True, exist_ok=True)
# 收集所有ZIP文件
zip_list = []
if isinstance(zip_files, list):
# 如果是列表,直接使用
zip_list = zip_files
else:
# 如果是目录,获取目录中的所有ZIP文件
zip_dir = Path(zip_files)
if zip_dir.exists() and zip_dir.is_dir():
zip_list = list(zip_dir.glob("*.zip"))
print(f"找到 {len(zip_list)} 个ZIP文件")
# 批量解压缩
for zip_path in zip_list:
if not zip_path.exists() or not zip_path.is_file():
print(f"跳过: {zip_path} 不是有效文件")
continue
try:
with zipfile.ZipFile(zip_path, 'r') as zipf:
# 获取ZIP文件中的所有文件
zip_contents = zipf.namelist()
print(f"解压缩: {zip_path.name} (包含 {len(zip_contents)} 个文件)")
# 解压缩到目标目录
zipf.extractall(dest_path)
except zipfile.BadZipFile:
print(f"跳过: {zip_path.name} 不是有效的ZIP文件")
except Exception as e:
print(f"解压缩失败 {zip_path.name}: {e}")
print(f"批量解压缩完成! 文件保存到: {dest_path}")
# 使用示例
if __name__ == "__main__":
# 解压缩单个ZIP文件
# batch_extract_zip(["D:\\Backup\\photos_backup.zip"], "D:\\Extracted")
# 解压缩目录中的所有ZIP文件
batch_extract_zip("D:\\Backup", "D:\\Extracted")七、文件批量转换格式
1. 批量转换文本编码
from pathlib import Path
def batch_convert_encoding(directory, from_encoding, to_encoding, extension=".txt"):
"""
批量转换文件编码
参数:
directory: 目标目录
from_encoding: 源编码
to_encoding: 目标编码
extension: 文件扩展名(默认为.txt)
"""
dir_path = Path(directory)
if not dir_path.exists() or not dir_path.is_dir():
print(f"目录不存在或不是目录: {directory}")
return
# 获取文件
files = list(dir_path.glob(f"*{extension}"))
print(f"找到 {len(files)} 个 {extension} 文件")
# 批量转换编码
converted_count = 0
for file_path in files:
try:
# 读取源文件
with open(file_path, 'r', encoding=from_encoding) as f:
content = f.read()
# 写回新编码
with open(file_path, 'w', encoding=to_encoding) as f:
f.write(content)
print(f"转换完成: {file_path.name}")
converted_count += 1
except UnicodeDecodeError:
print(f"解码失败 {file_path.name}: 可能不是 {from_encoding} 编码")
except Exception as e:
print(f"转换失败 {file_path.name}: {e}")
print(f"批量转换完成! 共转换了 {converted_count} 个文件")
# 使用示例
if __name__ == "__main__":
batch_convert_encoding("C:\\Users\\User\\Documents", "gbk", "utf-8", ".txt")2. 批量转换CSV到Excel
import pandas as pd
from pathlib import Path
def batch_convert_csv_to_excel(directory, output_dir):
"""
批量将CSV文件转换为Excel文件
参数:
directory: 包含CSV文件的目录
output_dir: 输出Excel文件的目录
"""
dir_path = Path(directory)
out_path = Path(output_dir)
if not dir_path.exists() or not dir_path.is_dir():
print(f"目录不存在或不是目录: {directory}")
return
# 创建输出目录
out_path.mkdir(parents=True, exist_ok=True)
# 获取CSV文件
csv_files = list(dir_path.glob("*.csv"))
print(f"找到 {len(csv_files)} 个CSV文件")
# 批量转换
converted_count = 0
for csv_path in csv_files:
try:
# 读取CSV文件
df = pd.read_csv(csv_path, encoding='utf-8')
# 创建Excel文件名
excel_name = csv_path.stem + '.xlsx'
excel_path = out_path / excel_name
# 写入Excel文件
df.to_excel(excel_path, index=False)
print(f"转换完成: {csv_path.name} -> {excel_name}")
converted_count += 1
except Exception as e:
print(f"转换失败 {csv_path.name}: {e}")
print(f"批量转换完成! 共转换了 {converted_count} 个文件")
# 使用示例
if __name__ == "__main__":
batch_convert_csv_to_excel("C:\\Users\\User\\Data", "C:\\Users\\User\\ExcelData")八、高级文件批量处理技巧
1. 递归处理子目录
from pathlib import Path
def batch_process_recursive(directory, process_func, extension=".txt"):
"""
递归处理目录及其子目录中的文件
参数:
directory: 目标目录
process_func: 处理文件的函数
extension: 文件扩展名(默认为.txt)
"""
dir_path = Path(directory)
if not dir_path.exists() or not dir_path.is_dir():
print(f"目录不存在或不是目录: {directory}")
return
# 递归获取所有匹配的文件
files = list(dir_path.rglob(f"*{extension}"))
print(f"找到 {len(files)} 个 {extension} 文件(包括子目录)")
# 批量处理
for file_path in files:
try:
process_func(file_path)
print(f"处理完成: {file_path}")
except Exception as e:
print(f"处理失败 {file_path}: {e}")
# 使用示例
if __name__ == "__main__":
# 定义一个简单的处理函数
def process_file(file_path):
# 这里可以添加任意处理逻辑
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 例如,统计文件行数
line_count = len(content.splitlines())
print(f"文件 {file_path.name} 有 {line_count} 行")
# 递归处理
batch_process_recursive("C:\\Users\\User\\Documents", process_file, ".txt")2. 并行处理文件
import concurrent.futures
from pathlib import Path
def batch_process_parallel(directory, process_func, extension=".txt", max_workers=None):
"""
并行处理文件,提高处理速度
参数:
directory: 目标目录
process_func: 处理文件的函数
extension: 文件扩展名(默认为.txt)
max_workers: 最大工作线程数
"""
dir_path = Path(directory)
if not dir_path.exists() or not dir_path.is_dir():
print(f"目录不存在或不是目录: {directory}")
return
# 获取文件
files = list(dir_path.glob(f"*{extension}"))
print(f"找到 {len(files)} 个 {extension} 文件")
# 使用线程池并行处理
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交任务
future_to_file = {executor.submit(process_func, file_path): file_path for file_path in files}
# 处理结果
for future in concurrent.futures.as_completed(future_to_file):
file_path = future_to_file[future]
try:
result = future.result()
print(f"处理完成: {file_path.name}")
except Exception as e:
print(f"处理失败 {file_path.name}: {e}")
print("并行处理完成!")
# 使用示例
if __name__ == "__main__":
# 定义一个耗时的处理函数
def process_file(file_path):
import time
time.sleep(1) # 模拟耗时操作
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return len(content)
# 并行处理
batch_process_parallel("C:\\Users\\User\\Documents", process_file, ".txt", max_workers=4)九、总结
本集我们学习了如何使用Python进行各种文件批量处理操作,包括:
- 文件路径操作
- 文件批量重命名
- 文件批量移动和复制
- 文件批量内容处理
- 文件批量压缩和解压缩
- 文件批量转换格式
- 高级文件批量处理技巧
通过这些技术,我们可以轻松地处理各种文件批量操作需求,提高工作效率,减少重复劳动。在实际应用中,我们可以根据具体需求组合使用这些技术,实现更复杂的文件批量处理功能。
在下一集中,我们将学习图片批量处理的相关知识,包括图片调整大小、转换格式、添加水印等操作。