第162集_文件批量处理

一、文件批量处理概述

文件批量处理是Python自动化中最常见的应用场景之一,它可以帮助我们快速、高效地处理大量文件,避免重复的手动操作。在本集中,我们将学习如何使用Python进行各种文件批量处理操作。

1. 文件批量处理的常见需求

  • 批量重命名文件
  • 批量移动或复制文件
  • 批量转换文件格式
  • 批量处理文件内容
  • 批量压缩或解压缩文件
  • 批量创建或删除文件/目录

2. 常用Python模块

  • os:提供操作系统相关功能,如文件路径操作、文件重命名等
  • shutil:提供高级文件操作,如复制、移动、删除等
  • glob:用于匹配文件路径
  • pathlib:提供面向对象的路径操作
  • fnmatch:用于文件名匹配

二、文件路径操作

在进行文件批量处理之前,我们需要先了解如何操作文件路径。

1. 使用os模块

import os

# 获取当前工作目录
current_dir = os.getcwd()
print(f"当前工作目录: {current_dir}")

# 拼接路径
file_path = os.path.join(current_dir, 'data', 'file.txt')
print(f"拼接后的路径: {file_path}")

# 获取目录名和文件名
dir_name = os.path.dirname(file_path)
file_name = os.path.basename(file_path)
print(f"目录名: {dir_name}")
print(f"文件名: {file_name}")

# 获取文件名和扩展名
name, ext = os.path.splitext(file_name)
print(f"文件名(不含扩展名): {name}")
print(f"扩展名: {ext}")

# 检查路径是否存在
if os.path.exists(file_path):
    print(f"路径存在: {file_path}")
else:
    print(f"路径不存在: {file_path}")

# 检查是否是文件或目录
print(f"是否是文件: {os.path.isfile(file_path)}")
print(f"是否是目录: {os.path.isdir(dir_name)}")

2. 使用pathlib模块(推荐)

from pathlib import Path

# 创建Path对象
file_path = Path('data', 'file.txt')

# 获取绝对路径
abs_path = file_path.absolute()
print(f"绝对路径: {abs_path}")

# 获取目录和文件名
dir_path = file_path.parent
file_name = file_path.name
print(f"目录路径: {dir_path}")
print(f"文件名: {file_name}")

# 获取文件名和扩展名
stem = file_path.stem
suffix = file_path.suffix
print(f"文件名(不含扩展名): {stem}")
print(f"扩展名: {suffix}")

# 检查路径是否存在
if file_path.exists():
    print(f"路径存在: {file_path}")

# 检查是否是文件或目录
print(f"是否是文件: {file_path.is_file()}")
print(f"是否是目录: {file_path.is_dir()}")

三、文件批量重命名

1. 按序号批量重命名

import os
from pathlib import Path

def batch_rename_files(directory, prefix, extension):
    """
    按序号批量重命名文件
    
    参数:
    directory: 目标目录
    prefix: 新文件名前缀
    extension: 文件扩展名(包含点号,如'.txt')
    """
    # 转换为Path对象
    dir_path = Path(directory)
    
    # 检查目录是否存在
    if not dir_path.exists() or not dir_path.is_dir():
        print(f"目录不存在或不是目录: {directory}")
        return
    
    # 获取指定扩展名的文件
    files = list(dir_path.glob(f"*{extension}"))
    files.sort()  # 按文件名排序
    
    print(f"找到 {len(files)} 个 {extension} 文件")
    
    # 批量重命名
    for i, file_path in enumerate(files, 1):
        # 构建新文件名
        new_name = f"{prefix}_{i:03d}{extension}"
        new_path = dir_path / new_name
        
        # 重命名文件
        file_path.rename(new_path)
        print(f"重命名: {file_path.name} -> {new_name}")
    
    print("批量重命名完成!")

# 使用示例
if __name__ == "__main__":
    batch_rename_files("C:\\Users\\User\\Photos", "vacation", ".jpg")

2. 按规则替换文件名

from pathlib import Path

def batch_rename_replace(directory, old_str, new_str, extension="*"):
    """
    按规则替换文件名中的字符串
    
    参数:
    directory: 目标目录
    old_str: 要替换的字符串
    new_str: 替换后的字符串
    extension: 文件扩展名(默认为所有文件)
    """
    dir_path = Path(directory)
    
    if not dir_path.exists() or not dir_path.is_dir():
        print(f"目录不存在或不是目录: {directory}")
        return
    
    # 获取文件
    pattern = f"*{extension}" if extension.startswith(".") else f"*.{extension}"
    files = list(dir_path.glob(pattern))
    
    print(f"找到 {len(files)} 个匹配文件")
    
    # 批量重命名
    renamed_count = 0
    for file_path in files:
        if old_str in file_path.name:
            # 替换字符串
            new_name = file_path.name.replace(old_str, new_str)
            new_path = dir_path / new_name
            
            # 重命名文件
            file_path.rename(new_path)
            print(f"重命名: {file_path.name} -> {new_name}")
            renamed_count += 1
    
    print(f"批量重命名完成! 共重命名了 {renamed_count} 个文件")

# 使用示例
if __name__ == "__main__":
    batch_rename_replace("C:\\Users\\User\\Documents", "old", "new", ".txt")

四、文件批量移动和复制

1. 批量移动文件

import os
import shutil
from pathlib import Path

def batch_move_files(source_dir, dest_dir, extension="*"):
    """
    批量移动文件到指定目录
    
    参数:
    source_dir: 源目录
    dest_dir: 目标目录
    extension: 文件扩展名(默认为所有文件)
    """
    source_path = Path(source_dir)
    dest_path = Path(dest_dir)
    
    # 检查源目录
    if not source_path.exists() or not source_path.is_dir():
        print(f"源目录不存在或不是目录: {source_dir}")
        return
    
    # 创建目标目录(如果不存在)
    dest_path.mkdir(parents=True, exist_ok=True)
    
    # 获取文件
    pattern = f"*{extension}" if extension.startswith(".") else f"*.{extension}"
    files = list(source_path.glob(pattern))
    
    print(f"找到 {len(files)} 个匹配文件")
    
    # 批量移动
    moved_count = 0
    for file_path in files:
        # 目标文件路径
        dest_file_path = dest_path / file_path.name
        
        # 检查目标文件是否已存在
        if dest_file_path.exists():
            print(f"目标文件已存在,跳过: {dest_file_path.name}")
            continue
            
        # 移动文件
        shutil.move(str(file_path), str(dest_file_path))
        print(f"移动: {file_path.name} -> {dest_dir}")
        moved_count += 1
    
    print(f"批量移动完成! 共移动了 {moved_count} 个文件")

# 使用示例
if __name__ == "__main__":
    batch_move_files("C:\\Users\\User\\Downloads", "C:\\Users\\User\\Documents\\Reports", ".pdf")

2. 批量复制文件

from pathlib import Path
import shutil

def batch_copy_files(source_dir, dest_dir, extension="*"):
    """
    批量复制文件到指定目录
    
    参数:
    source_dir: 源目录
    dest_dir: 目标目录
    extension: 文件扩展名(默认为所有文件)
    """
    source_path = Path(source_dir)
    dest_path = Path(dest_dir)
    
    # 检查源目录
    if not source_path.exists() or not source_path.is_dir():
        print(f"源目录不存在或不是目录: {source_dir}")
        return
    
    # 创建目标目录(如果不存在)
    dest_path.mkdir(parents=True, exist_ok=True)
    
    # 获取文件
    pattern = f"*{extension}" if extension.startswith(".") else f"*.{extension}"
    files = list(source_path.glob(pattern))
    
    print(f"找到 {len(files)} 个匹配文件")
    
    # 批量复制
    copied_count = 0
    for file_path in files:
        # 目标文件路径
        dest_file_path = dest_path / file_path.name
        
        # 复制文件(保留元数据)
        shutil.copy2(str(file_path), str(dest_file_path))
        print(f"复制: {file_path.name} -> {dest_dir}")
        copied_count += 1
    
    print(f"批量复制完成! 共复制了 {copied_count} 个文件")

# 使用示例
if __name__ == "__main__":
    batch_copy_files("C:\\Users\\User\\Photos", "D:\\Backup\\Photos", ".jpg")

五、文件批量内容处理

1. 批量替换文件内容

from pathlib import Path

def batch_replace_content(directory, old_str, new_str, extension=".txt"):
    """
    批量替换文件内容
    
    参数:
    directory: 目标目录
    old_str: 要替换的字符串
    new_str: 替换后的字符串
    extension: 文件扩展名(默认为.txt)
    """
    dir_path = Path(directory)
    
    if not dir_path.exists() or not dir_path.is_dir():
        print(f"目录不存在或不是目录: {directory}")
        return
    
    # 获取文件
    files = list(dir_path.glob(f"*{extension}"))
    
    print(f"找到 {len(files)} 个 {extension} 文件")
    
    # 批量替换内容
    processed_count = 0
    for file_path in files:
        try:
            # 读取文件内容
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            # 检查是否需要替换
            if old_str not in content:
                continue
            
            # 替换内容
            new_content = content.replace(old_str, new_str)
            
            # 写回文件
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(new_content)
            
            print(f"处理完成: {file_path.name}")
            processed_count += 1
            
        except Exception as e:
            print(f"处理失败 {file_path.name}: {e}")
    
    print(f"批量替换完成! 共处理了 {processed_count} 个文件")

# 使用示例
if __name__ == "__main__":
    batch_replace_content("C:\\Users\\User\\Documents", "old_company", "new_company", ".txt")

2. 批量提取文件内容

import re
from pathlib import Path

def batch_extract_content(directory, pattern, output_file, extension=".txt"):
    """
    批量提取文件内容并保存到一个文件
    
    参数:
    directory: 目标目录
    pattern: 正则表达式模式
    output_file: 输出文件路径
    extension: 文件扩展名(默认为.txt)
    """
    dir_path = Path(directory)
    output_path = Path(output_file)
    
    if not dir_path.exists() or not dir_path.is_dir():
        print(f"目录不存在或不是目录: {directory}")
        return
    
    # 获取文件
    files = list(dir_path.glob(f"*{extension}"))
    
    print(f"找到 {len(files)} 个 {extension} 文件")
    
    # 编译正则表达式
    regex = re.compile(pattern)
    
    # 创建输出文件
    with open(output_path, 'w', encoding='utf-8') as out_f:
        for file_path in files:
            try:
                # 读取文件内容
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                
                # 提取匹配内容
                matches = regex.findall(content)
                
                if matches:
                    out_f.write(f"=== {file_path.name} ===\n")
                    for match in matches:
                        out_f.write(f"{match}\n")
                    out_f.write("\n")
                    print(f"提取完成: {file_path.name}")
                    
            except Exception as e:
                print(f"提取失败 {file_path.name}: {e}")
    
    print(f"批量提取完成! 结果保存到: {output_file}")

# 使用示例
if __name__ == "__main__":
    # 提取所有邮箱地址
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    batch_extract_content("C:\\Users\\User\\Documents", pattern, "extracted_emails.txt")

六、文件批量压缩和解压缩

1. 批量压缩文件

import zipfile
from pathlib import Path

def batch_compress_files(directory, zip_name, extension="*"):
    """
    批量压缩文件到ZIP文件
    
    参数:
    directory: 目标目录
    zip_name: ZIP文件名(包含路径)
    extension: 文件扩展名(默认为所有文件)
    """
    dir_path = Path(directory)
    zip_path = Path(zip_name)
    
    if not dir_path.exists() or not dir_path.is_dir():
        print(f"目录不存在或不是目录: {directory}")
        return
    
    # 获取文件
    pattern = f"*{extension}" if extension.startswith(".") else f"*.{extension}"
    files = list(dir_path.glob(pattern))
    
    print(f"找到 {len(files)} 个匹配文件")
    
    # 创建ZIP文件
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file_path in files:
            # 将文件添加到ZIP中,保留相对路径
            arcname = file_path.relative_to(dir_path.parent)
            zipf.write(file_path, arcname)
            print(f"添加到ZIP: {file_path.name}")
    
    print(f"批量压缩完成! ZIP文件: {zip_path}")
    print(f"ZIP文件大小: {zip_path.stat().st_size / 1024:.2f} KB")

# 使用示例
if __name__ == "__main__":
    batch_compress_files("C:\\Users\\User\\Photos", "D:\\Backup\\photos_backup.zip", ".jpg")

2. 批量解压缩文件

import zipfile
import os
from pathlib import Path

def batch_extract_zip(zip_files, dest_dir):
    """
    批量解压缩ZIP文件
    
    参数:
    zip_files: ZIP文件列表或包含ZIP文件的目录
    dest_dir: 目标目录
    """
    dest_path = Path(dest_dir)
    
    # 创建目标目录
    dest_path.mkdir(parents=True, exist_ok=True)
    
    # 收集所有ZIP文件
    zip_list = []
    
    if isinstance(zip_files, list):
        # 如果是列表,直接使用
        zip_list = zip_files
    else:
        # 如果是目录,获取目录中的所有ZIP文件
        zip_dir = Path(zip_files)
        if zip_dir.exists() and zip_dir.is_dir():
            zip_list = list(zip_dir.glob("*.zip"))
    
    print(f"找到 {len(zip_list)} 个ZIP文件")
    
    # 批量解压缩
    for zip_path in zip_list:
        if not zip_path.exists() or not zip_path.is_file():
            print(f"跳过: {zip_path} 不是有效文件")
            continue
        
        try:
            with zipfile.ZipFile(zip_path, 'r') as zipf:
                # 获取ZIP文件中的所有文件
                zip_contents = zipf.namelist()
                print(f"解压缩: {zip_path.name} (包含 {len(zip_contents)} 个文件)")
                
                # 解压缩到目标目录
                zipf.extractall(dest_path)
                
        except zipfile.BadZipFile:
            print(f"跳过: {zip_path.name} 不是有效的ZIP文件")
        except Exception as e:
            print(f"解压缩失败 {zip_path.name}: {e}")
    
    print(f"批量解压缩完成! 文件保存到: {dest_path}")

# 使用示例
if __name__ == "__main__":
    # 解压缩单个ZIP文件
    # batch_extract_zip(["D:\\Backup\\photos_backup.zip"], "D:\\Extracted")
    
    # 解压缩目录中的所有ZIP文件
    batch_extract_zip("D:\\Backup", "D:\\Extracted")

七、文件批量转换格式

1. 批量转换文本编码

from pathlib import Path

def batch_convert_encoding(directory, from_encoding, to_encoding, extension=".txt"):
    """
    批量转换文件编码
    
    参数:
    directory: 目标目录
    from_encoding: 源编码
    to_encoding: 目标编码
    extension: 文件扩展名(默认为.txt)
    """
    dir_path = Path(directory)
    
    if not dir_path.exists() or not dir_path.is_dir():
        print(f"目录不存在或不是目录: {directory}")
        return
    
    # 获取文件
    files = list(dir_path.glob(f"*{extension}"))
    
    print(f"找到 {len(files)} 个 {extension} 文件")
    
    # 批量转换编码
    converted_count = 0
    for file_path in files:
        try:
            # 读取源文件
            with open(file_path, 'r', encoding=from_encoding) as f:
                content = f.read()
            
            # 写回新编码
            with open(file_path, 'w', encoding=to_encoding) as f:
                f.write(content)
            
            print(f"转换完成: {file_path.name}")
            converted_count += 1
            
        except UnicodeDecodeError:
            print(f"解码失败 {file_path.name}: 可能不是 {from_encoding} 编码")
        except Exception as e:
            print(f"转换失败 {file_path.name}: {e}")
    
    print(f"批量转换完成! 共转换了 {converted_count} 个文件")

# 使用示例
if __name__ == "__main__":
    batch_convert_encoding("C:\\Users\\User\\Documents", "gbk", "utf-8", ".txt")

2. 批量转换CSV到Excel

import pandas as pd
from pathlib import Path

def batch_convert_csv_to_excel(directory, output_dir):
    """
    批量将CSV文件转换为Excel文件
    
    参数:
    directory: 包含CSV文件的目录
    output_dir: 输出Excel文件的目录
    """
    dir_path = Path(directory)
    out_path = Path(output_dir)
    
    if not dir_path.exists() or not dir_path.is_dir():
        print(f"目录不存在或不是目录: {directory}")
        return
    
    # 创建输出目录
    out_path.mkdir(parents=True, exist_ok=True)
    
    # 获取CSV文件
    csv_files = list(dir_path.glob("*.csv"))
    
    print(f"找到 {len(csv_files)} 个CSV文件")
    
    # 批量转换
    converted_count = 0
    for csv_path in csv_files:
        try:
            # 读取CSV文件
            df = pd.read_csv(csv_path, encoding='utf-8')
            
            # 创建Excel文件名
            excel_name = csv_path.stem + '.xlsx'
            excel_path = out_path / excel_name
            
            # 写入Excel文件
            df.to_excel(excel_path, index=False)
            
            print(f"转换完成: {csv_path.name} -> {excel_name}")
            converted_count += 1
            
        except Exception as e:
            print(f"转换失败 {csv_path.name}: {e}")
    
    print(f"批量转换完成! 共转换了 {converted_count} 个文件")

# 使用示例
if __name__ == "__main__":
    batch_convert_csv_to_excel("C:\\Users\\User\\Data", "C:\\Users\\User\\ExcelData")

八、高级文件批量处理技巧

1. 递归处理子目录

from pathlib import Path

def batch_process_recursive(directory, process_func, extension=".txt"):
    """
    递归处理目录及其子目录中的文件
    
    参数:
    directory: 目标目录
    process_func: 处理文件的函数
    extension: 文件扩展名(默认为.txt)
    """
    dir_path = Path(directory)
    
    if not dir_path.exists() or not dir_path.is_dir():
        print(f"目录不存在或不是目录: {directory}")
        return
    
    # 递归获取所有匹配的文件
    files = list(dir_path.rglob(f"*{extension}"))
    
    print(f"找到 {len(files)} 个 {extension} 文件(包括子目录)")
    
    # 批量处理
    for file_path in files:
        try:
            process_func(file_path)
            print(f"处理完成: {file_path}")
        except Exception as e:
            print(f"处理失败 {file_path}: {e}")

# 使用示例
if __name__ == "__main__":
    # 定义一个简单的处理函数
    def process_file(file_path):
        # 这里可以添加任意处理逻辑
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # 例如,统计文件行数
        line_count = len(content.splitlines())
        print(f"文件 {file_path.name} 有 {line_count} 行")
    
    # 递归处理
    batch_process_recursive("C:\\Users\\User\\Documents", process_file, ".txt")

2. 并行处理文件

import concurrent.futures
from pathlib import Path

def batch_process_parallel(directory, process_func, extension=".txt", max_workers=None):
    """
    并行处理文件,提高处理速度
    
    参数:
    directory: 目标目录
    process_func: 处理文件的函数
    extension: 文件扩展名(默认为.txt)
    max_workers: 最大工作线程数
    """
    dir_path = Path(directory)
    
    if not dir_path.exists() or not dir_path.is_dir():
        print(f"目录不存在或不是目录: {directory}")
        return
    
    # 获取文件
    files = list(dir_path.glob(f"*{extension}"))
    
    print(f"找到 {len(files)} 个 {extension} 文件")
    
    # 使用线程池并行处理
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交任务
        future_to_file = {executor.submit(process_func, file_path): file_path for file_path in files}
        
        # 处理结果
        for future in concurrent.futures.as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                result = future.result()
                print(f"处理完成: {file_path.name}")
            except Exception as e:
                print(f"处理失败 {file_path.name}: {e}")
    
    print("并行处理完成!")

# 使用示例
if __name__ == "__main__":
    # 定义一个耗时的处理函数
    def process_file(file_path):
        import time
        time.sleep(1)  # 模拟耗时操作
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return len(content)
    
    # 并行处理
    batch_process_parallel("C:\\Users\\User\\Documents", process_file, ".txt", max_workers=4)

九、总结

本集我们学习了如何使用Python进行各种文件批量处理操作,包括:

  1. 文件路径操作
  2. 文件批量重命名
  3. 文件批量移动和复制
  4. 文件批量内容处理
  5. 文件批量压缩和解压缩
  6. 文件批量转换格式
  7. 高级文件批量处理技巧

通过这些技术,我们可以轻松地处理各种文件批量操作需求,提高工作效率,减少重复劳动。在实际应用中,我们可以根据具体需求组合使用这些技术,实现更复杂的文件批量处理功能。

在下一集中,我们将学习图片批量处理的相关知识,包括图片调整大小、转换格式、添加水印等操作。

« 上一篇 自动化概念 下一篇 » 图片批量处理