Appearance
事务处理
事务概述
MongoDB 4.0+ 支持多文档事务,保证多个操作的原子性。
ACID 特性
| 特性 | 说明 |
|---|---|
| 原子性 (Atomicity) | 事务是不可分割的工作单位 |
| 一致性 (Consistency) | 事务前后数据保持一致状态 |
| 隔离性 (Isolation) | 多个事务互不干扰 |
| 持久性 (Durability) | 事务提交后永久保存 |
事务要求
- MongoDB 4.0+ (副本集)
- MongoDB 4.2+ (分片集群)
- WiredTiger 存储引擎
- 副本集架构
基本事务
启动事务
javascript
const session = db.getMongo().startSession();
session.startTransaction();
try {
const users = session.getDatabase("mydb").users;
const orders = session.getDatabase("mydb").orders;
users.updateOne({ _id: 1 }, { $inc: { balance: -100 } });
orders.insertOne({
user_id: 1,
amount: 100,
status: "completed",
});
session.commitTransaction();
print("事务提交成功");
} catch (error) {
session.abortTransaction();
print("事务回滚: " + error);
} finally {
session.endSession();
}事务选项
javascript
session.startTransaction({
readConcern: { level: "snapshot" },
writeConcern: { w: "majority" },
readPreference: "primary",
});事务语法
startSession
javascript
const session = db.getMongo().startSession({
causalConsistency: true,
readPreference: "primary",
});startTransaction
javascript
session.startTransaction();
session.startTransaction({
readConcern: { level: "snapshot" },
writeConcern: { w: "majority" },
});commitTransaction
javascript
session.commitTransaction();abortTransaction
javascript
session.abortTransaction();endSession
javascript
session.endSession();事务示例
转账示例
javascript
function transfer(fromId, toId, amount) {
const session = db.getMongo().startSession();
try {
session.startTransaction();
const accounts = session.getDatabase("bank").accounts;
const fromAccount = accounts.findOne({ _id: fromId });
if (fromAccount.balance < amount) {
throw new Error("余额不足");
}
accounts.updateOne({ _id: fromId }, { $inc: { balance: -amount } });
accounts.updateOne({ _id: toId }, { $inc: { balance: amount } });
session.commitTransaction();
return { success: true };
} catch (error) {
session.abortTransaction();
return { success: false, error: error.message };
} finally {
session.endSession();
}
}
transfer(1, 2, 100);订单创建示例
javascript
function createOrder(userId, items) {
const session = db.getMongo().startSession();
try {
session.startTransaction();
const dbSession = session.getDatabase("shop");
const orders = dbSession.orders;
const products = dbSession.products;
const inventory = dbSession.inventory;
let totalAmount = 0;
for (let item of items) {
const product = products.findOne({ _id: item.productId });
const stock = inventory.findOne({ productId: item.productId });
if (stock.quantity < item.quantity) {
throw new Error(`商品 ${product.name} 库存不足`);
}
inventory.updateOne(
{ productId: item.productId },
{ $inc: { quantity: -item.quantity } },
);
totalAmount += product.price * item.quantity;
}
const order = orders.insertOne({
userId: userId,
items: items,
totalAmount: totalAmount,
status: "pending",
createdAt: new Date(),
});
session.commitTransaction();
return { success: true, orderId: order.insertedId };
} catch (error) {
session.abortTransaction();
return { success: false, error: error.message };
} finally {
session.endSession();
}
}读关注 (Read Concern)
读关注级别
| 级别 | 说明 |
|---|---|
| local | 读取本地数据(默认) |
| available | 读取可用数据 |
| majority | 读取大多数节点确认的数据 |
| linearizable | 线性化读取 |
| snapshot | 快照读取 |
设置读关注
javascript
db.users.find().readConcern("majority");
session.startTransaction({
readConcern: { level: "snapshot" },
});各级别详解
local
javascript
db.users.find().readConcern("local");读取本地最新数据,可能读取到未提交的数据。
majority
javascript
db.users.find().readConcern("majority");读取大多数节点确认的数据,避免脏读。
linearizable
javascript
db.users.findOne({ _id: 1 }).readConcern("linearizable");线性化读取,确保读取到最新的已提交数据。
snapshot
javascript
session.startTransaction({
readConcern: { level: "snapshot" },
});快照读取,事务中读取一致性快照。
写关注 (Write Concern)
写关注级别
| 级别 | 说明 |
|---|---|
| w: 0 | 不确认写入 |
| w: 1 | 主节点确认(默认) |
| w: majority | 大多数节点确认 |
| w: n | n个节点确认 |
| j: true | 等待日志落盘 |
设置写关注
javascript
db.users.insertOne(
{ name: "张三" },
{ writeConcern: { w: "majority", j: true } },
);
session.startTransaction({
writeConcern: { w: "majority" },
});写关注示例
javascript
db.users.insertOne(
{ name: "张三" },
{ writeConcern: { w: 3, j: true, wtimeout: 5000 } },
);读偏好 (Read Preference)
读偏好模式
| 模式 | 说明 |
|---|---|
| primary | 只读主节点(默认) |
| primaryPreferred | 优先主节点 |
| secondary | 只读从节点 |
| secondaryPreferred | 优先从节点 |
| nearest | 最近节点 |
设置读偏好
javascript
db.users.find().readPref("secondary");
db.users.find().readPref("secondaryPreferred");
session.startTransaction({
readPreference: "primary",
});隔离级别
MongoDB 隔离级别
| 隔离级别 | 说明 |
|---|---|
| 快照隔离 | 事务中读取一致性快照 |
| 因果一致性 | 因果相关的操作顺序一致 |
快照隔离
javascript
const session = db.getMongo().startSession();
session.startTransaction({
readConcern: { level: "snapshot" },
});
const users = session.getDatabase("mydb").users;
const user = users.findOne({ _id: 1 });
session.commitTransaction();
session.endSession();因果一致性
javascript
const session = db.getMongo().startSession({
causalConsistency: true,
});
const users = session.getDatabase("mydb").users;
users.insertOne({ name: "张三" });
const result = users.findOne({ name: "张三" });
session.endSession();重试事务
自动重试
javascript
function runTransactionWithRetry(txnFunc, session) {
while (true) {
try {
txnFunc(session);
break;
} catch (error) {
if (
error.errorLabels &&
error.errorLabels.includes("TransientTransactionError")
) {
print("重试事务...");
continue;
}
throw error;
}
}
}
function commitWithRetry(session) {
while (true) {
try {
session.commitTransaction();
print("事务提交成功");
break;
} catch (error) {
if (
error.errorLabels &&
error.errorLabels.includes("UnknownTransactionCommitResult")
) {
print("重试提交...");
continue;
}
throw error;
}
}
}完整重试示例
javascript
function transferWithRetry(fromId, toId, amount) {
const session = db.getMongo().startSession();
try {
session.startTransaction();
const accounts = session.getDatabase("bank").accounts;
accounts.updateOne({ _id: fromId }, { $inc: { balance: -amount } });
accounts.updateOne({ _id: toId }, { $inc: { balance: amount } });
commitWithRetry(session);
} catch (error) {
print("事务失败: " + error);
session.abortTransaction();
} finally {
session.endSession();
}
}事务限制
时间限制
javascript
db.adminCommand({
setParameter: 1,
transactionLifetimeLimitSeconds: 60,
});大小限制
- 单个事务操作数限制:1000
- 单个事务大小限制:16MB
锁等待超时
javascript
db.adminCommand({
setParameter: 1,
maxTransactionLockRequestTimeoutMillis: 5000,
});事务监控
查看当前事务
javascript
db.currentOp({ transaction: { $exists: true } });查看事务详情
javascript
db.currentOp(
{
transaction: { $exists: true },
},
{ truncateOps: false },
);终止事务
javascript
db.killOp(opId);事务统计
javascript
db.serverStatus().transactions;事务最佳实践
1. 保持事务简短
javascript
session.startTransaction();
try {
db.users.updateOne({ _id: 1 }, { $set: { status: "active" } });
session.commitTransaction();
} finally {
session.endSession();
}2. 避免长事务
javascript
session.startTransaction({
maxTimeMS: 5000,
});3. 正确处理错误
javascript
try {
session.startTransaction();
session.commitTransaction();
} catch (error) {
if (session.transaction.state === "in_progress") {
session.abortTransaction();
}
throw error;
} finally {
session.endSession();
}4. 使用合适的读关注
javascript
session.startTransaction({
readConcern: { level: "majority" },
writeConcern: { w: "majority" },
});5. 避免热点数据
将热点数据分散到不同事务中。