Skip to content

文件操作

Python 提供了丰富的文件操作功能,包括读写文件、目录操作等。

文件读写

基本读写

python
# Write to a file (mode "w" truncates any existing content)
with open("example.txt", "w", encoding="utf-8") as f:
    f.write("Hello, World!\n")
    f.write("Python 文件操作\n")

# Read the whole file at once
with open("example.txt", "r", encoding="utf-8") as f:
    content = f.read()
    print(content)

# Read line by line (memory-friendly for large files)
with open("example.txt", "r", encoding="utf-8") as f:
    for line in f:
        print(line.rstrip())  # strip the trailing newline

# Read all lines into a list
with open("example.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()
    print(lines)

# Read a fixed amount of text
with open("example.txt", "r", encoding="utf-8") as f:
    chunk = f.read(10)  # reads 10 characters (text mode counts characters, not bytes)
    print(chunk)

# Append to the file (mode "a" writes at the end)
with open("example.txt", "a", encoding="utf-8") as f:
    f.write("追加的内容\n")

文件模式

模式说明:
- r:只读(默认)
- w:只写(覆盖)
- a:追加
- x:创建(文件存在则报错)
- b:二进制模式
- t:文本模式(默认)
- +:读写模式
python
# Binary read/write: no encoding argument, data is raw bytes
with open("image.png", "rb") as f:
    data = f.read()

with open("copy.png", "wb") as f:
    f.write(data)

# Read-write mode ("r+" opens for update without truncating)
with open("example.txt", "r+", encoding="utf-8") as f:
    content = f.read()
    f.seek(0)  # move back to the start of the file
    f.write("新内容\n" + content)

文件指针

python
with open("example.txt", "r+", encoding="utf-8") as f:
    # Current position of the file pointer
    print(f.tell())  # 0

    # Reading advances the pointer
    content = f.read(5)
    print(f.tell())  # 5

    # Moving the pointer
    f.seek(0)        # go to the beginning
    f.seek(0, 2)     # go to the end
    f.seek(10)       # text mode: only offsets obtained from tell() (or 0) are portable
    f.seek(-5, 2)    # NOTE(review): in text mode a non-zero end-relative seek raises
                     # io.UnsupportedOperation — this call needs binary mode ("rb+")

pathlib 模块

Path 对象

python
from pathlib import Path

# Build Path objects
p = Path("example.txt")
p = Path("D:/projects/test.txt")
p = Path.cwd() / "data" / "test.txt"  # the "/" operator joins path parts

# Path components
print(p.name)          # file name: test.txt
print(p.stem)          # name without extension: test
print(p.suffix)        # extension: .txt
print(p.suffixes)      # all extensions: ['.txt']
print(p.parent)        # parent directory
print(p.parents)       # all ancestor directories
print(p.parts)         # path split into its components
print(p.anchor)        # drive/root anchor

# Absolute paths
print(p.absolute())
print(p.resolve())     # also resolves symlinks and ".." components

# Deriving new paths
new_path = p.with_name("new.txt")       # replace the file name
new_path = p.with_suffix(".md")         # replace the extension
new_path = p.parent / "new" / "file.txt"  # join parts

# Predicates
print(p.exists())      # does the path exist?
print(p.is_file())     # is it a regular file?
print(p.is_dir())      # is it a directory?
print(p.is_absolute()) # is it an absolute path?

# Creation
p.mkdir(parents=True, exist_ok=True)  # make directory and parents; no error if present
p.touch()             # create an empty file (or update its mtime)

# Deletion
p.unlink()            # remove the file
p.rmdir()             # remove the directory (must be empty)

文件读写

python
from pathlib import Path

p = Path("example.txt")

# Write (each call replaces the whole file)
p.write_text("Hello, World!\n", encoding="utf-8")
p.write_bytes(b"Binary data")

# Read
content = p.read_text(encoding="utf-8")
data = p.read_bytes()

# Append (Path.open behaves like the builtin open)
with p.open("a", encoding="utf-8") as f:
    f.write("追加内容\n")

目录遍历

python
from pathlib import Path

dir_path = Path(".")

# List immediate children only
for item in dir_path.iterdir():
    print(item.name, "目录" if item.is_dir() else "文件")

# Recursive walk
for item in dir_path.rglob("*"):
    print(item)

# Filter by extension while walking
for py_file in dir_path.rglob("*.py"):
    print(py_file)

# glob with a "**" pattern (equivalent to rglob)
for item in dir_path.glob("**/*.txt"):
    print(item)

# Collect only files or only directories
files = [f for f in dir_path.iterdir() if f.is_file()]
dirs = [d for d in dir_path.iterdir() if d.is_dir()]

os 和 os.path 模块

文件操作

python
import os

# Current working directory
print(os.getcwd())

# Change directory
os.chdir("/path/to/dir")

# List directory contents
files = os.listdir(".")
for f in files:
    print(f)

# Create directories
os.mkdir("new_dir")           # single level only
os.makedirs("a/b/c")          # creates intermediate levels too

# Delete
os.remove("file.txt")         # remove a file
os.rmdir("empty_dir")         # remove an empty directory
os.removedirs("a/b/c")        # remove empty directories up the chain

# Rename
os.rename("old.txt", "new.txt")

# File metadata
info = os.stat("file.txt")
print(info.st_size)    # size in bytes
print(info.st_mtime)   # last-modification time (epoch seconds)

# Environment variables
print(os.environ.get("HOME"))
print(os.environ.get("PATH"))

路径操作

python
import os.path

path = "/home/user/documents/file.txt"

# Path components
print(os.path.basename(path))  # file.txt
print(os.path.dirname(path))   # /home/user/documents
print(os.path.split(path))     # ('/home/user/documents', 'file.txt')
print(os.path.splitext(path))  # ('/home/user/documents/file', '.txt')

# Join parts with the OS separator
new_path = os.path.join("home", "user", "documents")
print(new_path)  # home/user/documents

# Predicates
print(os.path.exists(path))
print(os.path.isfile(path))
print(os.path.isdir(path))

# Absolute path
print(os.path.abspath("file.txt"))

# Normalize "." and ".." segments
print(os.path.normpath("home/./user/../documents"))
# home/documents

# Relative path from a start directory
print(os.path.relpath("/home/user", "/home"))
# user

# File size in bytes
print(os.path.getsize("file.txt"))

shutil 模块

文件操作

python
import shutil

# Copy a single file
shutil.copy("source.txt", "dest.txt")      # contents + permission bits
shutil.copy2("source.txt", "dest.txt")     # contents + full metadata (mtime etc.)
shutil.copyfile("source.txt", "dest.txt")  # contents only

# Copy a whole directory tree
shutil.copytree("source_dir", "dest_dir")

# Move / rename (works across filesystems)
shutil.move("source.txt", "dest.txt")
shutil.move("source_dir", "dest_dir")

# Remove an entire directory tree
shutil.rmtree("dir_to_delete")

# Disk usage
usage = shutil.disk_usage("/")
print(f"总空间: {usage.total / (1024**3):.2f} GB")
print(f"已使用: {usage.used / (1024**3):.2f} GB")
print(f"可用: {usage.free / (1024**3):.2f} GB")

# Archives
shutil.make_archive("archive", "zip", "source_dir")
shutil.unpack_archive("archive.zip", "extract_dir")

json 模块

JSON 读写

python
import json

# Python object -> JSON string
data = {
    "name": "张三",
    "age": 25,
    "skills": ["Python", "Java", "JavaScript"],
    "active": True
}

# ensure_ascii=False keeps non-ASCII characters readable in the output
json_str = json.dumps(data, ensure_ascii=False, indent=2)
print(json_str)

# JSON string -> Python object
parsed = json.loads(json_str)
print(parsed["name"])

# Write JSON to a file
with open("data.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# Read JSON from a file
with open("data.json", "r", encoding="utf-8") as f:
    loaded = json.load(f)
    print(loaded)

# Custom encoder for types json does not handle natively
from datetime import datetime

class DateTimeEncoder(json.JSONEncoder):
    """JSONEncoder that serializes datetime objects as ISO-8601 strings."""
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

data = {"time": datetime.now()}
json_str = json.dumps(data, cls=DateTimeEncoder)

csv 模块

CSV 读写

python
import csv

# Write CSV rows
data = [
    ["姓名", "年龄", "城市"],
    ["张三", 25, "北京"],
    ["李四", 30, "上海"],
    ["王五", 28, "广州"]
]

# newline="" is required by the csv module to avoid blank lines on Windows
with open("data.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerows(data)

# Read CSV rows (every field comes back as a string)
with open("data.csv", "r", encoding="utf-8") as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

# DictWriter / DictReader work with dicts keyed by column name
data = [
    {"name": "张三", "age": 25, "city": "北京"},
    {"name": "李四", "age": 30, "city": "上海"}
]

with open("data.csv", "w", newline="", encoding="utf-8") as f:
    fieldnames = ["name", "age", "city"]
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(data)

with open("data.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row["name"], row["age"])

pickle 模块

对象序列化

python
import pickle

# A picklable object graph
data = {
    "name": "张三",
    "scores": [90, 85, 88],
    "metadata": {"class": "A", "year": 2024}
}

# Serialize to a file (binary mode is required)
with open("data.pkl", "wb") as f:
    pickle.dump(data, f)

# Deserialize from a file
with open("data.pkl", "rb") as f:
    loaded = pickle.load(f)
    print(loaded)

# Serialize to / from a bytes object
bytes_data = pickle.dumps(data)
loaded = pickle.loads(bytes_data)

# SECURITY: pickle is not safe — never unpickle data from untrusted sources

configparser 模块

配置文件

ini
; config.ini
[database]
host = localhost
port = 3306
name = mydb

[app]
debug = true
log_level = INFO
python
import configparser

config = configparser.ConfigParser()
config.read("config.ini", encoding="utf-8")

# Read values (typed getters parse ints / booleans)
host = config.get("database", "host")
port = config.getint("database", "port")
debug = config.getboolean("app", "debug")

print(f"数据库: {host}:{port}")
print(f"调试模式: {debug}")

# Modify an existing option
config.set("database", "host", "127.0.0.1")

# Add a new section and option (values must be strings)
config.add_section("cache")
config.set("cache", "enabled", "true")

# Persist changes back to disk
with open("config.ini", "w", encoding="utf-8") as f:
    config.write(f)

实践示例

日志文件分析

python
from pathlib import Path
from collections import Counter
from datetime import datetime

def analyze_logs(log_dir: str):
    """Analyze every ``*.log`` file under *log_dir* (recursively).

    Expected line format: ``[2024-01-15 10:30:45] [ERROR] message``.

    Returns a dict with:
      - "level_counts": number of lines per log level
      - "hourly_counts": log volume per hour (0-23), sorted by hour
      - "errors": the first 10 ERROR messages encountered
    """
    log_path = Path(log_dir)

    # Aggregation state
    level_counts = Counter()
    hourly_counts = Counter()
    errors = []

    for log_file in log_path.glob("**/*.log"):
        with log_file.open("r", encoding="utf-8") as f:
            for line in f:
                # maxsplit=2 keeps any "] " occurring INSIDE the message intact
                # (the unbounded split truncated such messages at the first "] ")
                parts = line.strip().split("] ", 2)
                if len(parts) >= 3:
                    timestamp = parts[0][1:]   # drop the leading "["
                    level = parts[1][1:]       # drop the leading "["
                    message = parts[2]

                    level_counts[level] += 1

                    try:
                        dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
                        hourly_counts[dt.hour] += 1
                    except ValueError:
                        pass  # malformed timestamp: still counted by level

                    if level == "ERROR":
                        errors.append(message)

    return {
        "level_counts": dict(level_counts),
        "hourly_counts": dict(sorted(hourly_counts.items())),
        "errors": errors[:10]  # cap at the first 10 errors
    }

# Usage
result = analyze_logs("logs")
print("日志级别统计:", result["level_counts"])
print("每小时日志量:", result["hourly_counts"])

文件搜索工具

python
import fnmatch
from pathlib import Path
from typing import Generator, List, Optional

def find_files(
    directory: str,
    pattern: str = "*",
    content_pattern: Optional[str] = None
) -> Generator[Path, None, None]:
    """Yield files under *directory* whose name matches glob *pattern*.

    If *content_pattern* is a non-empty string, only files whose UTF-8 text
    contains that substring are yielded; files that cannot be read as UTF-8
    or lack read permission are silently skipped.
    (Fix: the parameter was annotated ``str`` while defaulting to None.)
    """
    dir_path = Path(directory)

    for file_path in dir_path.rglob(pattern):
        if not file_path.is_file():
            continue
        # Falsy pattern (None or "") means "no content filter" — same
        # truthiness test as the original implementation.
        if not content_pattern:
            yield file_path
            continue
        try:
            content = file_path.read_text(encoding="utf-8")
        except (UnicodeDecodeError, PermissionError):
            continue
        if content_pattern in content:
            yield file_path

def find_by_size(directory: str, min_size: int = 0, max_size: int = None):
    """Yield ``(path, size)`` pairs for regular files whose size in bytes
    lies within ``[min_size, max_size]`` (upper bound skipped when None)."""
    root = Path(directory)

    for candidate in root.rglob("*"):
        # Skip anything that isn't a regular file (directories, symlink dirs...)
        if not candidate.is_file():
            continue
        n_bytes = candidate.stat().st_size
        if n_bytes < min_size:
            continue
        if max_size is not None and n_bytes > max_size:
            continue
        yield candidate, n_bytes

def find_duplicates(directory: str):
    """Find duplicate files under *directory* (same size AND same MD5).

    Returns a list of lists; each inner list holds the Paths of a group of
    identical files. Unreadable files are skipped.
    """
    from hashlib import md5
    from collections import defaultdict

    dir_path = Path(directory)
    size_map = defaultdict(list)

    # Pass 1: group by size (cheap) — only equal-size files can be duplicates.
    for file_path in dir_path.rglob("*"):
        if file_path.is_file():
            size = file_path.stat().st_size
            size_map[size].append(file_path)

    # Pass 2: hash candidate groups. Hash in fixed-size chunks so a large
    # file is never loaded into memory at once (the original read_bytes()
    # materialized the whole file).
    duplicates = []
    for size, files in size_map.items():
        if len(files) > 1:
            hash_map = defaultdict(list)
            for f in files:
                digest = md5()
                try:
                    with f.open("rb") as fh:
                        for chunk in iter(lambda: fh.read(65536), b""):
                            digest.update(chunk)
                except PermissionError:
                    continue
                hash_map[digest.hexdigest()].append(f)

            for hash_val, dup_files in hash_map.items():
                if len(dup_files) > 1:
                    duplicates.append(dup_files)

    return duplicates

# Usage
for f in find_files(".", "*.py"):
    print(f)

for f, size in find_by_size(".", min_size=1024):
    print(f"{f}: {size} bytes")

dups = find_duplicates(".")
for dup_list in dups:
    print("重复文件:")
    for f in dup_list:
        print(f"  {f}")

文件备份工具

python
from pathlib import Path
from datetime import datetime
import shutil
import zipfile

def backup_directory(
    source_dir: str,
    backup_dir: str,
    backup_type: str = "copy"
):
    """Back up *source_dir* into *backup_dir*.

    backup_type:
      - "copy": recursive copy into a timestamped directory
      - "zip":  compressed archive named ``<source>_<timestamp>.zip``

    Returns the Path of the created backup.
    Raises ValueError for an unknown *backup_type* (previously an unknown
    type silently returned None, hiding caller typos).
    """
    source = Path(source_dir)
    backup = Path(backup_dir)

    # Ensure the destination root exists
    backup.mkdir(parents=True, exist_ok=True)

    # Timestamped name so repeated backups never collide
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_name = f"{source.name}_{timestamp}"

    if backup_type == "copy":
        dest = backup / backup_name
        shutil.copytree(source, dest)
        print(f"复制备份完成: {dest}")
        return dest

    if backup_type == "zip":
        zip_path = backup / f"{backup_name}.zip"
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for file_path in source.rglob("*"):
                if file_path.is_file():
                    # Store paths relative to the source root inside the archive
                    arcname = file_path.relative_to(source)
                    zf.write(file_path, arcname)
        print(f"压缩备份完成: {zip_path}")
        return zip_path

    raise ValueError(f"unknown backup_type: {backup_type!r}")

def incremental_backup(source_dir: str, backup_dir: str):
    """Incrementally back up *source_dir*: copy only new or modified files.

    A manifest file (``<backup_dir>/manifest.txt``, one ``path|mtime`` line
    per file) records the mtime seen on the previous run; files whose mtime
    is newer — or that were never seen — are copied into
    ``<backup_dir>/files/`` preserving their relative layout.
    """
    source = Path(source_dir)
    backup = Path(backup_dir)
    backup.mkdir(parents=True, exist_ok=True)

    # Load the manifest from the previous run, if any
    manifest_file = backup / "manifest.txt"
    manifest = {}

    if manifest_file.exists():
        # Explicit encoding: the default locale encoding breaks non-ASCII
        # file names on some platforms (e.g. Windows)
        with open(manifest_file, "r", encoding="utf-8") as f:
            for line in f:
                # rsplit on the LAST "|": a "|" inside the file path must
                # not break parsing (plain split raised ValueError)
                path, mtime = line.rstrip("\n").rsplit("|", 1)
                manifest[path] = float(mtime)

    # Copy files that are new or have a newer mtime than recorded
    new_manifest = {}
    backed_up = 0

    for file_path in source.rglob("*"):
        if file_path.is_file():
            rel_path = str(file_path.relative_to(source))
            mtime = file_path.stat().st_mtime
            new_manifest[rel_path] = mtime

            if rel_path not in manifest or manifest[rel_path] < mtime:
                dest = backup / "files" / rel_path
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(file_path, dest)  # copy2 preserves the mtime
                backed_up += 1

    # Rewrite the manifest with the current state
    with open(manifest_file, "w", encoding="utf-8") as f:
        for path, mtime in new_manifest.items():
            f.write(f"{path}|{mtime}\n")

    print(f"增量备份完成: {backed_up} 个文件")