Skip to content

事务处理

事务概述

MongoDB 4.0+ 支持多文档事务,保证多个操作的原子性。

ACID 特性

特性说明
原子性 (Atomicity)事务是不可分割的工作单位
一致性 (Consistency)事务前后数据保持一致状态
隔离性 (Isolation)多个事务互不干扰
持久性 (Durability)事务提交后永久保存

事务要求

  • MongoDB 4.0+ (副本集)
  • MongoDB 4.2+ (分片集群)
  • WiredTiger 存储引擎
  • 副本集架构

基本事务

启动事务

javascript
const session = db.getMongo().startSession();
session.startTransaction();

try {
  const users = session.getDatabase("mydb").users;
  const orders = session.getDatabase("mydb").orders;

  users.updateOne({ _id: 1 }, { $inc: { balance: -100 } });

  orders.insertOne({
    user_id: 1,
    amount: 100,
    status: "completed",
  });

  session.commitTransaction();
  print("事务提交成功");
} catch (error) {
  session.abortTransaction();
  print("事务回滚: " + error);
} finally {
  session.endSession();
}

事务选项

javascript
session.startTransaction({
  readConcern: { level: "snapshot" },
  writeConcern: { w: "majority" },
  readPreference: "primary",
});

事务语法

startSession

javascript
const session = db.getMongo().startSession({
  causalConsistency: true,
  readPreference: "primary",
});

startTransaction

javascript
session.startTransaction();
session.startTransaction({
  readConcern: { level: "snapshot" },
  writeConcern: { w: "majority" },
});

commitTransaction

javascript
session.commitTransaction();

abortTransaction

javascript
session.abortTransaction();

endSession

javascript
session.endSession();

事务示例

转账示例

javascript
function transfer(fromId, toId, amount) {
  const session = db.getMongo().startSession();

  try {
    session.startTransaction();

    const accounts = session.getDatabase("bank").accounts;

    const fromAccount = accounts.findOne({ _id: fromId });
    if (fromAccount.balance < amount) {
      throw new Error("余额不足");
    }

    accounts.updateOne({ _id: fromId }, { $inc: { balance: -amount } });

    accounts.updateOne({ _id: toId }, { $inc: { balance: amount } });

    session.commitTransaction();
    return { success: true };
  } catch (error) {
    session.abortTransaction();
    return { success: false, error: error.message };
  } finally {
    session.endSession();
  }
}

transfer(1, 2, 100);

订单创建示例

javascript
function createOrder(userId, items) {
  const session = db.getMongo().startSession();

  try {
    session.startTransaction();

    const dbSession = session.getDatabase("shop");
    const orders = dbSession.orders;
    const products = dbSession.products;
    const inventory = dbSession.inventory;

    let totalAmount = 0;

    for (let item of items) {
      const product = products.findOne({ _id: item.productId });
      const stock = inventory.findOne({ productId: item.productId });

      if (stock.quantity < item.quantity) {
        throw new Error(`商品 ${product.name} 库存不足`);
      }

      inventory.updateOne(
        { productId: item.productId },
        { $inc: { quantity: -item.quantity } },
      );

      totalAmount += product.price * item.quantity;
    }

    const order = orders.insertOne({
      userId: userId,
      items: items,
      totalAmount: totalAmount,
      status: "pending",
      createdAt: new Date(),
    });

    session.commitTransaction();
    return { success: true, orderId: order.insertedId };
  } catch (error) {
    session.abortTransaction();
    return { success: false, error: error.message };
  } finally {
    session.endSession();
  }
}

读关注 (Read Concern)

读关注级别

级别说明
local读取本地数据(默认)
available读取可用数据
majority读取大多数节点确认的数据
linearizable线性化读取
snapshot快照读取

设置读关注

javascript
db.users.find().readConcern("majority");

session.startTransaction({
  readConcern: { level: "snapshot" },
});

各级别详解

local

javascript
db.users.find().readConcern("local");

读取本地最新数据,可能读取到未提交的数据。

majority

javascript
db.users.find().readConcern("majority");

读取大多数节点确认的数据,避免脏读。

linearizable

javascript
db.users.findOne({ _id: 1 }).readConcern("linearizable");

线性化读取,确保读取到最新的已提交数据。

snapshot

javascript
session.startTransaction({
  readConcern: { level: "snapshot" },
});

快照读取,事务中读取一致性快照。

写关注 (Write Concern)

写关注级别

级别说明
w: 0不确认写入
w: 1主节点确认(默认)
w: majority大多数节点确认
w: nn个节点确认
j: true等待日志落盘

设置写关注

javascript
db.users.insertOne(
  { name: "张三" },
  { writeConcern: { w: "majority", j: true } },
);

session.startTransaction({
  writeConcern: { w: "majority" },
});

写关注示例

javascript
db.users.insertOne(
  { name: "张三" },
  { writeConcern: { w: 3, j: true, wtimeout: 5000 } },
);

读偏好 (Read Preference)

读偏好模式

模式说明
primary只读主节点(默认)
primaryPreferred优先主节点
secondary只读从节点
secondaryPreferred优先从节点
nearest最近节点

设置读偏好

javascript
db.users.find().readPref("secondary");

db.users.find().readPref("secondaryPreferred");

session.startTransaction({
  readPreference: "primary",
});

隔离级别

MongoDB 隔离级别

隔离级别说明
快照隔离事务中读取一致性快照
因果一致性因果相关的操作顺序一致

快照隔离

javascript
const session = db.getMongo().startSession();
session.startTransaction({
  readConcern: { level: "snapshot" },
});

const users = session.getDatabase("mydb").users;
const user = users.findOne({ _id: 1 });

session.commitTransaction();
session.endSession();

因果一致性

javascript
const session = db.getMongo().startSession({
  causalConsistency: true,
});

const users = session.getDatabase("mydb").users;
users.insertOne({ name: "张三" });

const result = users.findOne({ name: "张三" });

session.endSession();

重试事务

自动重试

javascript
function runTransactionWithRetry(txnFunc, session) {
  while (true) {
    try {
      txnFunc(session);
      break;
    } catch (error) {
      if (
        error.errorLabels &&
        error.errorLabels.includes("TransientTransactionError")
      ) {
        print("重试事务...");
        continue;
      }
      throw error;
    }
  }
}

function commitWithRetry(session) {
  while (true) {
    try {
      session.commitTransaction();
      print("事务提交成功");
      break;
    } catch (error) {
      if (
        error.errorLabels &&
        error.errorLabels.includes("UnknownTransactionCommitResult")
      ) {
        print("重试提交...");
        continue;
      }
      throw error;
    }
  }
}

完整重试示例

javascript
function transferWithRetry(fromId, toId, amount) {
  const session = db.getMongo().startSession();

  try {
    session.startTransaction();

    const accounts = session.getDatabase("bank").accounts;

    accounts.updateOne({ _id: fromId }, { $inc: { balance: -amount } });

    accounts.updateOne({ _id: toId }, { $inc: { balance: amount } });

    commitWithRetry(session);
  } catch (error) {
    print("事务失败: " + error);
    session.abortTransaction();
  } finally {
    session.endSession();
  }
}

事务限制

时间限制

javascript
db.adminCommand({
  setParameter: 1,
  transactionLifetimeLimitSeconds: 60,
});

大小限制

  • 单个事务操作数限制:1000
  • 单个事务大小限制:16MB

锁等待超时

javascript
db.adminCommand({
  setParameter: 1,
  maxTransactionLockRequestTimeoutMillis: 5000,
});

事务监控

查看当前事务

javascript
db.currentOp({ transaction: { $exists: true } });

查看事务详情

javascript
db.currentOp(
  {
    transaction: { $exists: true },
  },
  { truncateOps: false },
);

终止事务

javascript
db.killOp(opId);

事务统计

javascript
db.serverStatus().transactions;

事务最佳实践

1. 保持事务简短

javascript
session.startTransaction();
try {
  db.users.updateOne({ _id: 1 }, { $set: { status: "active" } });
  session.commitTransaction();
} finally {
  session.endSession();
}

2. 避免长事务

javascript
session.startTransaction({
  maxTimeMS: 5000,
});

3. 正确处理错误

javascript
try {
  session.startTransaction();
  session.commitTransaction();
} catch (error) {
  if (session.transaction.state === "in_progress") {
    session.abortTransaction();
  }
  throw error;
} finally {
  session.endSession();
}

4. 使用合适的读关注

javascript
session.startTransaction({
  readConcern: { level: "majority" },
  writeConcern: { w: "majority" },
});

5. 避免热点数据

将热点数据分散到不同事务中。

下一步学习