Appearance
文件操作
文件操作是编程中常见的任务之一。Python提供了丰富的文件操作功能,包括文件的读写、目录管理、路径处理等。本章将详细介绍Python中的文件操作。
文件的打开与关闭
打开文件
python
# 文件的打开与关闭
# 1. 使用 open() 函数打开文件
# 基本语法: open(file, mode="r", encoding=None)(encoding 应使用关键字参数传入,第三个位置参数实际是 buffering)
# mode 参数说明:
# 'r' - 只读(默认)
# 'w' - 写入(会覆盖已有文件)
# 'x' - 独占创建(文件已存在则报错)
# 'a' - 追加写入
# 'b' - 二进制模式
# 't' - 文本模式(默认)
# '+' - 读写模式
# 2. 传统方式打开文件(需要手动关闭)
file = open("example.txt", "w", encoding="utf-8")
file.write("Hello, Python!")
file.close() # 必须手动关闭
# 3. 使用 with 语句(推荐,自动关闭)
with open("example.txt", "w", encoding="utf-8") as file:
file.write("Hello, Python!\n")
file.write("这是第二行。\n")
# 文件会在 with 块结束时自动关闭
# 4. 打开模式详解
# 只读模式
with open("example.txt", "r", encoding="utf-8") as file:
content = file.read()
print(f"读取内容:\n{content}")
# 写入模式(覆盖)
with open("example.txt", "w", encoding="utf-8") as file:
file.write("新内容覆盖了原文件\n")
# 追加模式
with open("example.txt", "a", encoding="utf-8") as file:
file.write("这是追加的内容\n")
# 读写模式
with open("example.txt", "r+", encoding="utf-8") as file:
content = file.read()
print(f"当前内容: {content}")
file.write("\n追加的新内容")
# 5. 二进制模式
# 写入二进制文件
binary_data = b'\x48\x65\x6c\x6c\x6f' # "Hello" 的字节表示
with open("binary_file.bin", "wb") as file:
file.write(binary_data)
# 读取二进制文件
with open("binary_file.bin", "rb") as file:
data = file.read()
print(f"二进制数据: {data}")
print(f"解码后: {data.decode('utf-8')}")
文件编码
python
# 文件编码处理
# 1. 指定编码打开文件
with open("chinese.txt", "w", encoding="utf-8") as file:
file.write("你好,世界!\n")
file.write("Python文件操作\n")
# 2. 读取时指定编码
with open("chinese.txt", "r", encoding="utf-8") as file:
content = file.read()
print(content)
# 3. 处理编码错误
# errors 参数:
# 'strict' - 严格模式,编码错误时抛出异常(默认)
# 'ignore' - 忽略错误
# 'replace' - 用替换字符代替无法处理的字符(解码时为 U+FFFD "�",编码时为 "?")
# 模拟编码错误处理
text = "Hello 你好"
# 写入时使用 utf-8
with open("encoded.txt", "w", encoding="utf-8") as file:
file.write(text)
# 读取时处理编码错误
try:
# 尝试用错误的编码读取
with open("encoded.txt", "r", encoding="ascii") as file:
content = file.read()
except UnicodeDecodeError as e:
print(f"编码错误: {e}")
# 使用 errors 参数处理
with open("encoded.txt", "r", encoding="ascii", errors="replace") as file:
content = file.read()
print(f"替换模式读取: {content}")
# 4. 检测文件编码
import chardet
def detect_encoding(filename):
    """Detect the likely text encoding of a file.

    The file is opened in binary mode, read completely, and its raw bytes
    are passed to ``chardet.detect``.

    Args:
        filename: Path of the file to inspect.

    Returns:
        The value returned by ``chardet.detect`` for the file's bytes.
    """
    with open(filename, "rb") as file:
        raw_data = file.read()
    # Detection only needs the bytes already in memory, so it runs after
    # the file has been closed.
    return chardet.detect(raw_data)
# 注意:chardet 需要安装: pip install chardet
# encoding_info = detect_encoding("chinese.txt")
# print(f"检测到的编码: {encoding_info}")
文件的读取
读取方法
python
# 文件读取方法
# 创建测试文件
with open("test_read.txt", "w", encoding="utf-8") as file:
file.write("第一行:Hello Python\n")
file.write("第二行:文件读取示例\n")
file.write("第三行:学习Python很有趣\n")
file.write("第四行:继续加油\n")
file.write("第五行:最后一行\n")
# 1. read() - 读取全部内容
print("=== read() 方法 ===")
with open("test_read.txt", "r", encoding="utf-8") as file:
content = file.read()
print(content)
# 2. read(size) - 读取指定数量的字符
print("\n=== read(size) 方法 ===")
with open("test_read.txt", "r", encoding="utf-8") as file:
content = file.read(10) # 读取前10个字符
print(f"前10个字符: '{content}'")
# 3. readline() - 读取一行
print("\n=== readline() 方法 ===")
with open("test_read.txt", "r", encoding="utf-8") as file:
line1 = file.readline()
line2 = file.readline()
print(f"第一行: {line1.strip()}")
print(f"第二行: {line2.strip()}")
# 4. readlines() - 读取所有行,返回列表
print("\n=== readlines() 方法 ===")
with open("test_read.txt", "r", encoding="utf-8") as file:
lines = file.readlines()
print(f"共 {len(lines)} 行")
for i, line in enumerate(lines, 1):
print(f"第{i}行: {line.strip()}")
# 5. 遍历文件对象(推荐)
print("\n=== 遍历文件对象 ===")
with open("test_read.txt", "r", encoding="utf-8") as file:
for line_num, line in enumerate(file, 1):
print(f"第{line_num}行: {line.strip()}")
# 6. 大文件读取(分块读取)
print("\n=== 分块读取大文件 ===")
def read_in_chunks(filename, chunk_size=1024, encoding="utf-8"):
    """Lazily read a text file in fixed-size pieces.

    Args:
        filename: Path of the text file to read.
        chunk_size: Maximum number of characters requested per read.
        encoding: Text encoding used to decode the file (UTF-8 by default).

    Yields:
        Successive non-empty strings of at most ``chunk_size`` characters,
        in file order. Nothing is yielded for an empty file.
    """
    with open(filename, "r", encoding=encoding) as file:
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                # read() returns an empty string once the end is reached.
                break
            yield chunk
for i, chunk in enumerate(read_in_chunks("test_read.txt", 20), 1):
print(f"块 {i}: '{chunk}'")
文件指针
python
# 文件指针操作
with open("seek_test.txt", "w", encoding="utf-8") as file:
file.write("0123456789ABCDEFGHIJ")
# 1. tell() - 获取当前指针位置
with open("seek_test.txt", "r", encoding="utf-8") as file:
print(f"初始位置: {file.tell()}")
content = file.read(5)
print(f"读取5个字符: '{content}'")
print(f"当前位置: {file.tell()}")
# 2. seek() - 移动指针位置
# seek(offset, whence)
# whence: 0-文件开头(默认), 1-当前位置, 2-文件末尾
print("\n=== seek() 操作 ===")
with open("seek_test.txt", "r", encoding="utf-8") as file:
# 从开头移动
file.seek(5)
print(f"移动到位置5: '{file.read(3)}'")
# 回到开头
file.seek(0)
print(f"回到开头: '{file.read(5)}'")
# 再次从开头定位到位置10(注意:文本模式下不支持相对当前位置或末尾的非零偏移,只能从开头定位)
file.seek(10)
print(f"移动到位置10: '{file.read(5)}'")
# 3. 二进制模式下的 seek
print("\n=== 二进制模式 seek ===")
with open("seek_test.txt", "rb") as file:
file.seek(-5, 2) # 从末尾向前移动5个字节
print(f"末尾5个字节: {file.read()}")
# 4. 实际应用:修改文件特定位置
def modify_file_at_position(filename, position, new_content):
    """Overwrite part of an existing UTF-8 text file, starting at an offset.

    The file is opened in ``"r+"`` mode, so it is not truncated: the
    characters written replace whatever was stored from ``position`` on,
    and content after the written span is left untouched. Nothing is
    inserted or shifted.

    Args:
        filename: Path of an existing file to modify.
        position: Offset handed unchanged to ``file.seek()``.
        new_content: Text written at that offset.
    """
    with open(filename, "r+", encoding="utf-8") as file:
        # NOTE(review): for text files Python only documents 0 and values
        # returned by tell() as valid seek targets; with multi-byte
        # characters an arbitrary offset may land inside a character.
        file.seek(position)
        file.write(new_content)
modify_file_at_position("seek_test.txt", 0, "XXXXX")
with open("seek_test.txt", "r", encoding="utf-8") as file:
print(f"修改后内容: {file.read()}")
文件的写入
写入方法
python
# 文件写入方法
# 1. write() - 写入字符串
with open("write_test.txt", "w", encoding="utf-8") as file:
file.write("第一行内容\n")
file.write("第二行内容\n")
# write() 返回写入的字符数
chars_written = file.write("第三行内容\n")
print(f"写入了 {chars_written} 个字符")
# 2. writelines() - 写入字符串列表
lines = ["行1\n", "行2\n", "行3\n"]
with open("write_test.txt", "a", encoding="utf-8") as file:
file.writelines(lines)
# 注意:writelines() 不会自动添加换行符
lines = ["行4", "行5", "行6"]
with open("write_test.txt", "a", encoding="utf-8") as file:
file.write("\n") # 手动添加换行
file.writelines(lines) # 会写成 "行4行5行6"
# 3. 写入不同类型的数据
import json
data = {
"name": "张三",
"age": 20,
"scores": [85, 90, 78]
}
# 写入 JSON
with open("data.json", "w", encoding="utf-8") as file:
json.dump(data, file, ensure_ascii=False, indent=2)
# 读取 JSON
with open("data.json", "r", encoding="utf-8") as file:
loaded_data = json.load(file)
print(f"读取的数据: {loaded_data}")
# 4. 格式化写入
students = [
("张三", 20, 85),
("李四", 21, 90),
("王五", 22, 78)
]
with open("students.txt", "w", encoding="utf-8") as file:
file.write("姓名\t年龄\t成绩\n")
file.write("-" * 30 + "\n")
for name, age, score in students:
file.write(f"{name}\t{age}\t{score}\n")
# 5. 追加写入日志
import datetime
def log_message(filename, message):
    """Append one timestamped line to a log file.

    The line has the form ``[YYYY-MM-DD HH:MM:SS] message`` followed by a
    newline, using the current local time. The file is opened in append
    mode, so it is created if missing and earlier entries are kept.

    Args:
        filename: Path of the log file.
        message: Text to record after the timestamp.
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(filename, "a", encoding="utf-8") as file:
        file.write(f"[{timestamp}] {message}\n")
log_message("app.log", "程序启动")
log_message("app.log", "执行任务")
log_message("app.log", "程序结束")
# 读取日志
with open("app.log", "r", encoding="utf-8") as file:
print(file.read())
文件缓冲
python
# 文件缓冲
# 1. 缓冲模式
# buffering 参数:
# -1: 默认缓冲(二进制文件和普通文本文件使用固定大小的块缓冲;只有交互式终端(isatty)的文本文件使用行缓冲)
# 0: 无缓冲(仅二进制模式)
# 1: 行缓冲(仅文本模式)
# >1: 指定缓冲区大小
# 行缓冲写入(buffering=1:每写完一行就刷新缓冲区;文本模式不支持 buffering=0 的无缓冲)
with open("unbuffered.txt", "w", encoding="utf-8", buffering=1) as file:
file.write("这行会立即写入磁盘\n")
# 指定缓冲区大小
with open("buffered.txt", "w", encoding="utf-8", buffering=4096) as file:
file.write("内容会先存入缓冲区")
# 缓冲区满或文件关闭时才写入磁盘
# 2. flush() - 手动刷新缓冲区
with open("flush_test.txt", "w", encoding="utf-8") as file:
file.write("写入缓冲区")
file.flush() # 立即将缓冲区内容写入磁盘
file.write("继续写入")
# 文件关闭时自动刷新
# 3. 实时日志写入
class RealTimeLogger:
    """Logger that appends timestamped lines to a file and flushes each one.

    The file is opened once in the constructor and stays open until
    ``close()`` is called. The logger can also be used as a context
    manager, which closes the file on exit even if logging raises::

        with RealTimeLogger("realtime.log") as logger:
            logger.log("message")
    """

    def __init__(self, filename):
        """Open ``filename`` for appending, as line-buffered UTF-8 text."""
        self.filename = filename
        # buffering=1 requests line buffering for a text-mode file.
        self.file = open(filename, "a", encoding="utf-8", buffering=1)

    def __enter__(self):
        """Return the logger itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the log file; exceptions from the block are not suppressed."""
        self.close()

    def log(self, message):
        """Write ``[YYYY-MM-DD HH:MM:SS] message`` plus a newline and flush."""
        import datetime

        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.file.write(f"[{timestamp}] {message}\n")
        # Explicit flush so the entry leaves Python's buffer right away.
        self.file.flush()

    def close(self):
        """Close the underlying file."""
        self.file.close()
# 使用
logger = RealTimeLogger("realtime.log")
logger.log("实时日志测试")
logger.close()
目录操作
os 模块
python
# 目录操作 - os 模块
import os
# 1. 获取当前工作目录
current_dir = os.getcwd()
print(f"当前目录: {current_dir}")
# 2. 改变工作目录
original_dir = os.getcwd()
os.chdir("..") # 切换到上级目录
print(f"上级目录: {os.getcwd()}")
os.chdir(original_dir) # 切换回原目录
# 3. 创建目录
os.makedirs("test_dir/sub_dir", exist_ok=True) # 递归创建目录
print(f"创建目录: test_dir/sub_dir")
# 4. 删除目录
os.rmdir("test_dir/sub_dir") # 删除空目录
os.rmdir("test_dir")
print("删除目录成功")
# 5. 列出目录内容
print(f"\n当前目录内容:")
for item in os.listdir("."):
print(f" {item}")
# 6. 判断路径类型
print(f"\n路径类型判断:")
print(f"'.' 是目录: {os.path.isdir('.')}")
print(f"'example.txt' 是文件: {os.path.isfile('example.txt')}")
print(f"'不存在的文件' 存在: {os.path.exists('不存在的文件')}")
# 7. 重命名和删除文件
with open("old_name.txt", "w", encoding="utf-8") as file:
file.write("测试文件")
os.rename("old_name.txt", "new_name.txt") # 重命名
print(f"重命名成功")
os.remove("new_name.txt") # 删除文件
print(f"删除成功")
# 8. 获取文件信息
with open("info_test.txt", "w", encoding="utf-8") as file:
file.write("测试文件信息")
file_stat = os.stat("info_test.txt")
print(f"\n文件信息:")
print(f" 文件大小: {file_stat.st_size} 字节")
print(f" 创建时间: {file_stat.st_ctime}")
print(f" 修改时间: {file_stat.st_mtime}")
os.remove("info_test.txt")
os.path 模块
python
# 路径操作 - os.path 模块
import os
from pathlib import Path
# 1. 路径拼接
path1 = os.path.join("folder", "subfolder", "file.txt")
print(f"拼接路径: {path1}")
# 2. 获取路径的各个部分
filepath = "/home/user/documents/file.txt"
print(f"\n路径: {filepath}")
print(f" 目录名: {os.path.dirname(filepath)}")
print(f" 文件名: {os.path.basename(filepath)}")
print(f" 分割: {os.path.split(filepath)}")
print(f" 扩展名: {os.path.splitext(filepath)}")
# 3. 获取绝对路径
relative_path = "example.txt"
absolute_path = os.path.abspath(relative_path)
print(f"\n相对路径: {relative_path}")
print(f"绝对路径: {absolute_path}")
# 4. 规范化路径
messy_path = "folder/../folder/./file.txt"
clean_path = os.path.normpath(messy_path)
print(f"\n规范化路径: {messy_path} -> {clean_path}")
# 5. 获取文件大小
with open("size_test.txt", "w", encoding="utf-8") as file:
file.write("测试文件大小")
size = os.path.getsize("size_test.txt")
print(f"\n文件大小: {size} 字节")
# 6. 获取文件时间
import time
mtime = os.path.getmtime("size_test.txt")
print(f"修改时间: {time.ctime(mtime)}")
# 7. 检查路径
print(f"\n路径检查:")
print(f" 是否绝对路径: {os.path.isabs('/home/user')}")
print(f" 是否相同路径: {os.path.samefile('.', os.getcwd())}")
os.remove("size_test.txt")
pathlib 模块(推荐)
python
# pathlib 模块 - 面向对象的路径操作(Python 3.4+,推荐)
from pathlib import Path
# 1. 创建 Path 对象
p1 = Path("example.txt")
p2 = Path("/home/user/documents")
p3 = Path.home() # 用户主目录
p4 = Path.cwd() # 当前工作目录
print(f"Path对象: {p1}")
print(f"用户主目录: {p3}")
print(f"当前目录: {p4}")
# 2. 路径拼接(使用 / 运算符)
data_dir = Path("data")
config_file = data_dir / "config" / "settings.json"
print(f"\n拼接路径: {config_file}")
# 3. 获取路径属性
filepath = Path("/home/user/documents/file.txt")
print(f"\n路径属性:")
print(f" 父目录: {filepath.parent}")
print(f" 文件名: {filepath.name}")
print(f" 文件名(无扩展名): {filepath.stem}")
print(f" 扩展名: {filepath.suffix}")
print(f" 所有后缀: {filepath.suffixes}")
# 4. 路径检查
test_path = Path("test_path_demo.txt")
test_path.write_text("测试内容", encoding="utf-8")
print(f"\n路径检查:")
print(f" 存在: {test_path.exists()}")
print(f" 是文件: {test_path.is_file()}")
print(f" 是目录: {test_path.is_dir()}")
# 5. 创建和删除
new_dir = Path("test_directory")
new_dir.mkdir(exist_ok=True) # 创建目录
print(f"\n创建目录: {new_dir}")
new_file = new_dir / "new_file.txt"
new_file.write_text("新文件内容", encoding="utf-8")
print(f"创建文件: {new_file}")
# 删除
new_file.unlink() # 删除文件
new_dir.rmdir() # 删除空目录
print("删除成功")
# 6. 遍历目录
test_dir = Path("traverse_test")
test_dir.mkdir(exist_ok=True)
(test_dir / "file1.txt").write_text("内容1")
(test_dir / "file2.txt").write_text("内容2")
(test_dir / "subdir").mkdir(exist_ok=True)
print(f"\n遍历目录 {test_dir}:")
for item in test_dir.iterdir():
print(f" {item.name} {'[目录]' if item.is_dir() else '[文件]'}")
# 递归遍历
print(f"\n递归遍历:")
for item in test_dir.rglob("*"):
print(f" {item}")
# 7. 文件读写
file_path = Path("pathlib_test.txt")
file_path.write_text("Hello, Path!", encoding="utf-8")
content = file_path.read_text(encoding="utf-8")
print(f"\n文件内容: {content}")
# 8. 文件操作
src = Path("source.txt")
src.write_text("源文件", encoding="utf-8")
dst = Path("destination.txt")
src.replace(dst) # 移动/重命名
print(f"\n移动文件: {src} -> {dst}")
# 清理
import shutil
shutil.rmtree("traverse_test")
dst.unlink()
test_path.unlink()
os.remove("write_test.txt")
os.remove("students.txt")
os.remove("data.json")
os.remove("app.log")
os.remove("realtime.log")
os.remove("pathlib_test.txt")
os.remove("flush_test.txt")
os.remove("buffered.txt")
os.remove("unbuffered.txt")
os.remove("test_read.txt")
os.remove("seek_test.txt")
os.remove("encoded.txt")
os.remove("chinese.txt")
os.remove("example.txt")
os.remove("binary_file.bin")
文件操作实战
CSV 文件处理
python
# CSV 文件处理
import csv
# 1. 写入 CSV 文件
students = [
{"name": "张三", "age": 20, "score": 85},
{"name": "李四", "age": 21, "score": 90},
{"name": "王五", "age": 22, "score": 78}
]
# 使用 DictWriter
with open("students.csv", "w", encoding="utf-8", newline="") as file:
fieldnames = ["name", "age", "score"]
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader() # 写入表头
writer.writerows(students) # 写入多行
print("CSV 文件写入成功")
# 2. 读取 CSV 文件
with open("students.csv", "r", encoding="utf-8") as file:
reader = csv.DictReader(file)
print("\nCSV 文件内容:")
for row in reader:
print(f" {row}")
# 3. 使用列表方式读写
data = [
["姓名", "年龄", "成绩"],
["张三", 20, 85],
["李四", 21, 90]
]
with open("data.csv", "w", encoding="utf-8", newline="") as file:
writer = csv.writer(file)
writer.writerows(data)
with open("data.csv", "r", encoding="utf-8") as file:
reader = csv.reader(file)
for row in reader:
print(row)
# 清理
import os
os.remove("students.csv")
os.remove("data.csv")
文件搜索
python
# 文件搜索
import os
from pathlib import Path
import fnmatch
# 创建测试目录结构
test_root = Path("search_test")
test_root.mkdir(exist_ok=True)
(test_root / "file1.txt").write_text("内容1")
(test_root / "file2.py").write_text("# Python文件")
(test_root / "data.json").write_text("{}")
(test_root / "subdir").mkdir(exist_ok=True)
(test_root / "subdir" / "file3.txt").write_text("内容3")
(test_root / "subdir" / "script.py").write_text("# 脚本")
# 1. 使用 os.walk 遍历
print("=== os.walk 遍历 ===")
for root, dirs, files in os.walk("search_test"):
print(f"目录: {root}")
for file in files:
print(f" 文件: {file}")
# 2. 使用 pathlib 递归搜索
print("\n=== pathlib 递归搜索 ===")
for py_file in Path("search_test").rglob("*.py"):
print(f"Python文件: {py_file}")
# 3. 搜索特定模式的文件
print("\n=== 模式匹配搜索 ===")
for file in Path("search_test").rglob("*"):
if fnmatch.fnmatch(file.name, "*.txt"):
print(f"TXT文件: {file}")
# 4. 自定义搜索函数
def find_files(directory, pattern, recursive=True):
    """Return the paths under ``directory`` whose names match a glob pattern.

    Args:
        directory: Directory to search (string or ``Path``).
        pattern: Glob pattern such as ``"*.txt"``.
        recursive: When true (default) search all subdirectories with
            ``rglob``; otherwise only ``directory`` itself with ``glob``.

    Returns:
        A list of ``Path`` objects for every match, in the order the
        underlying glob produced them. Directories that match the pattern
        are included as well.
    """
    directory = Path(directory)
    matches = directory.rglob(pattern) if recursive else directory.glob(pattern)
    return list(matches)
print("\n=== 自定义搜索 ===")
txt_files = find_files("search_test", "*.txt")
print(f"所有TXT文件: {txt_files}")
# 5. 搜索文件内容
def search_in_files(directory, keyword, file_pattern="*"):
    """Find files under ``directory`` whose UTF-8 text contains ``keyword``.

    Args:
        directory: Root directory, searched recursively.
        keyword: Substring to look for in each file's content.
        file_pattern: Glob pattern limiting which names are examined
            (default ``"*"``, i.e. everything).

    Returns:
        A list of ``Path`` objects for the regular files that contain
        ``keyword``. Files that cannot be read or are not valid UTF-8 are
        skipped rather than aborting the search.
    """
    results = []
    for file_path in Path(directory).rglob(file_pattern):
        if not file_path.is_file():
            continue
        try:
            content = file_path.read_text(encoding="utf-8")
        except (UnicodeDecodeError, OSError):
            # Best-effort search: ignore binary or unreadable files, but let
            # unrelated errors (and KeyboardInterrupt) propagate.
            continue
        if keyword in content:
            results.append(file_path)
    return results
# 创建包含关键词的文件
(test_root / "keyword_test.txt").write_text("这是一个包含关键词的文件")
print(f"\n包含'关键词'的文件: {search_in_files('search_test', '关键词')}")
# 清理
import shutil
shutil.rmtree("search_test")
文件监控
python
# 文件监控(简单实现)
import os
import time
from pathlib import Path
class FileWatcher:
    """Polling file monitor that reports changes between successive scans.

    Each call to ``scan()`` walks the watched directory recursively and
    compares the regular files found, and their modification times, with
    the state remembered from the previous call.
    """

    def __init__(self, path):
        """Remember the directory to watch; no scanning happens here."""
        self.path = Path(path)
        # str(path) -> st_mtime recorded by the most recent scan.
        self.last_modified = {}
        # str(path) of every file seen by the most recent scan.
        self.last_files = set()

    def scan(self):
        """Scan the directory and report what changed since the last scan.

        Returns:
            A dict with the keys ``"created"``, ``"deleted"`` and
            ``"modified"``, each mapping to a list of ``Path`` objects.
            On the first scan every existing file is reported as created.
            If the watched directory does not exist, every previously
            tracked file is reported as deleted.
        """
        current_files = set()
        changes = {
            "created": [],
            "deleted": [],
            "modified": []
        }

        # A missing root is treated as an empty directory so that files
        # tracked earlier are still reported as deleted.
        if self.path.exists():
            for file_path in self.path.rglob("*"):
                if not file_path.is_file():
                    continue
                try:
                    mtime = file_path.stat().st_mtime
                except OSError:
                    # The file vanished between listing and stat(); treat
                    # it as absent for this scan.
                    continue
                key = str(file_path)
                current_files.add(key)
                if key not in self.last_files:
                    changes["created"].append(file_path)
                elif self.last_modified.get(key) != mtime:
                    changes["modified"].append(file_path)
                self.last_modified[key] = mtime

        # Anything seen last time but not now has been deleted; sorted for
        # a deterministic report order.
        for key in sorted(self.last_files - current_files):
            changes["deleted"].append(Path(key))
            self.last_modified.pop(key, None)

        self.last_files = current_files
        return changes
# 演示
watch_dir = Path("watch_test")
watch_dir.mkdir(exist_ok=True)
watcher = FileWatcher(watch_dir)
# 初始扫描
print("初始扫描:")
print(watcher.scan())
# 创建文件
(watch_dir / "new_file.txt").write_text("新文件")
time.sleep(0.1)
print("\n创建文件后:")
changes = watcher.scan()
print(f" 新建: {changes['created']}")
# 修改文件
(watch_dir / "new_file.txt").write_text("修改后的内容")
time.sleep(0.1)
print("\n修改文件后:")
changes = watcher.scan()
print(f" 修改: {changes['modified']}")
# 删除文件
(watch_dir / "new_file.txt").unlink()
print("\n删除文件后:")
changes = watcher.scan()
print(f" 删除: {changes['deleted']}")
# 清理
watch_dir.rmdir()
临时文件
python
# 临时文件处理
import tempfile
import os
# 1. 创建临时文件
print("=== 临时文件 ===")
# 创建临时文件(delete=False:退出 with 块后不会自动删除,需要手动删除)
with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8", delete=False) as temp:
temp.write("临时文件内容")
temp_name = temp.name
print(f"临时文件路径: {temp_name}")
# 读取临时文件
with open(temp_name, "r", encoding="utf-8") as f:
print(f"内容: {f.read()}")
# 手动删除
os.unlink(temp_name)
# 2. 创建临时目录
print("\n=== 临时目录 ===")
with tempfile.TemporaryDirectory() as temp_dir:
print(f"临时目录: {temp_dir}")
# 在临时目录中创建文件
temp_file = os.path.join(temp_dir, "temp.txt")
with open(temp_file, "w", encoding="utf-8") as f:
f.write("临时内容")
print(f"临时文件存在: {os.path.exists(temp_file)}")
# 临时目录自动删除
print(f"临时目录已删除: {not os.path.exists(temp_dir)}")
# 3. 获取临时目录路径
print(f"\n系统临时目录: {tempfile.gettempdir()}")
print(f"临时文件前缀: {tempfile.gettempprefix()}")
小结
本章学习了Python文件操作的完整知识:
文件读写:
- open() 函数和文件模式
- read()、readline()、readlines() 读取方法
- write()、writelines() 写入方法
- 文件指针操作
目录操作:
- os 模块的目录操作
- os.path 模块的路径处理
- pathlib 模块(推荐)
文件编码:
- 指定编码打开文件
- 处理编码错误
实用技巧:
- CSV 文件处理
- 文件搜索
- 文件监控
- 临时文件
掌握文件操作是Python编程的重要技能,在实际开发中会频繁使用这些知识来处理各种文件和数据。
