欢迎光临千百叶网
详情描述
Python 文件整理自动化工具

以下是一个完整的Python文件整理自动化工具,包含多种常见文件整理操作:

import os
import shutil
import re
import datetime
import hashlib
import logging
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Callable
import zipfile
import json
from dataclasses import dataclass
from enum import Enum
import mimetypes

# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class FileCategory(Enum):
    """文件分类枚举"""
    DOCUMENT = "文档"
    IMAGE = "图片"
    VIDEO = "视频"
    AUDIO = "音频"
    ARCHIVE = "压缩包"
    CODE = "代码"
    EXECUTABLE = "可执行文件"
    DATA = "数据文件"
    OTHER = "其他"

@dataclass
class FileInfo:
    """文件信息类"""
    path: Path
    size: int
    modified_time: datetime.datetime
    category: FileCategory
    extension: str

class FileOrganizer:
    """文件整理器主类"""

    # 文件扩展名分类映射
    CATEGORY_MAP = {
        FileCategory.DOCUMENT: ['.pdf', '.doc', '.docx', '.txt', '.rtf', '.odt', 
                               '.xls', '.xlsx', '.ppt', '.pptx', '.md', '.csv'],
        FileCategory.IMAGE: ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', 
                            '.tiff', '.webp', '.ico', '.psd', '.ai'],
        FileCategory.VIDEO: ['.mp4', '.avi', '.mov', '.wmv', '.flv', '.mkv', 
                           '.webm', '.m4v', '.mpg', '.mpeg'],
        FileCategory.AUDIO: ['.mp3', '.wav', '.flac', '.aac', '.ogg', '.wma', 
                           '.m4a', '.opus'],
        FileCategory.ARCHIVE: ['.zip', '.rar', '.7z', '.tar', '.gz', '.bz2', 
                              '.xz', '.tgz'],
        FileCategory.CODE: ['.py', '.js', '.java', '.cpp', '.c', '.html', '.css', 
                          '.php', '.rb', '.go', '.rs', '.ts', '.json', '.xml'],
        FileCategory.EXECUTABLE: ['.exe', '.msi', '.app', '.bat', '.sh', '.deb', 
                                 '.rpm', '.dmg'],
        FileCategory.DATA: ['.db', '.sqlite', '.sql', '.json', '.yaml', '.yml', 
                          '.toml', '.ini', '.cfg']
    }

    def __init__(self, base_path: str):
        self.base_path = Path(base_path).resolve()
        if not self.base_path.exists():
            raise FileNotFoundError(f"路径不存在: {self.base_path}")

        logger.info(f"初始化文件整理器,基础路径: {self.base_path}")

    def get_file_category(self, file_path: Path) -> FileCategory:
        """根据文件扩展名确定文件类别"""
        extension = file_path.suffix.lower()

        for category, extensions in self.CATEGORY_MAP.items():
            if extension in extensions:
                return category

        # 尝试通过MIME类型判断
        mime_type, _ = mimetypes.guess_type(str(file_path))
        if mime_type:
            if mime_type.startswith('image/'):
                return FileCategory.IMAGE
            elif mime_type.startswith('video/'):
                return FileCategory.VIDEO
            elif mime_type.startswith('audio/'):
                return FileCategory.AUDIO
            elif mime_type.startswith('text/'):
                return FileCategory.DOCUMENT

        return FileCategory.OTHER

    def organize_by_type(self, target_dir: str = None) -> Dict[FileCategory, List[str]]:
        """
        按文件类型整理文件

        Args:
            target_dir: 目标目录,默认为基础路径

        Returns:
            整理结果字典
        """
        if target_dir is None:
            target_dir = self.base_path
        else:
            target_dir = Path(target_dir)

        if not target_dir.exists():
            target_dir.mkdir(parents=True, exist_ok=True)

        result = {category: [] for category in FileCategory}

        # 创建分类文件夹
        for category in FileCategory:
            category_dir = target_dir / category.value
            category_dir.mkdir(exist_ok=True)

        # 遍历并整理文件
        for item in self.base_path.iterdir():
            if item.is_file():
                try:
                    category = self.get_file_category(item)
                    dest_dir = target_dir / category.value

                    # 处理同名文件
                    dest_path = self._get_unique_path(dest_dir / item.name)

                    # 移动文件
                    shutil.move(str(item), str(dest_path))
                    result[category].append(str(dest_path))
                    logger.info(f"移动文件: {item.name} -> {category.value}/")

                except Exception as e:
                    logger.error(f"处理文件 {item} 时出错: {e}")

        logger.info(f"按类型整理完成,共处理 {sum(len(files) for files in result.values())} 个文件")
        return result

    def organize_by_date(self, date_format: str = "%Y-%m", 
                        target_dir: str = None) -> Dict[str, List[str]]:
        """
        按修改日期整理文件

        Args:
            date_format: 日期格式,如 "%Y-%m" (年-月) 或 "%Y-%m-%d" (年-月-日)
            target_dir: 目标目录

        Returns:
            整理结果字典
        """
        if target_dir is None:
            target_dir = self.base_path
        else:
            target_dir = Path(target_dir)

        if not target_dir.exists():
            target_dir.mkdir(parents=True, exist_ok=True)

        result = {}

        for item in self.base_path.iterdir():
            if item.is_file():
                try:
                    # 获取文件修改时间
                    mtime = datetime.datetime.fromtimestamp(item.stat().st_mtime)
                    date_folder = mtime.strftime(date_format)

                    # 创建日期文件夹
                    dest_dir = target_dir / date_folder
                    dest_dir.mkdir(exist_ok=True)

                    # 处理同名文件
                    dest_path = self._get_unique_path(dest_dir / item.name)

                    # 移动文件
                    shutil.move(str(item), str(dest_path))

                    # 记录结果
                    if date_folder not in result:
                        result[date_folder] = []
                    result[date_folder].append(str(dest_path))

                    logger.info(f"移动文件: {item.name} -> {date_folder}/")

                except Exception as e:
                    logger.error(f"处理文件 {item} 时出错: {e}")

        logger.info(f"按日期整理完成,共处理 {sum(len(files) for files in result.values())} 个文件")
        return result

    def organize_by_extension(self, target_dir: str = None) -> Dict[str, List[str]]:
        """
        按文件扩展名整理文件

        Args:
            target_dir: 目标目录

        Returns:
            整理结果字典
        """
        if target_dir is None:
            target_dir = self.base_path
        else:
            target_dir = Path(target_dir)

        if not target_dir.exists():
            target_dir.mkdir(parents=True, exist_ok=True)

        result = {}

        for item in self.base_path.iterdir():
            if item.is_file():
                try:
                    extension = item.suffix.lower()
                    if not extension:  # 无扩展名的文件
                        extension = "no_extension"
                    else:
                        extension = extension[1:]  # 去掉点号

                    # 创建扩展名文件夹
                    dest_dir = target_dir / extension
                    dest_dir.mkdir(exist_ok=True)

                    # 处理同名文件
                    dest_path = self._get_unique_path(dest_dir / item.name)

                    # 移动文件
                    shutil.move(str(item), str(dest_path))

                    # 记录结果
                    if extension not in result:
                        result[extension] = []
                    result[extension].append(str(dest_path))

                    logger.info(f"移动文件: {item.name} -> {extension}/")

                except Exception as e:
                    logger.error(f"处理文件 {item} 时出错: {e}")

        logger.info(f"按扩展名整理完成,共处理 {sum(len(files) for files in result.values())} 个文件")
        return result

    def find_duplicate_files(self, algorithm: str = "md5") -> Dict[str, List[str]]:
        """
        查找重复文件

        Args:
            algorithm: 哈希算法,可选 'md5', 'sha1', 'sha256'

        Returns:
            重复文件字典,键为文件哈希,值为文件路径列表
        """
        hash_dict = {}

        # 支持的哈希算法
        hash_functions = {
            'md5': hashlib.md5,
            'sha1': hashlib.sha1,
            'sha256': hashlib.sha256
        }

        if algorithm not in hash_functions:
            raise ValueError(f"不支持的哈希算法: {algorithm}")

        hash_func = hash_functions[algorithm]

        for root, _, files in os.walk(self.base_path):
            for file in files:
                file_path = Path(root) / file
                try:
                    file_hash = self._calculate_hash(file_path, hash_func)

                    if file_hash not in hash_dict:
                        hash_dict[file_hash] = []
                    hash_dict[file_hash].append(str(file_path))

                except Exception as e:
                    logger.error(f"计算文件哈希时出错 {file_path}: {e}")

        # 筛选出重复的文件
        duplicates = {h: paths for h, paths in hash_dict.items() if len(paths) > 1}

        logger.info(f"找到 {len(duplicates)} 组重复文件")
        return duplicates

    def remove_duplicates(self, keep_oldest: bool = True) -> List[str]:
        """
        删除重复文件

        Args:
            keep_oldest: 是否保留最旧的文件,否则保留最新的

        Returns:
            被删除的文件列表
        """
        duplicates = self.find_duplicate_files()
        deleted_files = []

        for file_hash, file_paths in duplicates.items():
            # 将路径转换为Path对象并获取文件信息
            file_info_list = []
            for path_str in file_paths:
                path = Path(path_str)
                mtime = datetime.datetime.fromtimestamp(path.stat().st_mtime)
                file_info_list.append((path, mtime))

            # 排序:根据keep_oldest决定保留哪个文件
            file_info_list.sort(key=lambda x: x[1], reverse=not keep_oldest)

            # 保留第一个,删除其余的
            to_keep = file_info_list[0][0]
            to_delete = file_info_list[1:]

            for path, _ in to_delete:
                try:
                    path.unlink()
                    deleted_files.append(str(path))
                    logger.info(f"删除重复文件: {path.name} (保留: {to_keep.name})")
                except Exception as e:
                    logger.error(f"删除文件 {path} 时出错: {e}")

        logger.info(f"删除了 {len(deleted_files)} 个重复文件")
        return deleted_files

    def rename_files(self, pattern: str, start_number: int = 1, 
                    padding: int = 3) -> List[Tuple[str, str]]:
        """
        批量重命名文件

        Args:
            pattern: 文件名模式,可使用 {num} 作为序号占位符
                    如: "文档_{num:03d}.txt" 或 "图片_{num}.jpg"
            start_number: 起始序号
            padding: 序号填充位数

        Returns:
            重命名结果列表,每个元素为 (原文件名, 新文件名)
        """
        renamed_files = []
        counter = start_number

        # 获取所有文件并排序
        files = sorted([f for f in self.base_path.iterdir() if f.is_file()], 
                      key=lambda x: x.name.lower())

        for file_path in files:
            try:
                # 生成新文件名
                if "{num" in pattern:
                    # 使用格式化字符串
                    new_name = pattern.format(num=counter)
                else:
                    # 简单替换 {num}
                    new_name = pattern.replace("{num}", str(counter).zfill(padding))

                # 确保新文件名有扩展名
                if file_path.suffix and not new_name.endswith(file_path.suffix):
                    new_name += file_path.suffix

                new_path = self.base_path / new_name
                new_path = self._get_unique_path(new_path)

                # 重命名文件
                file_path.rename(new_path)
                renamed_files.append((file_path.name, new_path.name))
                logger.info(f"重命名: {file_path.name} -> {new_path.name}")

                counter += 1

            except Exception as e:
                logger.error(f"重命名文件 {file_path} 时出错: {e}")

        logger.info(f"批量重命名完成,共处理 {len(renamed_files)} 个文件")
        return renamed_files

    def cleanup_empty_dirs(self) -> List[str]:
        """
        清理空文件夹

        Returns:
            被删除的空文件夹列表
        """
        empty_dirs = []

        for root, dirs, files in os.walk(self.base_path, topdown=False):
            for dir_name in dirs:
                dir_path = Path(root) / dir_name
                try:
                    # 检查目录是否为空
                    if not any(dir_path.iterdir()):
                        dir_path.rmdir()
                        empty_dirs.append(str(dir_path))
                        logger.info(f"删除空文件夹: {dir_path}")
                except Exception as e:
                    logger.error(f"删除文件夹 {dir_path} 时出错: {e}")

        logger.info(f"清理了 {len(empty_dirs)} 个空文件夹")
        return empty_dirs

    def archive_files(self, archive_name: str, 
                     file_patterns: List[str] = None,
                     compression: str = "zip") -> str:
        """
        压缩文件

        Args:
            archive_name: 压缩包名称
            file_patterns: 文件模式列表,如 ["*.txt", "*.jpg"]
            compression: 压缩格式,支持 'zip', 'tar', 'gztar', 'bztar', 'xztar'

        Returns:
            压缩包路径
        """
        if not archive_name.endswith(f'.{compression}'):
            archive_name += f'.{compression}'

        archive_path = self.base_path / archive_name

        if file_patterns:
            # 收集匹配的文件
            files_to_archive = []
            for pattern in file_patterns:
                files_to_archive.extend(self.base_path.glob(pattern))
        else:
            # 压缩所有文件
            files_to_archive = [f for f in self.base_path.iterdir() if f.is_file()]

        if compression == "zip":
            with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for file_path in files_to_archive:
                    zipf.write(file_path, file_path.name)
                    logger.info(f"添加文件到压缩包: {file_path.name}")
        else:
            # 使用shutil进行其他格式的压缩
            import tempfile
            with tempfile.TemporaryDirectory() as temp_dir:
                temp_dir_path = Path(temp_dir)
                for file_path in files_to_archive:
                    shutil.copy(file_path, temp_dir_path / file_path.name)

                shutil.make_archive(
                    str(archive_path.with_suffix('')),
                    compression,
                    temp_dir_path
                )

        logger.info(f"创建压缩包: {archive_path}")
        return str(archive_path)

    def generate_report(self, output_file: str = "file_report.json") -> Dict:
        """
        生成文件整理报告

        Args:
            output_file: 输出报告文件名

        Returns:
            报告数据字典
        """
        report = {
            "base_path": str(self.base_path),
            "generated_at": datetime.datetime.now().isoformat(),
            "total_files": 0,
            "total_size_bytes": 0,
            "categories": {},
            "extensions": {},
            "largest_files": [],
            "oldest_files": [],
            "newest_files": []
        }

        all_files = []

        for root, dirs, files in os.walk(self.base_path):
            for file in files:
                file_path = Path(root) / file
                try:
                    stat = file_path.stat()

                    file_info = {
                        "path": str(file_path),
                        "name": file,
                        "size": stat.st_size,
                        "modified": datetime.datetime.fromtimestamp(stat.st_mtime).isoformat(),
                        "extension": file_path.suffix.lower(),
                        "category": self.get_file_category(file_path).value
                    }

                    all_files.append(file_info)
                    report["total_files"] += 1
                    report["total_size_bytes"] += stat.st_size

                    # 统计类别
                    category = file_info["category"]
                    report["categories"][category] = report["categories"].get(category, 0) + 1

                    # 统计扩展名
                    extension = file_info["extension"] if file_info["extension"] else "no_extension"
                    report["extensions"][extension] = report["extensions"].get(extension, 0) + 1

                except Exception as e:
                    logger.error(f"统计文件 {file_path} 时出错: {e}")

        # 找出最大文件
        if all_files:
            largest_files = sorted(all_files, key=lambda x: x["size"], reverse=True)[:10]
            report["largest_files"] = largest_files

            # 找出最旧和最新文件
            oldest_files = sorted(all_files, key=lambda x: x["modified"])[:10]
            newest_files = sorted(all_files, key=lambda x: x["modified"], reverse=True)[:10]
            report["oldest_files"] = oldest_files
            report["newest_files"] = newest_files

        # 保存报告到文件
        report_path = self.base_path / output_file
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

        logger.info(f"生成报告: {report_path}")
        return report

    def _calculate_hash(self, file_path: Path, hash_func) -> str:
        """计算文件哈希值"""
        h = hash_func()
        with open(file_path, 'rb') as f:
            # 分块读取大文件
            for chunk in iter(lambda: f.read(4096), b""):
                h.update(chunk)
        return h.hexdigest()

    def _get_unique_path(self, path: Path) -> Path:
        """获取唯一文件路径,避免文件名冲突"""
        if not path.exists():
            return path

        counter = 1
        while True:
            new_path = path.with_name(f"{path.stem}_{counter}{path.suffix}")
            if not new_path.exists():
                return new_path
            counter += 1

# 使用示例
def main():
    """使用示例"""
    # 设置工作目录
    base_dir = "./test_files"

    # 创建测试目录和文件
    def create_test_files():
        """创建测试文件"""
        import random
        import string

        test_dir = Path(base_dir)
        test_dir.mkdir(exist_ok=True)

        # 创建一些测试文件
        extensions = ['.txt', '.jpg', '.png', '.pdf', '.mp3', '.mp4', '.py']

        for i in range(20):
            ext = random.choice(extensions)
            filename = f"test_file_{i}{ext}"
            filepath = test_dir / filename

            # 创建文件并写入一些内容
            with open(filepath, 'w') as f:
                content = ''.join(random.choices(string.ascii_letters + string.digits, k=100))
                f.write(content)

            # 随机修改文件时间
            random_time = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 365))
            timestamp = random_time.timestamp()
            os.utime(filepath, (timestamp, timestamp))

        print(f"创建了20个测试文件在 {test_dir}")

    # 创建测试文件
    create_test_files()

    try:
        # 初始化文件整理器
        organizer = FileOrganizer(base_dir)

        print("\n1. 按类型整理文件")
        type_result = organizer.organize_by_type()

        print("\n2. 按日期整理文件")
        # 先将文件移回根目录以便演示
        for category_dir in Path(base_dir).iterdir():
            if category_dir.is_dir():
                for file in category_dir.iterdir():
                    if file.is_file():
                        shutil.move(str(file), base_dir)
                category_dir.rmdir()

        date_result = organizer.organize_by_date(date_format="%Y-%m")

        print("\n3. 查找重复文件")
        # 创建一些重复文件
        for i in range(3):
            for file in Path(base_dir).iterdir():
                if file.is_file():
                    shutil.copy(file, file.parent / f"{file.stem}_copy{i}{file.suffix}")

        duplicates = organizer.find_duplicate_files()
        print(f"找到 {len(duplicates)} 组重复文件")

        print("\n4. 删除重复文件")
        deleted = organizer.remove_duplicates()
        print(f"删除了 {len(deleted)} 个重复文件")

        print("\n5. 批量重命名文件")
        # 先将文件移回根目录
        for date_dir in Path(base_dir).iterdir():
            if date_dir.is_dir():
                for file in date_dir.iterdir():
                    if file.is_file():
                        shutil.move(str(file), base_dir)
                date_dir.rmdir()

        rename_result = organizer.rename_files("文档_{num:03d}", start_number=1)
        print(f"重命名了 {len(rename_result)} 个文件")

        print("\n6. 清理空文件夹")
        empty_dirs = organizer.cleanup_empty_dirs()
        print(f"清理了 {len(empty_dirs)} 个空文件夹")

        print("\n7. 生成报告")
        report = organizer.generate_report("file_organization_report.json")
        print(f"总文件数: {report['total_files']}")
        print(f"总大小: {report['total_size_bytes'] / 1024 / 1024:.2f} MB")

        print("\n8. 创建压缩包")
        archive_path = organizer.archive_files("备份文件", ["*.txt", "*.pdf"], "zip")
        print(f"创建了压缩包: {archive_path}")

    except Exception as e:
        print(f"发生错误: {e}")

if __name__ == "__main__":
    main()

功能说明

这个文件整理自动化工具提供了以下功能:

1. 按类型整理文件

  • 将文件按文档、图片、视频、音频等类别分类
  • 自动创建分类文件夹并移动文件

2. 按日期整理文件

  • 按文件修改日期整理(年-月、年-月-日等格式)
  • 自动创建日期文件夹

3. 按扩展名整理文件

  • 按文件扩展名分类整理

4. 查找和删除重复文件

  • 使用哈希算法(MD5/SHA1/SHA256)识别重复文件
  • 可自动删除重复文件,保留最早或最新的版本

5. 批量重命名文件

  • 支持自定义命名模式
  • 自动序号填充

6. 清理空文件夹

  • 自动识别并删除空文件夹

7. 文件压缩

  • 支持多种压缩格式(zip, tar, gztar等)
  • 可选择性压缩特定类型的文件

8. 生成整理报告

  • 统计文件数量、大小
  • 分析文件类型分布
  • 识别最大/最旧/最新文件
  • 生成JSON格式报告

使用示例

# 基本使用
organizer = FileOrganizer("你的文件夹路径")

# 按类型整理
organizer.organize_by_type()

# 按日期整理
organizer.organize_by_date(date_format="%Y-%m-%d")

# 查找重复文件
duplicates = organizer.find_duplicate_files()

# 批量重命名
organizer.rename_files("照片_{num:03d}", start_number=1)

# 生成报告
report = organizer.generate_report()

特点

安全可靠:使用唯一文件名避免覆盖,提供详细日志 灵活配置:支持多种整理方式和参数自定义 错误处理:完善的异常处理和日志记录 类型识别:基于扩展名和MIME类型双重判断 可扩展:模块化设计,易于添加新功能

你可以根据实际需求修改和扩展这个工具,例如添加自定义分类规则、支持更多文件操作等。