# File Operations

Python provides rich file-handling functionality, including reading and writing files and working with directories.

## Reading and Writing Files

### Basic Read/Write
```python
# Write to a file
with open("example.txt", "w", encoding="utf-8") as f:
    f.write("Hello, World!\n")
    f.write("Python file operations\n")

# Read the entire file
with open("example.txt", "r", encoding="utf-8") as f:
    content = f.read()
    print(content)

# Read line by line
with open("example.txt", "r", encoding="utf-8") as f:
    for line in f:
        print(line.rstrip())  # strip the trailing newline

# Read all lines into a list
with open("example.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()
    print(lines)

# Read a fixed number of characters
with open("example.txt", "r", encoding="utf-8") as f:
    chunk = f.read(10)  # read 10 characters
    print(chunk)

# Append to a file
with open("example.txt", "a", encoding="utf-8") as f:
    f.write("Appended content\n")
```

### File Modes
| Mode | Description |
|---|---|
| `r` | Read-only (default) |
| `w` | Write-only (truncates existing content) |
| `a` | Append |
| `x` | Create (fails if the file exists) |
| `b` | Binary mode |
| `t` | Text mode (default) |
| `+` | Read and write |
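One mode from the table that the examples below don't cover is `x`. A minimal sketch (the file name is hypothetical) that creates a file only if it does not already exist:

```python
# "x" mode creates the file and raises if it already exists,
# which avoids silently overwriting data the way "w" would.
try:
    with open("fresh.txt", "x", encoding="utf-8") as f:  # hypothetical file name
        f.write("Created exactly once\n")
except FileExistsError:
    print("fresh.txt already exists; refusing to overwrite")
```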
```python
# Binary read/write
with open("image.png", "rb") as f:
    data = f.read()

with open("copy.png", "wb") as f:
    f.write(data)

# Read-and-write mode
with open("example.txt", "r+", encoding="utf-8") as f:
    content = f.read()
    f.seek(0)  # move to the beginning of the file
    f.write("New content\n" + content)
```

### The File Pointer
```python
with open("example.txt", "r+", encoding="utf-8") as f:
    # Current position
    print(f.tell())  # 0

    # Read some content
    content = f.read(5)
    print(f.tell())  # 5

    # Move the pointer
    f.seek(0)     # to the beginning
    f.seek(0, 2)  # to the end

# In text mode only seeks to offsets returned by tell() (plus seek(0, 2))
# are supported; arbitrary and end-relative seeks need binary mode:
with open("example.txt", "rb") as f:
    f.seek(10)     # to the 10th byte
    f.seek(-5, 2)  # 5 bytes back from the end
```

## The pathlib Module
### Path Objects

```python
from pathlib import Path

# Create Path objects
p = Path("example.txt")
p = Path("D:/projects/test.txt")
p = Path.cwd() / "data" / "test.txt"  # join paths

# Path information
print(p.name)      # file name: test.txt
print(p.stem)      # file name without extension: test
print(p.suffix)    # extension: .txt
print(p.suffixes)  # all extensions: ['.txt']
print(p.parent)    # parent directory
print(p.parents)   # all ancestor directories
print(p.parts)     # path components
print(p.anchor)    # root anchor

# Absolute paths
print(p.absolute())
print(p.resolve())  # also resolves symlinks

# Deriving new paths
new_path = p.with_name("new.txt")         # change the file name
new_path = p.with_suffix(".md")           # change the extension
new_path = p.parent / "new" / "file.txt"  # join

# Checks
print(p.exists())       # does it exist?
print(p.is_file())      # is it a file?
print(p.is_dir())       # is it a directory?
print(p.is_absolute())  # is it an absolute path?

# Creation
p.mkdir(parents=True, exist_ok=True)  # create a directory
p.touch()                             # create a file

# Deletion
p.unlink()  # delete a file
p.rmdir()   # delete an empty directory
```

### Reading and Writing Files
```python
from pathlib import Path

p = Path("example.txt")

# Write
p.write_text("Hello, World!\n", encoding="utf-8")
p.write_bytes(b"Binary data")

# Read
content = p.read_text(encoding="utf-8")
data = p.read_bytes()

# Append
with p.open("a", encoding="utf-8") as f:
    f.write("Appended content\n")
```

### Directory Traversal
```python
from pathlib import Path

dir_path = Path(".")

# List the immediate children
for item in dir_path.iterdir():
    print(item.name, "directory" if item.is_dir() else "file")

# Recursive traversal
for item in dir_path.rglob("*"):
    print(item)

# Filter by file type
for py_file in dir_path.rglob("*.py"):
    print(py_file)

# glob patterns
for item in dir_path.glob("**/*.txt"):
    print(item)

# Collect only files or only directories
files = [f for f in dir_path.iterdir() if f.is_file()]
dirs = [d for d in dir_path.iterdir() if d.is_dir()]
```
## The os and os.path Modules

### File Operations
```python
import os

# Current working directory
print(os.getcwd())

# Change directory
os.chdir("/path/to/dir")

# List a directory's contents
files = os.listdir(".")
for f in files:
    print(f)

# Create directories
os.mkdir("new_dir")   # a single directory
os.makedirs("a/b/c")  # nested directories

# Delete
os.remove("file.txt")   # delete a file
os.rmdir("empty_dir")   # delete an empty directory
os.removedirs("a/b/c")  # remove empty directories recursively

# Rename
os.rename("old.txt", "new.txt")

# File information
info = os.stat("file.txt")
print(info.st_size)   # file size in bytes
print(info.st_mtime)  # modification time

# Environment variables
print(os.environ.get("HOME"))
print(os.environ.get("PATH"))
```
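`os.walk` is the classic way to traverse a directory tree using `os` alone; this short sketch prints every file path under the current directory:

```python
import os

# os.walk yields a (directory, subdirectory names, file names) triple per level
for root, dirs, files in os.walk("."):
    for name in files:
        print(os.path.join(root, name))
```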
### Path Operations

```python
import os.path

path = "/home/user/documents/file.txt"

# Path information
print(os.path.basename(path))  # file.txt
print(os.path.dirname(path))   # /home/user/documents
print(os.path.split(path))     # ('/home/user/documents', 'file.txt')
print(os.path.splitext(path))  # ('/home/user/documents/file', '.txt')

# Join paths
new_path = os.path.join("home", "user", "documents")
print(new_path)  # home/user/documents

# Checks
print(os.path.exists(path))
print(os.path.isfile(path))
print(os.path.isdir(path))

# Absolute path
print(os.path.abspath("file.txt"))

# Normalize a path
print(os.path.normpath("home/./user/../documents"))
# home/documents

# Relative path
print(os.path.relpath("/home/user", "/home"))
# user

# File size
print(os.path.getsize("file.txt"))
```

## The shutil Module
### File Operations

```python
import shutil

# Copy a file
shutil.copy("source.txt", "dest.txt")      # copies contents and permissions
shutil.copy2("source.txt", "dest.txt")     # copies contents and metadata
shutil.copyfile("source.txt", "dest.txt")  # copies contents only

# Copy a directory tree
shutil.copytree("source_dir", "dest_dir")

# Move / rename
shutil.move("source.txt", "dest.txt")
shutil.move("source_dir", "dest_dir")

# Delete a directory tree
shutil.rmtree("dir_to_delete")

# Disk usage
usage = shutil.disk_usage("/")
print(f"Total: {usage.total / (1024**3):.2f} GB")
print(f"Used: {usage.used / (1024**3):.2f} GB")
print(f"Free: {usage.free / (1024**3):.2f} GB")

# Archives
shutil.make_archive("archive", "zip", "source_dir")
shutil.unpack_archive("archive.zip", "extract_dir")
```
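`copytree` also accepts an `ignore` callable; combined with `shutil.ignore_patterns` it skips files you don't want copied. A small sketch (the patterns here are just examples):

```python
import shutil

# Copy a tree while skipping compiled files and temp directories
shutil.copytree(
    "source_dir",
    "dest_dir",
    ignore=shutil.ignore_patterns("*.pyc", "__pycache__", "tmp*"),
)
```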
## The json Module

### Reading and Writing JSON
```python
import json

# Python object to JSON string
data = {
    "name": "张三",
    "age": 25,
    "skills": ["Python", "Java", "JavaScript"],
    "active": True
}
json_str = json.dumps(data, ensure_ascii=False, indent=2)
print(json_str)

# JSON string to Python object
parsed = json.loads(json_str)
print(parsed["name"])

# Write JSON to a file
with open("data.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# Read JSON from a file
with open("data.json", "r", encoding="utf-8") as f:
    loaded = json.load(f)
    print(loaded)

# Custom encoding
from datetime import datetime

class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

data = {"time": datetime.now()}
json_str = json.dumps(data, cls=DateTimeEncoder)
```
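Decoding can be customized symmetrically with the `object_hook` parameter of `json.loads`. A minimal sketch, assuming the `"time"` key convention used by the encoder above:

```python
import json
from datetime import datetime

def decode_datetime(obj):
    # Convert a "time" field back to a datetime (key name is our convention)
    if "time" in obj:
        obj["time"] = datetime.fromisoformat(obj["time"])
    return obj

parsed = json.loads('{"time": "2024-01-15T10:30:45"}', object_hook=decode_datetime)
print(parsed["time"].year)  # 2024
```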
## The csv Module

### Reading and Writing CSV
```python
import csv

# Write CSV
data = [
    ["name", "age", "city"],
    ["张三", 25, "北京"],
    ["李四", 30, "上海"],
    ["王五", 28, "广州"]
]

with open("data.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerows(data)

# Read CSV
with open("data.csv", "r", encoding="utf-8") as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

# DictWriter and DictReader
data = [
    {"name": "张三", "age": 25, "city": "北京"},
    {"name": "李四", "age": 30, "city": "上海"}
]

with open("data.csv", "w", newline="", encoding="utf-8") as f:
    fieldnames = ["name", "age", "city"]
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(data)

with open("data.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row["name"], row["age"])
```
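Both `reader` and `writer` accept a `delimiter` argument, so the same module handles tab- or semicolon-separated files. A quick sketch for TSV (the file name is an assumption):

```python
import csv

# Tab-separated values: only the delimiter changes
with open("data.tsv", "w", newline="", encoding="utf-8") as f:  # hypothetical file
    writer = csv.writer(f, delimiter="\t")
    writer.writerow(["name", "age"])
    writer.writerow(["张三", 25])

with open("data.tsv", "r", encoding="utf-8") as f:
    for row in csv.reader(f, delimiter="\t"):
        print(row)
```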
## The pickle Module

### Object Serialization
```python
import pickle

# An object to serialize
data = {
    "name": "张三",
    "scores": [90, 85, 88],
    "metadata": {"class": "A", "year": 2024}
}

# Write to a file
with open("data.pkl", "wb") as f:
    pickle.dump(data, f)

# Read back from the file
with open("data.pkl", "rb") as f:
    loaded = pickle.load(f)
    print(loaded)

# Serialize to bytes
bytes_data = pickle.dumps(data)
loaded = pickle.loads(bytes_data)

# Caution: pickle is not secure; never load data from untrusted sources
```
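Unlike JSON, pickle round-trips custom class instances out of the box. A minimal sketch (the `Point` class is invented for this example):

```python
import pickle
from dataclasses import dataclass

@dataclass
class Point:  # illustrative class, not from the original examples
    x: int
    y: int

# Instances of user-defined classes pickle without extra code
p = Point(3, 4)
restored = pickle.loads(pickle.dumps(p))
print(restored)  # Point(x=3, y=4)
```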
## The configparser Module

### Configuration Files
```ini
; config.ini
[database]
host = localhost
port = 3306
name = mydb

[app]
debug = true
log_level = INFO
```

```python
import configparser

config = configparser.ConfigParser()
config.read("config.ini", encoding="utf-8")

# Read values
host = config.get("database", "host")
port = config.getint("database", "port")
debug = config.getboolean("app", "debug")
print(f"Database: {host}:{port}")
print(f"Debug mode: {debug}")

# Modify values
config.set("database", "host", "127.0.0.1")

# Add a section and an option
config.add_section("cache")
config.set("cache", "enabled", "true")

# Write back to the file
with open("config.ini", "w", encoding="utf-8") as f:
    config.write(f)
```
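The `get*` accessors also take a `fallback` keyword for options that may be missing, which avoids wrapping every read in exception handling. A brief sketch (assuming a `timeout` option that is absent from config.ini):

```python
import configparser

config = configparser.ConfigParser()
config.read("config.ini", encoding="utf-8")

# Returns the fallback instead of raising NoOptionError
timeout = config.getint("database", "timeout", fallback=30)  # assumed-missing option
print(timeout)  # 30
```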
## Practical Examples

### Log File Analysis
```python
from pathlib import Path
from collections import Counter
from datetime import datetime

def analyze_logs(log_dir: str):
    """Analyze every log file in a directory."""
    log_path = Path(log_dir)

    # Aggregated statistics
    level_counts = Counter()
    hourly_counts = Counter()
    errors = []

    # Walk every log file
    for log_file in log_path.glob("**/*.log"):
        with log_file.open("r", encoding="utf-8") as f:
            for line in f:
                # Parse a log line (example format: [2024-01-15 10:30:45] [ERROR] message)
                parts = line.strip().split("] ")
                if len(parts) >= 3:
                    timestamp = parts[0][1:]
                    level = parts[1][1:]
                    message = parts[2]

                    level_counts[level] += 1

                    try:
                        dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
                        hourly_counts[dt.hour] += 1
                    except ValueError:
                        pass

                    if level == "ERROR":
                        errors.append(message)

    return {
        "level_counts": dict(level_counts),
        "hourly_counts": dict(sorted(hourly_counts.items())),
        "errors": errors[:10]  # first 10 errors
    }

# Usage
result = analyze_logs("logs")
print("Log level counts:", result["level_counts"])
print("Logs per hour:", result["hourly_counts"])
```

### File Search Tool
```python
from pathlib import Path
from typing import Generator, Optional

def find_files(
    directory: str,
    pattern: str = "*",
    content_pattern: Optional[str] = None
) -> Generator[Path, None, None]:
    """Search for files by name pattern and, optionally, by content."""
    dir_path = Path(directory)

    for file_path in dir_path.rglob(pattern):
        if file_path.is_file():
            if content_pattern:
                try:
                    content = file_path.read_text(encoding="utf-8")
                    if content_pattern in content:
                        yield file_path
                except (UnicodeDecodeError, PermissionError):
                    continue
            else:
                yield file_path

def find_by_size(directory: str, min_size: int = 0, max_size: Optional[int] = None):
    """Search for files by size."""
    dir_path = Path(directory)
    for file_path in dir_path.rglob("*"):
        if file_path.is_file():
            size = file_path.stat().st_size
            if size >= min_size and (max_size is None or size <= max_size):
                yield file_path, size

def find_duplicates(directory: str):
    """Find duplicate files (grouped by size, confirmed by content hash)."""
    from hashlib import md5
    from collections import defaultdict

    dir_path = Path(directory)
    size_map = defaultdict(list)

    # Group by size first: files of different sizes cannot be duplicates
    for file_path in dir_path.rglob("*"):
        if file_path.is_file():
            size = file_path.stat().st_size
            size_map[size].append(file_path)

    # Hash only groups that share a size
    duplicates = []
    for size, files in size_map.items():
        if len(files) > 1:
            hash_map = defaultdict(list)
            for f in files:
                try:
                    content = f.read_bytes()
                    file_hash = md5(content).hexdigest()
                    hash_map[file_hash].append(f)
                except PermissionError:
                    continue
            for hash_val, dup_files in hash_map.items():
                if len(dup_files) > 1:
                    duplicates.append(dup_files)

    return duplicates

# Usage
for f in find_files(".", "*.py"):
    print(f)

for f, size in find_by_size(".", min_size=1024):
    print(f"{f}: {size} bytes")

dups = find_duplicates(".")
for dup_list in dups:
    print("Duplicate files:")
    for f in dup_list:
        print(f"  {f}")
```

### File Backup Tool
```python
from pathlib import Path
from datetime import datetime
import shutil
import zipfile

def backup_directory(
    source_dir: str,
    backup_dir: str,
    backup_type: str = "copy"
):
    """Back up a directory as a copy or as a zip archive."""
    source = Path(source_dir)
    backup = Path(backup_dir)

    # Make sure the backup directory exists
    backup.mkdir(parents=True, exist_ok=True)

    # Build a timestamped backup name
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_name = f"{source.name}_{timestamp}"

    if backup_type == "copy":
        # Full copy
        dest = backup / backup_name
        shutil.copytree(source, dest)
        print(f"Copy backup finished: {dest}")
        return dest
    elif backup_type == "zip":
        # Compressed backup
        zip_path = backup / f"{backup_name}.zip"
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
            for file_path in source.rglob("*"):
                if file_path.is_file():
                    arcname = file_path.relative_to(source)
                    zf.write(file_path, arcname)
        print(f"Zip backup finished: {zip_path}")
        return zip_path

def incremental_backup(source_dir: str, backup_dir: str):
    """Incremental backup: copy only files changed since the last run."""
    source = Path(source_dir)
    backup = Path(backup_dir)
    backup.mkdir(parents=True, exist_ok=True)

    # Load the manifest of previously backed-up files
    manifest_file = backup / "manifest.txt"
    manifest = {}
    if manifest_file.exists():
        with open(manifest_file, "r") as f:
            for line in f:
                path, mtime = line.strip().split("|")
                manifest[path] = float(mtime)

    # Copy files that are new or have been modified
    new_manifest = {}
    backed_up = 0
    for file_path in source.rglob("*"):
        if file_path.is_file():
            rel_path = str(file_path.relative_to(source))
            mtime = file_path.stat().st_mtime
            new_manifest[rel_path] = mtime

            if rel_path not in manifest or manifest[rel_path] < mtime:
                # The file is new or has changed
                dest = backup / "files" / rel_path
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(file_path, dest)
                backed_up += 1

    # Rewrite the manifest
    with open(manifest_file, "w") as f:
        for path, mtime in new_manifest.items():
            f.write(f"{path}|{mtime}\n")

    print(f"Incremental backup finished: {backed_up} files")
```
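A brief usage sketch for the two helpers above (the directory names are placeholders):

```python
# Full zip backup of a project directory
backup_directory("my_project", "backups", backup_type="zip")

# Subsequent incremental runs copy only changed files
incremental_backup("my_project", "backups/incremental")
```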