Skip to content

事务

MongoDB从4.0版本开始支持多文档事务,允许在多个文档和集合上执行原子操作。本章将介绍MongoDB事务的概念、使用方法和最佳实践。

事务概述

什么是事务

javascript
// 事务是一组数据库操作的逻辑单元,具有ACID特性:

// A - Atomicity(原子性)
// 事务中的所有操作要么全部成功,要么全部失败回滚

// C - Consistency(一致性)
// 事务执行前后,数据库从一个一致状态转换到另一个一致状态

// I - Isolation(隔离性)
// 多个事务并发执行时,每个事务都感觉不到其他事务的存在

// D - Durability(持久性)
// 事务一旦提交,对数据的修改就是永久的

// MongoDB事务的特点:
// 1. 支持多文档事务(跨文档、跨集合)
// 2. 支持跨分片事务(MongoDB 4.2+)
// 3. 必须在复制集或分片集群上运行
// 4. 仅支持WiredTiger存储引擎

事务使用场景

javascript
// 适合使用事务的场景:

// 1. 银行转账
//    - 从账户A扣款
//    - 向账户B加款
//    这两个操作必须同时成功或同时失败

// 2. 订单处理
//    - 创建订单
//    - 扣减库存
//    - 扣除用户余额
//    这三个操作必须保持一致性

// 3. 数据迁移
//    - 删除旧数据
//    - 插入新数据
//    必须保证数据完整性

// 不适合使用事务的场景:
// 1. 单文档操作(MongoDB单文档操作本身就是原子的)
// 2. 长时间运行的操作
// 3. 跨分片的大批量操作

事务基本操作

开启和提交事务

javascript
// 事务基本操作流程

// 准备测试数据
db.accounts.insertMany([
    { accountId: "A001", name: "张三", balance: 10000 },
    { accountId: "A002", name: "李四", balance: 5000 }
])

// 方式一:使用session手动管理事务
var session = db.getMongo().startSession();  // 创建会话
session.startTransaction();                   // 开启事务

try {
    var accounts = session.getDatabase("test").accounts;
    
    // 从张三账户扣款1000
    accounts.updateOne(
        { accountId: "A001" },
        { $inc: { balance: -1000 } }
    );
    
    // 向李四账户加款1000
    accounts.updateOne(
        { accountId: "A002" },
        { $inc: { balance: 1000 } }
    );
    
    session.commitTransaction();  // 提交事务
    print("转账成功");
} catch (error) {
    session.abortTransaction();   // 回滚事务
    print("转账失败: " + error);
} finally {
    session.endSession();         // 结束会话
}

使用withTransaction方法

javascript
// 方式二:使用withTransaction方法(推荐)

var session = db.getMongo().startSession();

try {
    session.withTransaction(function() {
        var accounts = session.getDatabase("test").accounts;
        
        // 从张三账户扣款
        accounts.updateOne(
            { accountId: "A001" },
            { $inc: { balance: -1000 } }
        );
        
        // 向李四账户加款
        accounts.updateOne(
            { accountId: "A002" },
            { $inc: { balance: 1000 } }
        );
    });
    
    print("转账成功");
} catch (error) {
    print("转账失败: " + error);
} finally {
    session.endSession();
}

事务配置选项

javascript
// 事务配置选项

var session = db.getMongo().startSession();

session.startTransaction({
    readConcern: { level: "snapshot" },      // 读关注级别
    writeConcern: { w: "majority" },         // 写关注级别
    readPreference: "primary",               // 读偏好
    maxCommitTimeMS: 30000                   // 最大提交时间(毫秒)
});

// 读关注级别:
// - local: 读取最新数据,可能读到未提交的数据
// - available: 与local类似,但用于分片
// - majority: 只读取已提交到大多数节点的数据
// - linearizable: 线性一致性读取
// - snapshot: 快照读取

// 写关注级别:
// - w: 0 - 不等待确认
// - w: 1 - 等待主节点确认
// - w: "majority" - 等待大多数节点确认
// - w: <n> - 等待n个节点确认

事务隔离级别

MongoDB的隔离级别

javascript
// MongoDB默认使用快照隔离(Snapshot Isolation)

// 快照隔离的特点:
// 1. 事务开始时创建数据快照
// 2. 事务期间读取的是快照数据
// 3. 事务提交时检查是否有冲突

// 示例:演示快照隔离
var session1 = db.getMongo().startSession();
var session2 = db.getMongo().startSession();

// 会话1:开启事务并读取数据
session1.startTransaction();
var account1 = session1.getDatabase("test").accounts.findOne({ accountId: "A001" });
print("会话1读取余额: " + account1.balance);

// 会话2:在会话1事务期间修改数据
session2.startTransaction();
session2.getDatabase("test").accounts.updateOne(
    { accountId: "A001" },
    { $set: { balance: 20000 } }
);
session2.commitTransaction();
print("会话2修改余额为20000");

// 会话1:再次读取,仍然是快照数据
account1 = session1.getDatabase("test").accounts.findOne({ accountId: "A001" });
print("会话1再次读取余额: " + account1.balance);  // 仍然是10000

session1.commitTransaction();
session1.endSession();
session2.endSession();

读关注级别详解

javascript
// 读关注级别详解

// local - 读取最新数据
db.accounts.find({ accountId: "A001" }).readConcern("local")

// majority - 只读取已提交到大多数节点的数据
db.accounts.find({ accountId: "A001" }).readConcern("majority")

// linearizable - 线性一致性(需要写关注为majority)
db.accounts.find({ accountId: "A001" }).readConcern("linearizable")

// snapshot - 快照读取(事务中默认)
var session = db.getMongo().startSession();
session.startTransaction({ readConcern: { level: "snapshot" } });
// ...事务操作
session.commitTransaction();
session.endSession();

事务实战案例

银行转账

javascript
// 银行转账完整示例

function transferMoney(fromAccountId, toAccountId, amount) {
    var session = db.getMongo().startSession();
    
    try {
        session.withTransaction(function() {
            var accounts = session.getDatabase("test").accounts;
            
            // 1. 检查转出账户余额是否充足
            var fromAccount = accounts.findOne({ accountId: fromAccountId });
            if (!fromAccount) {
                throw new Error("转出账户不存在");
            }
            if (fromAccount.balance < amount) {
                throw new Error("余额不足");
            }
            
            // 2. 检查转入账户是否存在
            var toAccount = accounts.findOne({ accountId: toAccountId });
            if (!toAccount) {
                throw new Error("转入账户不存在");
            }
            
            // 3. 执行转账
            var result1 = accounts.updateOne(
                { accountId: fromAccountId, balance: { $gte: amount } },
                { $inc: { balance: -amount } }
            );
            
            if (result1.modifiedCount === 0) {
                throw new Error("扣款失败,可能余额不足");
            }
            
            var result2 = accounts.updateOne(
                { accountId: toAccountId },
                { $inc: { balance: amount } }
            );
            
            if (result2.modifiedCount === 0) {
                throw new Error("加款失败");
            }
            
            // 4. 记录转账日志
            session.getDatabase("test").transferLogs.insertOne({
                fromAccount: fromAccountId,
                toAccount: toAccountId,
                amount: amount,
                timestamp: new Date(),
                status: "success"
            });
            
        }, {
            readConcern: { level: "snapshot" },
            writeConcern: { w: "majority" }
        });
        
        print("转账成功: " + fromAccountId + " -> " + toAccountId + ", 金额: " + amount);
        return true;
        
    } catch (error) {
        print("转账失败: " + error.message);
        return false;
    } finally {
        session.endSession();
    }
}

// 测试转账
transferMoney("A001", "A002", 1000);

订单处理

javascript
// 订单处理事务示例

function createOrder(userId, productId, quantity) {
    var session = db.getMongo().startSession();
    
    try {
        var result = {};
        
        session.withTransaction(function() {
            var db = session.getDatabase("test");
            
            // 1. 检查库存
            var product = db.products.findOne({ productId: productId });
            if (!product) {
                throw new Error("商品不存在");
            }
            if (product.stock < quantity) {
                throw new Error("库存不足");
            }
            
            // 2. 检查用户余额
            var user = db.users.findOne({ userId: userId });
            if (!user) {
                throw new Error("用户不存在");
            }
            var totalAmount = product.price * quantity;
            if (user.balance < totalAmount) {
                throw new Error("余额不足");
            }
            
            // 3. 创建订单
            var order = {
                orderId: "ORD" + Date.now(),
                userId: userId,
                productId: productId,
                quantity: quantity,
                totalAmount: totalAmount,
                status: "pending",
                createdAt: new Date()
            };
            db.orders.insertOne(order);
            result.orderId = order.orderId;
            
            // 4. 扣减库存
            var stockResult = db.products.updateOne(
                { productId: productId, stock: { $gte: quantity } },
                { $inc: { stock: -quantity } }
            );
            if (stockResult.modifiedCount === 0) {
                throw new Error("扣减库存失败");
            }
            
            // 5. 扣除用户余额
            var balanceResult = db.users.updateOne(
                { userId: userId, balance: { $gte: totalAmount } },
                { $inc: { balance: -totalAmount } }
            );
            if (balanceResult.modifiedCount === 0) {
                throw new Error("扣除余额失败");
            }
            
            // 6. 更新订单状态
            db.orders.updateOne(
                { orderId: order.orderId },
                { $set: { status: "completed" } }
            );
            
        }, {
            readConcern: { level: "snapshot" },
            writeConcern: { w: "majority" }
        });
        
        print("订单创建成功: " + result.orderId);
        return result;
        
    } catch (error) {
        print("订单创建失败: " + error.message);
        return null;
    } finally {
        session.endSession();
    }
}

事务限制和注意事项

事务限制

javascript
// MongoDB事务的限制:

// 1. 时间限制
// - 默认事务超时时间:60秒
// - 可以通过transactionLifetimeLimitSeconds配置修改

// 2. 文档大小限制
// - 单个事务操作的文档总大小不能超过16MB

// 3. 操作限制
// - 不能在事务中创建或删除集合
// - 不能在事务中创建索引
// - 不能在事务中使用$lookup关联非事务集合

// 4. 分片事务限制
// - 分片事务需要mongos路由
// - 所有分片必须使用WiredTiger引擎

// 查看事务配置
db.adminCommand({ getParameter: 1, transactionLifetimeLimitSeconds: 1 })

事务最佳实践

javascript
// 事务最佳实践

// 1. 保持事务简短
//    - 事务时间越长,持有锁的时间越长
//    - 影响并发性能

// 2. 避免在事务中执行耗时操作
//    - 不要在事务中进行网络请求
//    - 不要在事务中进行复杂计算

// 3. 使用合适的读写关注级别
//    - 大多数场景使用默认配置即可
//    - 金融场景使用majority级别

// 4. 处理写冲突
//    当两个事务同时修改同一文档时,会发生写冲突
//    MongoDB会自动重试事务

// 5. 合理使用重试逻辑
function runTransactionWithRetry(txnFunc, session) {
    while (true) {
        try {
            txnFunc(session);
            break;
        } catch (error) {
            // 如果是写冲突,自动重试
            if (error.code === 112 || error.code === 244) {
                print("写冲突,重试事务...");
                continue;
            }
            throw error;
        }
    }
}

监控事务

javascript
// 监控事务

// 查看当前活跃的事务
db.currentOp({ "transaction": { $exists: true } })

// 查看事务统计信息
db.serverStatus().transactions

// 输出示例:
// {
//   "totalStarted" : NumberLong(10),
//   "totalCommitted" : NumberLong(8),
//   "totalAborted" : NumberLong(2),
//   "currentActive" : NumberLong(1),
//   "currentInactive" : NumberLong(0),
//   "currentOpen" : NumberLong(1)
// }

// 查看事务锁信息
db.getSiblingDB("admin").aggregate([
    { $currentOp: { allUsers: true, idleCursors: true } },
    { $match: { "transaction": { $exists: true } } }
])

本章小结

本章介绍了MongoDB事务的相关知识:

  1. 事务概述:理解事务的ACID特性和使用场景
  2. 基本操作:掌握开启、提交、回滚事务的方法
  3. 隔离级别:了解MongoDB的快照隔离和读关注级别
  4. 实战案例:通过银行转账和订单处理案例掌握事务应用
  5. 限制和注意事项:了解事务的限制和最佳实践
  6. 监控事务:学会查看和监控事务状态

下一章,我们将学习用户权限,了解MongoDB的安全认证机制。