Appearance
异常处理
在程序运行过程中,难免会遇到各种错误和异常情况。Python提供了完善的异常处理机制,可以帮助我们优雅地处理这些错误,使程序更加健壮和可靠。本章将详细介绍Python中的异常处理机制。
什么是异常
异常的概念
异常是程序运行时发生的错误,它会中断程序的正常执行流程。Python使用异常对象来表示异常情况。
python
# 常见的异常示例
# 1. ZeroDivisionError - 除零错误
try:
result = 10 / 0
except ZeroDivisionError as e:
print(f"除零错误: {e}")
# 2. TypeError - 类型错误
try:
result = "hello" + 123
except TypeError as e:
print(f"类型错误: {e}")
# 3. IndexError - 索引越界
try:
my_list = [1, 2, 3]
print(my_list[10])
except IndexError as e:
print(f"索引越界: {e}")
# 4. KeyError - 键错误
try:
my_dict = {"name": "张三"}
print(my_dict["age"])
except KeyError as e:
print(f"键错误: {e}")
# 5. ValueError - 值错误
try:
number = int("abc")
except ValueError as e:
print(f"值错误: {e}")
# 6. FileNotFoundError - 文件未找到
try:
with open("不存在的文件.txt", "r") as f:
content = f.read()
except FileNotFoundError as e:
print(f"文件未找到: {e}")
# 7. AttributeError - 属性错误
try:
text = "hello"
text.nonexistent_method()
except AttributeError as e:
print(f"属性错误: {e}")
# 8. NameError - 名称错误
try:
print(undefined_variable)
except NameError as e:
print(f"名称错误: {e}")异常的层次结构
python
# Python异常的层次结构
"""
BaseException (所有异常的基类)
├── SystemExit (sys.exit() 引发)
├── KeyboardInterrupt (Ctrl+C 中断)
├── GeneratorExit (生成器关闭)
└── Exception (常规异常的基类)
├── StopIteration (迭代结束)
├── ArithmeticError (算术错误)
│ ├── ZeroDivisionError (除零)
│ ├── OverflowError (溢出)
│ └── FloatingPointError (浮点错误)
├── LookupError (查找错误)
│ ├── IndexError (索引越界)
│ └── KeyError (键错误)
├── OSError (操作系统错误)
│ ├── FileNotFoundError (文件未找到)
│ ├── PermissionError (权限错误)
│ └── ...
├── TypeError (类型错误)
├── ValueError (值错误)
├── AttributeError (属性错误)
├── NameError (名称错误)
└── ...
"""
# 检查异常的继承关系
print(f"ZeroDivisionError 是 ArithmeticError 的子类: {issubclass(ZeroDivisionError, ArithmeticError)}")
print(f"IndexError 是 LookupError 的子类: {issubclass(IndexError, LookupError)}")
print(f"FileNotFoundError 是 OSError 的子类: {issubclass(FileNotFoundError, OSError)}")
print(f"Exception 是 BaseException 的子类: {issubclass(Exception, BaseException)}")try-except 语句
基本语法
python
# try-except 基本语法
# 基本结构
try:
# 可能引发异常的代码
number = int(input("请输入一个数字: "))
result = 100 / number
print(f"100除以{number}等于{result}")
except ValueError:
# 处理 ValueError 异常
print("输入的不是有效数字!")
except ZeroDivisionError:
# 处理 ZeroDivisionError 异常
print("不能除以零!")
# 捕获多个异常
try:
my_list = [1, 2, 3]
index = int(input("请输入索引(0-2): "))
print(f"列表中的值: {my_list[index]}")
except (ValueError, IndexError) as e:
# 同时捕获多种异常
print(f"发生错误: {e}")
# 捕获所有异常
try:
# 一些可能出错的代码
result = 10 / 0
except Exception as e:
# 捕获所有常规异常
print(f"发生异常: {type(e).__name__}: {e}")
# 注意:不建议直接使用空的 except,除非你知道自己在做什么
# 因为它会捕获所有异常,包括系统退出异常
try:
result = 10 / 0
except:
print("发生了某种错误") # 不推荐,无法知道具体错误类型获取异常信息
python
# 获取异常的详细信息
try:
result = 10 / 0
except ZeroDivisionError as e:
# e 是异常对象
print(f"异常对象: {e}")
print(f"异常类型: {type(e).__name__}")
print(f"异常信息: {str(e)}")
print(f"异常参数: {e.args}")
# 使用 sys.exc_info() 获取异常信息
import sys
try:
result = 10 / 0
except:
exc_type, exc_value, exc_traceback = sys.exc_info()
print(f"异常类型: {exc_type}")
print(f"异常值: {exc_value}")
print(f"异常追踪: {exc_traceback}")
# 使用 traceback 模块获取完整的堆栈追踪
import traceback
try:
def func_a():
func_b()
def func_b():
result = 10 / 0
func_a()
except Exception as e:
print("完整的错误追踪:")
traceback.print_exc()try-except-else-finally
python
# 完整的异常处理结构
def divide_numbers(a, b):
"""演示完整的异常处理结构"""
try:
# try 块:可能引发异常的代码
result = a / b
except ZeroDivisionError:
# except 块:处理异常
print("错误:不能除以零!")
return None
except TypeError:
# 可以有多个 except 块
print("错误:参数类型不正确!")
return None
else:
# else 块:没有异常发生时执行
print("计算成功!")
return result
finally:
# finally 块:无论是否发生异常都会执行
print("计算结束。")
# 测试各种情况
print("=== 测试1: 正常情况 ===")
print(f"结果: {divide_numbers(10, 2)}")
print("\n=== 测试2: 除零错误 ===")
print(f"结果: {divide_numbers(10, 0)}")
print("\n=== 测试3: 类型错误 ===")
print(f"结果: {divide_numbers(10, '2')}")
# finally 的典型应用:资源清理
def read_file(filename):
"""读取文件内容,确保文件被正确关闭"""
f = None
try:
f = open(filename, 'r', encoding='utf-8')
content = f.read()
return content
except FileNotFoundError:
print(f"文件 {filename} 不存在")
return None
finally:
# 确保文件被关闭
if f:
f.close()
print("文件已关闭")
# 测试
print("\n=== 文件读取测试 ===")
content = read_file("不存在的文件.txt")异常的抛出
raise 语句
python
# 使用 raise 抛出异常
# 1. 抛出内置异常
def check_age(age):
"""检查年龄是否合法"""
if age < 0:
raise ValueError("年龄不能为负数")
if age > 150:
raise ValueError("年龄不能超过150")
return age
try:
check_age(-5)
except ValueError as e:
print(f"错误: {e}")
# 2. 重新抛出异常
def process_data(data):
"""处理数据"""
try:
number = int(data)
result = 100 / number
return result
except ValueError:
print("数据转换失败")
raise # 重新抛出当前异常
except ZeroDivisionError:
print("除零错误")
raise ValueError("数据不能为零") from None # 抛出新异常,隐藏原始异常
try:
process_data("abc")
except ValueError as e:
print(f"捕获到异常: {e}")
# 3. 异常链
def func_a():
raise ValueError("原始错误")
def func_b():
try:
func_a()
except ValueError as e:
# 使用 from 关键字创建异常链
raise RuntimeError("处理过程中出错") from e
try:
func_b()
except RuntimeError as e:
print(f"异常: {e}")
print(f"原始异常: {e.__cause__}")
# 4. 条件抛出
def withdraw(balance, amount):
"""取款函数"""
if amount <= 0:
raise ValueError("取款金额必须大于零")
if amount > balance:
raise ValueError("余额不足")
return balance - amount
try:
new_balance = withdraw(100, 150)
except ValueError as e:
print(f"取款失败: {e}")assert 断言
python
# 使用 assert 进行断言
def calculate_average(numbers):
"""计算平均值"""
# 断言:确保列表不为空
assert len(numbers) > 0, "列表不能为空"
# 断言:确保所有元素都是数字
assert all(isinstance(n, (int, float)) for n in numbers), "所有元素必须是数字"
return sum(numbers) / len(numbers)
# 正常情况
print(f"平均值: {calculate_average([1, 2, 3, 4, 5])}")
# 断言失败
try:
calculate_average([])
except AssertionError as e:
print(f"断言错误: {e}")
try:
calculate_average([1, 2, "3", 4, 5])
except AssertionError as e:
print(f"断言错误: {e}")
# assert 的注意事项
# assert 语句在 Python 使用 -O 优化选项运行时会被禁用
# 因此,不要用 assert 来做数据验证或安全检查
# 错误示例(不要这样做)
def process_user_input(user_input):
# assert user_input.isalnum(), "输入包含非法字符" # 不推荐
if not user_input.isalnum():
raise ValueError("输入包含非法字符") # 推荐
return user_input
# assert 的正确用途:调试和开发时的内部检查
def complex_calculation(x, y):
result = x * y + x / y
# 使用断言检查内部逻辑
assert result >= 0, "计算结果应该非负"
return result自定义异常
创建自定义异常类
python
# 创建自定义异常类
# 1. 基本自定义异常
class InsufficientBalanceError(Exception):
"""余额不足异常"""
pass
# 2. 带属性的自定义异常
class InsufficientFundsError(Exception):
"""资金不足异常"""
def __init__(self, balance, amount):
self.balance = balance # 当前余额
self.amount = amount # 需要的金额
self.deficit = amount - balance # 缺口
super().__init__(f"余额不足: 当前余额 {balance},需要 {amount},缺少 {self.deficit}")
# 3. 更完整的自定义异常
class ValidationError(Exception):
"""验证错误异常"""
def __init__(self, field, message, code=None):
self.field = field # 错误字段
self.message = message # 错误消息
self.code = code # 错误代码
super().__init__(f"[{field}] {message}")
# 使用自定义异常
class BankAccount:
"""银行账户类"""
def __init__(self, owner, balance=0):
self.owner = owner
self.balance = balance
def deposit(self, amount):
"""存款"""
if amount <= 0:
raise ValidationError("amount", "存款金额必须大于零", "INVALID_AMOUNT")
self.balance += amount
print(f"存款成功: +{amount},余额: {self.balance}")
def withdraw(self, amount):
"""取款"""
if amount <= 0:
raise ValidationError("amount", "取款金额必须大于零", "INVALID_AMOUNT")
if amount > self.balance:
raise InsufficientFundsError(self.balance, amount)
self.balance -= amount
print(f"取款成功: -{amount},余额: {self.balance}")
def transfer(self, other, amount):
"""转账"""
self.withdraw(amount)
other.deposit(amount)
print(f"转账成功: {self.owner} -> {other.owner}, {amount}")
# 测试自定义异常
account1 = BankAccount("张三", 1000)
account2 = BankAccount("李四", 500)
# 正常操作
print("=== 正常操作 ===")
account1.deposit(500)
account1.withdraw(200)
# 存款金额错误
print("\n=== 存款金额错误 ===")
try:
account1.deposit(-100)
except ValidationError as e:
print(f"验证错误: {e}")
print(f"错误字段: {e.field}, 错误代码: {e.code}")
# 余额不足
print("\n=== 余额不足 ===")
try:
account1.withdraw(5000)
except InsufficientFundsError as e:
print(f"资金不足: {e}")
print(f"当前余额: {e.balance}, 需要: {e.amount}, 缺少: {e.deficit}")自定义异常层次结构
python
# 创建自定义异常层次结构
# 基础异常类
class AppError(Exception):
"""应用程序基础异常"""
def __init__(self, message, code=None):
self.message = message
self.code = code
super().__init__(message)
# 数据库相关异常
class DatabaseError(AppError):
"""数据库异常基类"""
pass
class ConnectionError(DatabaseError):
"""数据库连接异常"""
pass
class QueryError(DatabaseError):
"""查询异常"""
pass
# 用户相关异常
class UserError(AppError):
"""用户相关异常基类"""
pass
class UserNotFoundError(UserError):
"""用户未找到异常"""
pass
class AuthenticationError(UserError):
"""认证异常"""
pass
class PermissionDeniedError(UserError):
"""权限拒绝异常"""
pass
# 验证相关异常
class ValidationError(AppError):
"""验证异常"""
def __init__(self, message, field=None, code=None):
self.field = field
super().__init__(message, code)
# 使用异常层次结构
def get_user(user_id):
"""获取用户信息"""
if user_id <= 0:
raise ValidationError("用户ID必须大于零", "user_id", "INVALID_ID")
if user_id == 404:
raise UserNotFoundError("用户不存在", code="USER_NOT_FOUND")
return {"id": user_id, "name": "张三"}
def login(username, password):
"""用户登录"""
if not username:
raise ValidationError("用户名不能为空", "username")
if password != "123456":
raise AuthenticationError("密码错误", code="WRONG_PASSWORD")
return True
# 统一异常处理
def handle_error(func):
"""异常处理装饰器"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except UserNotFoundError as e:
print(f"用户错误: {e.message} (代码: {e.code})")
except AuthenticationError as e:
print(f"认证错误: {e.message} (代码: {e.code})")
except ValidationError as e:
print(f"验证错误: {e.message} (字段: {e.field})")
except DatabaseError as e:
print(f"数据库错误: {e.message}")
except AppError as e:
print(f"应用错误: {e.message}")
except Exception as e:
print(f"未知错误: {e}")
return wrapper
# 测试
@handle_error
def test_get_user(user_id):
user = get_user(user_id)
print(f"获取用户: {user}")
test_get_user(1) # 正常
test_get_user(0) # 验证错误
test_get_user(404) # 用户未找到异常处理的最佳实践
具体异常优先
python
# 异常处理的最佳实践
# 1. 捕获具体的异常,而不是笼统的 Exception
# 不推荐
try:
result = int("abc")
except Exception: # 太笼统
print("发生错误")
# 推荐
try:
result = int("abc")
except ValueError: # 具体的异常
print("输入不是有效的数字")
# 2. 异常处理的顺序:从具体到一般
try:
# 一些操作
result = 10 / 0
except ZeroDivisionError:
print("除零错误")
except ArithmeticError:
print("算术错误")
except Exception:
print("其他错误")
# 3. 不要使用裸露的 except
# 不推荐
try:
result = 10 / 0
except: # 会捕获所有异常,包括 KeyboardInterrupt
print("错误")
# 推荐
try:
result = 10 / 0
except Exception: # 只捕获常规异常
print("错误")适当的异常处理
python
# 适当的异常处理
# 1. 只处理你能处理的异常
def read_config(filename):
"""读取配置文件"""
try:
with open(filename, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
# 文件不存在时返回默认配置
print(f"配置文件 {filename} 不存在,使用默认配置")
return "default config"
# 其他异常(如权限错误)会向上传播
# 2. 不要过度使用异常处理
# 不推荐:用异常控制流程
def find_item(lst, value):
try:
index = lst.index(value)
return index
except ValueError:
return -1
# 推荐:使用正常的方法
def find_item_better(lst, value):
if value in lst:
return lst.index(value)
return -1
# 3. 在正确的层次处理异常
def low_level_operation():
"""底层操作"""
# 这里只抛出异常,不处理
return 10 / 0
def mid_level_operation():
"""中层操作"""
try:
return low_level_operation()
except ZeroDivisionError:
# 中层可以记录日志或添加上下文
raise RuntimeError("中层操作失败") from None
def high_level_operation():
"""高层操作"""
try:
return mid_level_operation()
except RuntimeError as e:
# 高层决定如何处理错误
print(f"操作失败: {e}")
return None
high_level_operation()上下文管理器
python
# 使用上下文管理器处理资源
# 1. 使用 with 语句自动管理资源
# 文件操作
with open("example.txt", "w", encoding="utf-8") as f:
f.write("Hello, Python!")
# 文件自动关闭,即使发生异常
# 2. 自定义上下文管理器
class Timer:
"""计时器上下文管理器"""
def __init__(self, name):
self.name = name
def __enter__(self):
import time
self.start = time.time()
print(f"{self.name} 开始")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
import time
self.end = time.time()
print(f"{self.name} 结束,耗时: {self.end - self.start:.2f}秒")
# 返回 False 表示不抑制异常
return False
# 使用自定义上下文管理器
with Timer("计算任务"):
total = sum(range(1000000))
print(f"计算结果: {total}")
# 3. 使用 contextlib
from contextlib import contextmanager
@contextmanager
def managed_file(filename, mode):
"""文件管理上下文管理器"""
f = None
try:
f = open(filename, mode, encoding='utf-8')
yield f
except Exception as e:
print(f"文件操作错误: {e}")
raise
finally:
if f:
f.close()
# 使用
with managed_file("example.txt", "r") as f:
content = f.read()
print(f"文件内容: {content}")
# 4. 抑制异常
from contextlib import suppress
# 使用 suppress 抑制特定异常
with suppress(FileNotFoundError):
with open("不存在的文件.txt", "r") as f:
content = f.read()
# 文件不存在时不会抛出异常
# 等价于
try:
with open("不存在的文件.txt", "r") as f:
content = f.read()
except FileNotFoundError:
pass日志记录
python
# 异常处理与日志记录
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def divide(a, b):
"""除法函数,带日志记录"""
try:
result = a / b
logger.info(f"计算成功: {a} / {b} = {result}")
return result
except ZeroDivisionError:
logger.error(f"除零错误: 尝试计算 {a} / {b}")
raise
except TypeError as e:
logger.error(f"类型错误: {e}")
raise ValueError("参数必须是数字") from None
# 测试
try:
divide(10, 2)
divide(10, 0)
except Exception as e:
logger.exception("操作失败") # 记录完整的异常信息
# 使用日志记录异常堆栈
def complex_operation():
"""复杂操作"""
try:
result = 10 / 0
except Exception:
logger.exception("复杂操作失败") # 自动记录堆栈信息
raise
try:
complex_operation()
except:
pass常见异常处理模式
重试模式
python
# 重试模式
import time
import random
def retry(max_attempts=3, delay=1, exceptions=(Exception,)):
"""重试装饰器"""
def decorator(func):
def wrapper(*args, **kwargs):
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
if attempt == max_attempts:
print(f"重试 {max_attempts} 次后仍然失败")
raise
print(f"第 {attempt} 次尝试失败: {e},{delay}秒后重试...")
time.sleep(delay)
return wrapper
return decorator
# 模拟不稳定的操作
@retry(max_attempts=3, delay=1, exceptions=(ConnectionError,))
def unstable_operation():
"""模拟不稳定的网络操作"""
if random.random() < 0.7: # 70% 概率失败
raise ConnectionError("网络连接失败")
return "操作成功"
# 测试
try:
result = unstable_operation()
print(result)
except ConnectionError as e:
print(f"最终失败: {e}")超时模式
python
# 超时模式
import signal
import time
# Unix/Linux 系统使用 signal
class TimeoutError(Exception):
"""超时异常"""
pass
def timeout_handler(signum, frame):
raise TimeoutError("操作超时")
def with_timeout(seconds):
"""超时装饰器(仅适用于 Unix/Linux)"""
def decorator(func):
def wrapper(*args, **kwargs):
# 设置信号处理器
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
finally:
signal.alarm(0) # 取消闹钟
return result
return wrapper
return decorator
# 使用多进程实现跨平台超时
from multiprocessing import Process, Queue
def run_with_timeout(func, args=(), kwargs=None, timeout=5):
"""带超时执行函数"""
if kwargs is None:
kwargs = {}
def worker(queue):
try:
result = func(*args, **kwargs)
queue.put((True, result))
except Exception as e:
queue.put((False, e))
queue = Queue()
process = Process(target=worker, args=(queue,))
process.start()
process.join(timeout=timeout)
if process.is_alive():
process.terminate()
process.join()
raise TimeoutError(f"函数执行超过 {timeout} 秒")
success, result = queue.get()
if success:
return result
else:
raise result
# 测试
def slow_function():
"""模拟耗时操作"""
time.sleep(3)
return "完成"
try:
result = run_with_timeout(slow_function, timeout=2)
print(result)
except TimeoutError as e:
print(f"超时: {e}")熔断器模式
python
# 熔断器模式
import time
from enum import Enum
class CircuitState(Enum):
"""熔断器状态"""
CLOSED = "closed" # 关闭(正常)
OPEN = "open" # 打开(熔断)
HALF_OPEN = "half_open" # 半开(尝试恢复)
class CircuitBreaker:
"""熔断器"""
def __init__(self, failure_threshold=5, recovery_timeout=30):
self.failure_threshold = failure_threshold # 失败阈值
self.recovery_timeout = recovery_timeout # 恢复超时时间(秒)
self.failure_count = 0 # 失败计数
self.state = CircuitState.CLOSED # 当前状态
self.last_failure_time = None # 最后一次失败时间
def call(self, func, *args, **kwargs):
"""通过熔断器调用函数"""
if self.state == CircuitState.OPEN:
# 检查是否可以尝试恢复
if self._should_attempt_recovery():
self.state = CircuitState.HALF_OPEN
else:
raise Exception("熔断器打开,拒绝请求")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _should_attempt_recovery(self):
"""检查是否应该尝试恢复"""
if self.last_failure_time is None:
return False
return time.time() - self.last_failure_time >= self.recovery_timeout
def _on_success(self):
"""成功时重置"""
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
"""失败时更新状态"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"熔断器打开:失败次数 {self.failure_count}")
# 使用熔断器
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=5)
def unstable_service():
"""模拟不稳定的服务"""
import random
if random.random() < 0.7:
raise Exception("服务不可用")
return "服务响应"
for i in range(10):
try:
result = breaker.call(unstable_service)
print(f"调用成功: {result}")
except Exception as e:
print(f"调用失败: {e}")
time.sleep(1)小结
本章学习了Python异常处理的完整知识:
异常基础:
- 理解异常的概念和层次结构
- 掌握常见的内置异常类型
异常处理语法:
try-except捕获和处理异常try-except-else-finally完整结构raise抛出异常assert断言
自定义异常:
- 创建自定义异常类
- 设计异常层次结构
最佳实践:
- 捕获具体异常
- 适当的异常处理层次
- 使用上下文管理器
- 日志记录
常见模式:
- 重试模式
- 超时模式
- 熔断器模式
掌握异常处理是编写健壮Python程序的关键技能。
