本文记录 kudu 源码阅读笔记,记录了 kudu tserver 端非事务写入的完整流程。

写入流程总览

Kudu 的一次“非事务”写(普通单/批行 Insert/Upsert/Update/Delete,不带 txn_id)在 tserver 侧需要经历以下阶段:

  1. Client 组装 WriteRequestPB 并通过 RPC 发送 (tserver_service.proto: Write).
  2. RPC 入口:TabletServiceImpl::Write() 做权限、资源(内存 / 线程池)与节流检查,构造 WriteOpState
  3. 提交给 TabletReplica::SubmitWrite(),创建 WriteOpOpDriver(LEADER 模式)。
  4. OpDriver::ExecuteAsync() 进入 Prepare 阶段:解码行、锁定 schema、行级锁 / 分区锁(非事务写仅行锁)、权限校验。
  5. (LEADER)向 RAFT 复制:给 ReplicateMsg 分配 timestamp,RaftConsensus::Replicate() 开始复制;Follower 侧由共识回放直接创建 Follower OpDriver 并 Start。
  6. RAFT 提交(ReplicationFinished())后进入 Apply 阶段:WriteOp::Apply() 调用 Tablet::ApplyRowOperations() 执行实际插入/更新/删除(写入 MemRowSet / DeltaMemStore)。
  7. 生成 CommitMsg 写入 WAL;WriteOp::Finish() 更新 MVCC 状态与各种 metrics;释放行锁、schema 锁。
  8. RPC 回调发送 WriteResponsePB 给客户端(含 per-row errors、资源 metrics、timestamp 等)。

流程总览图如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Write RPC
  |
TabletServiceImpl::Write
  |
TabletReplica::SubmitWrite
  |
OpDriver::ExecuteAsync
  |
+--> Prepare (Decode + RowLocks)
|        |
|        +-- Leader: AssignTimestamp + Start + Replicate
|                              |
|                              v
|                     RaftConsensus (replicate)
|                              |
|                        ReplicationFinished
|                              |
+-------- if PREPARED && REPLICATED ---------+
                              v
                         ApplyTask (ApplyRowOperations)
                              |
                       WAL AsyncAppendCommit
                              |
                     CommitWait? (external consistency)
                              |
                            Finalize/Finish
                              |
                          Response to client

写入流程的时序图:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
Client          TabletServiceImpl            TabletReplica/OpDriver          RaftConsensus          Tablet
  | Write RPC -->|                           |                               |                      |
  |              | 校验/节流/构造State         |                               |                      |
  |              | SubmitWrite() ----------->| NewLeaderOpDriver()           |                      |
  |              |                           | ExecuteAsync()                |                      |
  |              |                           | PrepareTask -> Prepare()      |                      |
  |              |                           |  Decode ops + RowLocks        |                      |
  |              |                           |  Assign timestamp             |                      |
  |              |                           |  Replicate() ---------------->| send to peers        |
  |              |                           |                               | (consensus commit)   |
  |              |                           | ReplicationFinished()<--------| committed            |
  |              |                           | ApplyAsync -> Apply()         |                      | ApplyRowOperations
  |              |                           | Commit (WAL)                  |                      | MemRowSet/Delta
  |              |                           | Finish()                      |                      | MVCC finish
  |<-- Response  |                           |                               |                      |

流程关键特点:

  • 采用 MVCC (MvccManager) 保证读写隔离与快照语义;写在获取 timestamp 后进入 in-flight 集合,Apply 完成后可见。
  • 写路径分 Prepare / Replicate / Apply / Commit 四个大的阶段,由 OpDriver 状态机驱动。
  • 非事务写不涉及 TxnRowSets 与 partition lock(除启用 FLAGS_enable_txn_partition_lock 时 follower 列表差异),无提交/回滚二阶段。

下文将对一些关键的阶段进行分析。

RPC 入口与初始校验

入口文件/方法src/kudu/tserver/tablet_service.cc: TabletServiceImpl::Write()

核心逻辑:

  • Tablet 查找:LookupRunningTabletReplicaOrRespond()
  • 访问控制:可选 Token -> privileges(INSERT / UPDATE / DELETE)。
  • 外部一致性模式检查:clock()->SupportsExternalConsistencyMode()
  • 节流:tablet->ShouldThrottleAllow(bytes)、进程软内存限制 process_memory::SoftLimitExceeded()、应用队列是否过载 tablet_apply_pool()->QueueOverloaded()
  • 构造 WriteOpState:携带请求、响应、以及可选授权上下文。
  • 区分非事务/事务:无 txn_id -> SubmitWrite();有 txn_id -> 走事务前置调度 SubmitTxnWrite()

WriteOpState 与 WriteOp

文件src/kudu/tablet/ops/write_op.h / write_op.cc

关键数据结构与持有关系:

  • WriteOpState:保存原始请求、解析后的 RowOp 列表、行锁集合 ScopedRowLock、MVCC ScopedOp、schema 快照、metrics 计数器。
  • WriteOp:以成员 std::unique_ptr<WriteOpState> state_ 持有 WriteOpState(参见 write_op.hclass WriteOp 中的 state_),并对其进行阶段驱动(Prepare/Start/Apply/Finish)。
    • 生命周期:WriteOp 在构造时接管 WriteOpState 的所有权;阶段结束时通过 Finish() 触发 WriteOpState::FinishApplyingOrAbort(),释放行锁、schema 锁与 MVCC 事务对象。

阶段职责: 以下结合源码,具体分析 WriteOp 四个核心阶段的实现与与 OpDriver 的耦合:

WriteOp 关键阶段分析

Prepare(解析与锁定)

  • 入口WriteOp::Prepare()write_op.cc)。由 OpDriver::Prepare() 调用。
  • 主要步骤
    • Schema 快照与解码:Tablet::DecodeWriteOperations(state) 根据请求中的 RowOperationsPB 解析为内部 RowOp 列表,并捕获当前 schema 快照(避免并发更改影响)。
    • 行锁获取:Tablet::AcquireRowLocks(state) 为每个待操作的 key 申请 ScopedRowLock(排他),批量方式减少争用;支持 UPSERT_IGNORE 等在存在冲突时的特殊处理策略。
    • 权限与合法性检查:结合 TabletServiceImpl::Write() 已做的令牌校验,进一步在 Prepare() 中检查请求类型(INSERT/UPDATE/DELETE)与目标表的约束(如不可变列更新)。
    • 事务差异:非事务写不获取分区/事务锁;事务写(本文略)在此阶段可能受限仅允许 INSERT 类,并需额外锁定。
    • 行定址与列投影准备:为后续 ApplyRowOperations() 的批量存在性检查与列写入准备必要的 key probe 与列投影上下文,降低 Apply 阶段的重复计算开销。
  • 输出
    • prepare_state_ = PREPARED(由 OpDriver 设置)。
    • 行锁已持有;RowOp 已解码;错误(如 schema mismatch、行不存在/已存在)在 per-row 层面记录,或整体失败返回。
  • 与 OpDriver 的耦合:若此时复制尚未触发,Leader 在随后分配 ts 并 Start();若复制已完成(回调先到),OpDriver 会立即尝试进入 ApplyAsync()

Start(登记 MVCC 与时间戳)

  • 入口WriteOp::Start()write_op.cc)。Leader 在 OpDriver::Prepare() 完成解码与锁定后调用;Follower 在 OpDriver::Init() 早期调用(回放路径)。
  • 主要步骤
    • 时间戳设置:Leader 路径由 OpDriver::Prepare() 调用 time_manager()->AssignTimestamp() 后,将 ts 写入 ReplicateMsgWriteOpState;Follower 路径从回放的 ReplicateMsg 取 ts。
    • MVCC RESERVED 登记:Tablet::StartOp(state) -> MvccManager::StartOp(ts) 将本写操作标记为 RESERVED,进入 in-flight 集合。
  • 目的:确保在复制与准备并行的情况下,读的安全时间不会越过尚未提交的写;并为后续 APPLYING/APPLIED 状态转换做好铺垫(详见第 5 节)。

Apply(实际行写入与 CommitMsg 构建)

  • 入口WriteOp::Apply(CommitMsg* msg)write_op.cc)。由 OpDriver::ApplyTask() 调用。
  • 前置WriteOpState::StartApplying() -> MvccManager::StartApplyingOp(ts) 将状态从 RESERVED 转为 APPLYING
  • 主要步骤
    • 批量存在性检查:Tablet::ApplyRowOperations(state) 内部首先进行 BulkCheckPresence 对多行键进行存在性探测,减少逐行冲突代价。
    • 逐行应用:ApplyRowOperation(state, row_op)
      • Insert/Upsert:InsertOrUpsertUnlocked() 将数据写入当前 MemRowSet,必要时分配新 row id;UPSERT_IGNORE 在存在冲突时按忽略策略处理。
      • Mutate(Update/Delete):MutateRowUnlocked() 写入 DeltaMemStore 形成列增量或删除标记。
    • 结果与错误:每行生成 RowOpResultPB(成功与错误类型,如 ALREADY_PRESENTNOT_FOUNDMISMATCHED_SCHEMA 等),累计到 WriteResponsePBper_row_errors
    • CommitMsg 构建:填充 CommitMsg(含 commited_op_id、影响的 RowSets/DeltaStores 引用、时间戳等),供 WAL AsyncAppendCommit() 持久化。
  • 与外部一致性OpDriver::ApplyTask() 在非 COMMIT_WAIT 模式下可能提前推进 AdjustNewOpLowerBound(ts)COMMIT_WAIT 则在 WAL 追加后执行 CommitWait() 再响应。

Finish(完成或中止,释放与统计)

  • 入口WriteOp::Finish(Op::ApplyResult result)write_op.cc)。由 OpDriver::Finalize() 调用。
  • 主要步骤
    • 成功路径:FinishApplyingOrAbort(success=true) -> MvccManager::FinishApplyingOp(ts) 将写标记为 APPLIED(可见点);释放行锁、schema 锁;更新计数与 metrics(如写入行数、延迟统计)。
    • 失败路径:在复制失败、准备失败或应用失败场景下,调用 FinishApplyingOrAbort(success=false) -> AbortOp(ts),确保读视图不受污染,并释放所有持有资源。
    • 回调与清理:通过 completion_callbackWriteResponsePB 返回客户端;op_tracker_->Release(this) 结束对该 OpDriver 的跟踪。
  • 与 WAL/恢复:成功路径对应 WAL 已写入 CommitMsg;恢复时由 tablet_bootstrap.cc 的重放逻辑将该 ts 的操作重建为已应用状态。

错误处理与幂等性

  • Prepare 级错误:解码/权限/锁定失败,若尚未复制成功则安全失败返回;若复制已完成则触发 FATAL(避免一致性破坏)。
  • 复制失败:在 OpDriver::ReplicationFinished() 标记失败后,ApplyAsync() 路径统一 HandleFailure(op_status_) 并在 Finish()AbortOp(ts)
  • 应用失败:逐行错误通过 per_row_errors 返回;不可恢复错误导致整体 AbortOp(ts)
  • Follower 幂等响应:若需要返回客户端(重试附着),在 Init()/Prepare() 中注册 FollowerOpCompletionCallback;失败统一返回 ERROR_SERVER_TOO_BUSY 促使客户端重试附着到 Leader。

重要源码位置

  • src/kudu/tablet/ops/write_op.ccPrepare()Start()Apply()Finish()
  • src/kudu/tablet/tablet.ccDecodeWriteOperations()AcquireRowLocks()ApplyRowOperations()ApplyRowOperation()InsertOrUpsertUnlocked()MutateRowUnlocked()
  • src/kudu/tablet/ops/op_driver.cc:驱动 Prepare() / ApplyTask() / Finalize()、时间戳分配与外部一致性处理(AdjustNewOpLowerBoundCommitWait)。
  • src/kudu/tablet/mvcc.hStartOp()StartApplyingOp()FinishApplyingOp()AbortOp();读快照 MvccSnapshot

OpDriver 状态机

源码位置src/kudu/tablet/ops/op_driver.h / op_driver.cc

OpDriver 负责把一个逻辑写操作(WriteOp)从“构造”推进到“复制提交”“应用”“完成”,内部以两个正交状态变量跟踪进度:

状态变量

  • replication_state_(复制相关):
    • NOT_REPLICATING:尚未进入 Raft 复制(Leader 写在执行 Prepare 前的初始状态)。
    • REPLICATING:复制请求已发起(Leader 调用 RaftConsensus::Replicate() 后;Follower 在 Init() 中调用 Start() 后即设为 REPLICATING)。
    • REPLICATION_FAILED:复制阶段失败(发送或共识失败;Leader 在 Replicate 返回非 OK,或回调 ReplicationFinished() 中 status 非 OK)。
    • REPLICATED:共识层确认提交(本操作在多数副本持久化 ReplicateMsg,对应回调 ReplicationFinished() 成功路径)。
  • prepare_state_(准备相关):
    • NOT_PREPARED:尚未完成本地 Prepare(还没解码行/锁定)。
    • PREPAREDop_->Prepare() 成功执行(已完成 schema 解码、行锁获取、权限与合法性检查)。

两个状态必须同时满足:replication_state_ == REPLICATEDprepare_state_ == PREPARED 才能进入 Apply(ApplyAsync())。失败则走 Abort/HandleFailure。

主要方法与状态流转

以下描述 Leader 普通写操作(非事务)完整路径:

  1. 构造:TabletReplica::SubmitWrite() -> 创建 WriteOp + NewLeaderOpDriver() -> OpDriver::Init()
    • 检查 tablet 未停止。
    • 复制状态初始 NOT_REPLICATING,准备状态初始 NOT_PREPARED
    • 如果是 FOLLOWER 分支(重放),这里会直接:设置 op_id_copy_replication_state_ = REPLICATING、调用 op_->Start()(立即把 MVCC 置为 RESERVED),并在结果跟踪需要时设置 Follower 回调。
  2. 调度:ExecuteAsync()
    • Leader:调用 consensus_->CheckLeadershipAndBindTerm() 绑定当前 term。
    • 提交 PrepareTask() 到 prepare 线程池(串行 token 保证准备顺序)。
  3. 准备:PrepareTask()
    • Deadline 校验(准备队列等待超时直接 HandleFailure(Status::TimedOut))。
    • 调用 Prepare() 真正执行:
      • op_->Prepare() -> 解码请求、锁 row/schema、权限校验。
      • 设置 prepare_state_ = PREPARED(加锁保护)。
      • Follower:在此注册结果跟踪 RegisterFollowerOpOnResultTracker(),避免竞态。
      • Leader:检查自己仍是 Driver(去重重复请求)。
      • 分支:
        • 若复制尚未触发(replication_state_ == NOT_REPLICATING):
          • 分配时间戳:time_manager()->AssignTimestamp() -> 写入 replicate_msg。
          • 调用 op_->Start()(创建 MVCC ScopedOp,进入 RESERVED)。
          • 设置 replication_state_ = REPLICATING,调用 RaftConsensus::Replicate()
        • 若复制已完成(REPLICATEDREPLICATION_FAILED,可能回调先到):直接尝试 ApplyAsync()
  4. 复制完成:ReplicationFinished(status)(Raft 回调线程):
    • 赋值 op_id_copy_(共识生成的真实 OpId)。
    • status.ok()replication_state_ = REPLICATED;否则:replication_state_ = REPLICATION_FAILED 并调用 op_->AbortPrepare()(防止继续准备逻辑)。
    • 如果此时已 prepare_state_ == PREPARED -> ApplyAsync();否则等待准备线程设置 PREPARED 后进入 Apply。
  5. 应用调度:ApplyAsync()
    • 必须在锁下看到:prepare_state_ == PREPARED
    • 若复制失败:HandleFailure(op_status_) -> Abort。
    • 正常:验证顺序(order_verifier_->CheckApply());若外部一致性模式不是 COMMIT_WAIT,则提前推进 MVCC 新操作下界:AdjustNewOpLowerBound(timestamp)
    • ApplyTask() 提交到 apply 线程池。
  6. Apply 阶段:ApplyTask()
    • 再次校验 tablet 未停止。
    • 调用 op_->Apply(&commit_msg):内部执行 Tablet::ApplyRowOperations()
    • 设置 commit_msg.commited_op_id(拼写保留源码)为 op_id_copy_;写响应 timestamp。
    • 异步 WAL 追加:log_->AsyncAppendCommit()
    • 外部一致性模式为 COMMIT_WAIT 时执行 CommitWait():等待本地时钟前进到事务时间戳之后。
    • 调用 Finalize()
  7. Finalize:
    • op_->Finish(Op::APPLIED) -> MVCC FinishApplying,释放锁,更新 metrics。
    • 调用 completion callback 发送 RPC 响应。
    • op_tracker_->Release(this) 清除跟踪。

Follower 操作差异(重放):

  • Init() 即:replication_state_ = REPLICATING 并调用 op_->Start(),因为 replicate 消息已含 timestamp,保证进入 MVCC 早于 safe time 推进。
  • 准备与复制回调可能顺序互换;两者任一先到都只设置自己的状态,最后由后到达的一方检测到另一方已就绪再触发 ApplyAsync()
  • 结果跟踪:只有需要返回客户端(幂等重试)时注册 FollowerOpCompletionCallback,失败统一报告 TOO_BUSY 让客户端重试重新附着 Leader 正常路径。

回调注册与触发(Leader/Follower)

  • Leader 回调注册(复制完成):OpDriver::Init() 在创建共识轮次时注册复制完成回调,触发后进入 OpDriver::ReplicationFinished()
    • 源码:src/kudu/tablet/ops/op_driver.cc
      • mutable_state()->set_consensus_round(consensus_->NewRound(std::move(replicate_msg), [wp](const Status& s){ /* sp->ReplicationFinished(s) */ }));
  • Follower 回调注册(复制完成):TabletReplica::StartFollowerOp() 在 round 上设置回调,同样进入 ReplicationFinished()
    • 源码:src/kudu/tablet/tablet_replica.cc
      • state->consensus_round()->SetConsensusReplicatedCallback([wp](const Status& s){ /* sp->ReplicationFinished(s) */ });
  • 复制触发非阻塞:Leader 在 Prepare() 中调用 consensus_->Replicate(mutable_state()->consensus_round()) 发起复制;该调用不等待共识完成,失败则立即将 replication_state_ 置为 REPLICATION_FAILED 并返回错误;成功则待 Raft 层稍后通过上述回调通知 ReplicationFinished()
  • 结果跟踪(Follower):若需要幂等响应,Init() 中为 Follower 设置 FollowerOpCompletionCallback,失败统一返回 ERROR_SERVER_TOO_BUSY 促使客户端重试并附着到 Leader 执行路径。

状态机(线程并发视角)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
                 (prepare pool)                           (raft callback)                 (apply pool)
[Init] --ExecuteAsync--> [Prepare] --assign ts + Start + Replicate--> [RaftConsensus] --(ReplicationFinished)-->
   |                           |                         ^                       |
   |                           |                         |                       |
   |                   set PREPARED                      +-----------------------+
   |                           |
   +------ if PREPARED && REPLICATED -------------------------------> [ApplyAsync] --> [ApplyTask]
                                                                        |
                                                        AsyncAppendCommit + CommitWait?
                                                                        |
                                                                     [Finalize]

ReplicationFinished:
  ok   -> replication_state = REPLICATED
  fail -> replication_state = REPLICATION_FAILED -> HandleFailure -> Finish(Abort)

Apply gate:
  requires (prepare_state == PREPARED) && (replication_state == REPLICATED)

状态转移表(Leader 写)

事件前状态 (replication, prepare)后状态触发代码说明
Init(NOT_REPLICATING, NOT_PREPARED)同上Init()构造完成
Prepare 成功且未复制(NOT_REPLICATING, NOT_PREPARED)(REPLICATING, PREPARED)Prepare()分配 ts + Start + Replicate
Replicate 回调成功先于 Prepare(REPLICATING, NOT_PREPARED)(REPLICATED, NOT_PREPARED)ReplicationFinished()等待 Prepare 完成
Replicate 回调成功后 Prepare 完成(REPLICATED, PREPARED)(REPLICATED, PREPARED)Prepare()/ReplicationFinished()进入 ApplyAsync
Replicate 回调失败(REPLICATING, *)(REPLICATION_FAILED, *)ReplicationFinished()若已 PREPARED 后续 ApplyAsync 里走失败分支
Prepare 队列超时(NOT_REPLICATING, NOT_PREPARED)(REPLICATION_FAILED, NOT_PREPARED)PrepareTask()/HandleFailure()直接失败响应
ApplyAsync 启动(REPLICATED, PREPARED)(REPLICATED, PREPARED)ApplyAsync()提交 ApplyTask
ApplyTask 成功 + 非 COMMIT_WAIT(REPLICATED, PREPARED)(REPLICATED, PREPARED)ApplyTask()/Finalize()提前推进 MVCC 下界
ApplyTask 成功 + COMMIT_WAIT(REPLICATED, PREPARED)(REPLICATED, PREPARED)CommitWait()/Finalize()等待时钟后 Finalize
Finalize(REPLICATED, PREPARED)终结Finalize()完成回调释放资源

异常与失败处理

  • 复制失败:ReplicationFinished() 设置 REPLICATION_FAILED,若已准备则 ApplyAsync() 中走 HandleFailure(op_status_) -> abort。
  • 准备阶段失败(解码/锁/权限):Prepare() 返回非 OK -> HandleFailure()
    • 若已复制(REPLICATING/REPLICATED)且 tablet 未停止 -> FATAL(保证一致性)。
    • 否则:标记 ABORTED,回调错误,释放 op。
  • 超时:在排队准备超过 deadline -> 统计 ops_timed_out_in_prepare_queueHandleFailure(Status::TimedOut)
  • Tablet 停止:ApplyTask()Init() 检查到 tablet->HasBeenStopped() -> HandleFailure(IllegalState)
  • COMMIT_WAIT 等待失败(时钟问题)当前实现 CHECK_OK(CommitWait()),失败视为崩溃(防止不一致)。

外部一致性模式影响

  • 非 COMMIT_WAIT:复制完成后即可推进 AdjustNewOpLowerBound(timestamp),增强后续读的可见范围。
  • COMMIT_WAIT:在 WAL commit 之后等待本地时钟单调前进到提交时间戳之后再回复客户端(保证线性化顺序)。

并发与顺序保证

  • Prepare 队列串行 token:确保同一 tablet 的准备顺序与 MVCC timestamp 分配顺序一致(避免交叉写入破坏逻辑时序)。
  • order_verifier_->CheckApply():在 Apply 前核对提交顺序(OpId index 与准备物理时间戳)避免乱序应用。
  • 双状态竞态解决:复制回调与准备线程各自只写自己相关状态,在任一完成后检查另一标志是否就绪,若都就绪立刻触发 Apply。

调试与观测

  • 状态字符串:StateString() 输出简写:NR|R|RF|RD + NP|P(例如 R-NP, RD-P)。
  • Metrics:
    • replication_duration(复制耗时)在 ReplicationFinished() 增量。
    • ops_timed_out_in_prepare_queue 在超时路径增量。
    • commit_wait_duration_usec 在 COMMIT_WAIT 模式等待结束后记录。
  • Trace:TRACE_EVENT_FLOW_BEGIN/END 包裹 ExecuteAsync/PrepareTask/ApplyTask 便于 Chrome tracing 分析链路。

Follower 特殊点小结

  • Timestamp 已在 replicate 消息中;不再调用 AssignTimestamp()
  • op_->Start() 在 Init 时立即执行,保证其进入 MVCC 保留区防止 safe time 提前。
  • 失败统一使用 FOLLOWER 回调给客户端(若跟踪)返回 TOO_BUSY 促使重试走 Leader。

综上,OpDriver 通过两个独立状态使复制与准备并行化,减少等待,同时确保只有在“本地准备完成 + 共识提交”后才进入不可回滚的 Apply 阶段,从而实现写操作的高并发与严格一致性。

MVCC 管理(与写流程的关键耦合)

核心文件/类型src/kudu/tablet/mvcc.h: class MvccManagersrc/kudu/tablet/tablet.ccsrc/kudu/tablet/ops/op_driver.cc

本节聚焦 MVCC 在“非事务写流程”的每个阶段中的作用与状态演进,并说明它如何与外部一致性(COMMIT_WAIT/非 COMMIT_WAIT)、WAL、读快照共同协作,保证线性化或快照一致的可见性。

MVCC 状态与时间戳

  • 时间戳分配(Leader):在 OpDriver::Prepare() 中,Leader 通过 time_manager()->AssignTimestamp() 为本次写分配单调递增的 MVCC 时间戳,并写入 ReplicateMsg;随后调用 WriteOp::Start() -> Tablet::StartOp() -> MvccManager::StartOp(ts),将该操作置为 RESERVED(在-flight)。Follower 路径直接从回放的 ReplicateMsg 中读取时间戳,并在 OpDriver::Init() 早期调用 op_->Start() 完成 RESERVED 登记。
  • 状态序列:MVCC 为一次写操作维护以下典型状态:RESERVED(占位,尚未可见)→ APPLYING(正在应用)→ APPLIED(已应用,成为可见的一致性点)或 ABORTED(失败终止)。
  • 进入 APPLYING:在 ApplyAsync() 触发后,WriteOpState::StartApplying() 调用 MvccManager::StartApplyingOp(ts),把该写从 RESERVED 转为 APPLYING,这保证在并发读写下,读不会提前看到半成品。
  • 完成/中止WriteOp::Finish() 内部经由 WriteOpState::FinishApplyingOrAbort()MvccManager::FinishApplyingOp(ts)(成功)或 AbortOp(ts)(失败)。成功路径将该时间戳推进为正式可见的提交点;失败路径则让读视图忽略该操作。

源码指引:

  • mvcc.hStartOp()StartApplyingOp()FinishApplyingOp()AbortOp() 以及快照 MvccSnapshot 的定义。
  • tablet.ccTablet::StartOp()Tablet::ApplyRowOperations() 中进入/退出 APPLYING 的时机。
  • op_driver.ccPrepare() 分配时间戳与调用 op_->Start()ApplyAsync() 前后的 StartApplying() / FinishApplyingOrAbort()

与 OpDriver 阶段的精确耦合

  • Prepare 阶段:Leader 分配时间戳并登记 RESERVED;Follower 使用回放时间戳并在 Init() 中即刻登记 RESERVED。此举确保在复制与准备并行的竞态下,写操作已经被 MVCC 记录为“在-flight”,读在安全时间推进时不会越过它。
  • Replicate/Commit(共识提交)后:当 ReplicationFinished() 置为 REPLICATEDprepare_state_ == PREPARED,进入 ApplyAsync()。在 ApplyTask() 内部构建 CommitMsg 并异步 WAL 追加后,若外部一致性为 COMMIT_WAIT,执行 CommitWait() 等本地时钟前进到该时间戳之后;随后 Finalize() 中将 MVCC 置为 APPLIED
  • 非 COMMIT_WAIT 模式的下界推进:Leader 在 ApplyAsync() 前调用 tablet_->mvcc_manager()->AdjustNewOpLowerBound(ts)(参见 op_driver.cc),提前推进“新操作的可见下界”;这使得后续读在安全时间判断上更早“看见”该写,从而降低尾延迟。若启用 COMMIT_WAIT,则改由 CommitWait() 保证线性化,AdjustNewOpLowerBound 不提前。

读快照与可见性

  • MvccSnapshot:读路径构造 MvccSnapshot(snapshot_ts),仅返回 < snapshot_ts 且状态为 APPLIED 的写入结果;对 RESERVED/APPLYINGABORTED 的操作不可见。
  • 安全时间(safe time)与读:Leader 维护安全时间推进,确保读不会越过尚未 APPLIED 的写。Follower 在 Init() 先行 RESERVED 登记的设计,防止其安全时间推进导致读“跳过”该写(避免可见性反常)。
  • 外部一致性影响:在 COMMIT_WAIT 模式下,RPC 响应在 CommitWait() 之后返回,确保客户端在读回本写时已满足线性化顺序;非 COMMIT_WAIT 下,响应可能早于本地时钟完全推进,但通过 MVCC 下界/安全时间控制,外部读取仍遵循快照一致。

失败/中止与一致性

  • Prepare 失败:若在 Prepare() 解码/锁/权限失败,在未复制成功情况下,直接 AbortOp(ts)(或未登记则无需 Abort);若复制已成功(不应发生正常路径),系统会以 FATAL 保障一致性,避免出现“复制提交但本地未应用”的状态。
  • 复制失败ReplicationFinished()REPLICATION_FAILEDApplyAsync()HandleFailure(),随后 FinishApplyingOrAbort() 选择 AbortOp(ts)。该写不会进入可见集,读视图不受污染。
  • Apply 失败:在 Apply() 阶段遇到不可恢复错误,最终也会经 FinishApplyingOrAbort()AbortOp(ts),保持读视图纯净。

与 WAL/恢复的协作

  • 持久化顺序:WAL 先记录 ReplicateMsg(含 ts),后记录 CommitMsg(写入位置与结果)。MVCC 的 APPLIED 对应已经写入内存结构并在 WAL 有相应 commit 记录。
  • 恢复(Bootstrap):在 tablet_bootstrap.cc 重放时,先根据 ReplicateMsg 重建 in-flight 集合(这些操作初始等价于 RESERVED),再根据 CommitMsg 逐步将其应用与完成(相当于进入 APPLYINGAPPLIED)。缺失 CommitMsg 的 in-flight 操作会被视为未完成,最终在恢复结束时按 AbortOp(ts) 处理,确保一致性。

代码位置(快速索引)

  • src/kudu/tablet/mvcc.hStartOp()StartApplyingOp()FinishApplyingOp()AbortOp()AdjustNewOpLowerBound()MvccSnapshot
  • src/kudu/tablet/ops/op_driver.ccPrepare()(Leader 赋 ts)、ApplyAsync()(非 COMMIT_WAIT 下界推进)、ApplyTask()(构建 commit 与 WAL 追加)、Finalize()(FinishApplying)。
  • src/kudu/tablet/tablet.ccTablet::StartOp()Tablet::ApplyRowOperations()WriteOpState::StartApplying() / FinishApplyingOrAbort() 调用栈。
  • src/kudu/tablet/tablet_bootstrap.cc:恢复流程对 replicate/commit 的重放与 MVCC 状态重建。

综上,MVCC 贯穿非事务写的全链路:在 Prepare/Replicate 期间以 RESERVED 防止读越界,在 Apply 前转入 APPLYING 隔离半成品,在 Finalize 时标记为 APPLIEDABORTED 决定可见性;与外部一致性模式、WAL 提交顺序、安全时间推进共同确保 Kudu 的强一致写入与可预测读视图。

状态流

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
time --->

StartOp(ts):  RESERVED  ----->  StartApplyingOp(ts):  APPLYING  ----->  FinishApplyingOp(ts):  APPLIED
                                 |                                           ^
                                 |                                           |
                                 +---- on failure: AbortOp(ts) ---------------+

外部一致性:
- 非 COMMIT_WAIT:ApplyAsync 前可调用 AdjustNewOpLowerBound(ts) 提前推进下界。
- COMMIT_WAIT:WAL 追加后执行 CommitWait(ts) 再响应,保证线性化。

锁与并发控制

组件:

  • Schema 锁:Tablet::DecodeWriteOperations()AcquireSchemaLock(),防止并发 schema 变更影响解码与应用。
  • 行锁:Tablet::AcquireRowLocks() -> 为每行 key 构造 RowSetKeyProbe 后批量申请 ScopedRowLock(排他)。
  • 分区锁 / 事务锁:仅事务写使用;非事务写忽略或按 flag 控制。
  • MVCC 保证提交顺序与可见性;WAL Commit 序列化恢复信息。

Schema 快照与解码(写路径的入口准备)

核心文件/方法src/kudu/tablet/tablet.cc: Tablet::DecodeWriteOperations()src/kudu/common/schema.hsrc/kudu/common/wire_protocol.proto: RowOperationsPBsrc/kudu/tablet/tablet.cc: AcquireSchemaLock()

本节系统化梳理“Schema 快照与解码”的具体流程,它是第 3.1 节 Prepare 的首要工作,对第 8 节的行级应用与错误映射直接提供输入,且在并发 Alter Table 时保证写路径的结构稳定。

为什么需要 Schema 快照

  • 并发安全:通过 AcquireSchemaLock() 获取读锁,冻结当前 Schema 视图,避免写在解码与应用过程中被并发的 schema 变更破坏(列增删、类型变更等)。
  • 一致性语义:同一批 RowOp 在 Prepare/Apply 期间使用同一个 schema 快照;读路径通过 MVCC 快照与 schema 版本配合,保证结果的可解释性。
  • 性能考量:快照后的列投影与类型映射可以被重用,减少 Apply 阶段的重复计算与分支判断。

解码管线(RowOperationsPB → RowOp)

  • 入口Tablet::DecodeWriteOperations(state)
  • 步骤
    • 读取 WriteRequestPB.row_operations 中的 RowOperationsPB,根据操作类型(INSERT/UPSERT/UPDATE/DELETE/及 IGNORE 变体)逐一解析;
    • 将客户端列编码映射到内部列 ID 与类型(Schema 提供列查找与类型校验),对缺失必填列、未知列、类型不匹配进行早期拦截;
    • 构造内部 RowOp:包含操作类型、主键投影(用于后续行锁与路由)、列值投影(用于插入或增量更新),并绑定到 WriteOpStaterow_ops 容器;
    • 捕获本次解码使用的 Schema 快照并保存在 WriteOpState,供 Apply 阶段的行定址与列写入使用。
  • 错误映射
    • 结构性错误:MISMATCHED_SCHEMA(类型错误/列不存在/缺列)、IMMUTABLE_COLUMN(不可变列写入)等在解码时直接判定;
    • 行级错误:主键缺失或非法在此阶段即可形成 per-row 错误占位,或推迟到存在性检查阶段完善(见第 8 节)。

主键与列投影

  • 主键处理:基于 Schema 的主键列生成用于锁与路由的 key 投影(配合第 8 节的 BulkCheckPresenceRowSetKeyProbe)。
  • 列投影:为 INSERT/UPSERT 构造完整或部分列写入投影;为 UPDATE/DELETE 构造变更列投影与删除标记;
  • 默认值/空值:按 Schema 默认值与列约束补全缺失列,或记录错误返回(取决于表的约束与操作类型)。

与 Prepare/Apply/MVCC 的关系

  • 与 Prepare(第 3.1 节):解码与行锁获取是 Prepare 的核心;解码产生的 RowOp、主键/列投影会直接进入行锁与后续存在性检查;
  • 与 Apply(第 8 节)ApplyRowOperations() 依赖解码出的 RowOp 与列投影进行批量存在性检查与分派到 InsertOrUpsertUnlocked/MutateRowUnlocked
  • 与 MVCC(第 5 节):解码不直接改变 MVCC,但其产出的批次与行集合在 StartApplying()FinishApplying 所覆盖的“可见性窗口”内一致生效。

典型错误与返回路径

  • MISMATCHED_SCHEMA:客户端提供的列与当前快照不一致(列名/类型);
  • IMMUTABLE_COLUMN:尝试更新不可变列;
  • ALREADY_PRESENT / NOT_FOUND:通常在 Apply 阶段的存在性检查与实际写入中形成,但部分前置条件(主键为空等)可在解码时直接标记;
  • 所有 per-row 错误最终汇入 WriteResponsePB.per_row_errors,整体失败则由 OpDriver::HandleFailure() 统一处理。

代码位置(快速索引)

  • src/kudu/tablet/tablet.ccAcquireSchemaLock()DecodeWriteOperations()
  • src/kudu/common/schema.h/.ccSchema 定义、列查找与类型校验、默认值处理;
  • src/kudu/common/wire_protocol.protoRowOperationsPB(客户端传输格式);
  • 关联:第 3.1 节 Prepare、第 8 节 Row 应用逻辑、第 5 节 MVCC 管理。

总结:Schema 快照与解码是写路径的“入口准备”,它将不稳定的客户端行操作转换为受当前 schema 约束的内部 RowOp 序列,并与行锁、存在性检查、列写入、per-row 错误生成紧密衔接,是保证并发安全与可读可恢复性的关键步骤。

Row 应用逻辑(解码后的逐行执行与结果生成)

核心文件/方法src/kudu/tablet/tablet.ccTablet::ApplyRowOperations()ApplyRowOperation()InsertOrUpsertUnlocked()MutateRowUnlocked(),以及 BulkCheckPresence()

本节承接第 3 节(Prepare 产出 RowOp + 行锁)与第 5 节(进入 APPLYING 状态),具体说明一次写如何将解码后的每条 RowOp 应用到内存结构、生成 per-row 结果,并为 WAL 的 CommitMsg 填充必要信息。

应用阶段总览

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
StartApplying(ts)
  |
BulkCheckPresence(keys)
  |
for each RowOp:
  +-- INSERT/UPSERT --> InsertOrUpsertUnlocked (MemRowSet)
  |
  +-- UPDATE/DELETE --> MutateRowUnlocked (DeltaMemStore)
  |
  produce RowOpResultPB (per-row errors/success)
  |
Build CommitMsg -> WAL AsyncAppendCommit -> (optional) CommitWait -> Finalize

数据流总览(Apply 阶段内部)

  1. 进入 Apply() 前,WriteOpState::StartApplying() 已切换 MVCC 状态至 APPLYING(见第 5 节)。
  2. Tablet::ApplyRowOperations(state)
  • 预扫描:对同批次的 row keys 执行 BulkCheckPresence(state),统一查询当前 RowSetTree(覆盖内存与磁盘 rowset)来判断是否“存在”。
  • 遍历 state->row_ops():逐条调用 ApplyRowOperation(state, row_op),依据类型(INSERT/UPSERT/UPDATE/DELETE/其 IGNORE 变体)分派到具体实现。
  • 聚合 per-row 结果、累加行数计数器,并填入 TxResultPB 以便 CommitMsg 写入 WAL。

批量存在性检查 BulkCheckPresence

  • 目的:避免每条 row 重复进行昂贵的 rowset 路由与存在性探测,降低锁持有时间与索引/布隆过滤器查询开销。
  • 实现思路(概念):
    • 将待操作的主键集合批量投影成 RowSetKeyProbe,在 RowSetTree 中并行/批量地判断是否已存在;
    • 将“已存在/不存在”标记回写到对应的 RowOp 上,供后续分派快速分支。
  • 连贯性:与第 3.1 节 Prepare 中的“行锁已经获取”配合,保证在 Apply 时该批次上的存在性判断与实际写入不会被其他写打破不变量。

Insert / Upsert 路径

  • 分派条件:
    • INSERT:要求 BulkCheckPresence 标记为“不存在”,否则产生 per-row 错误 ALREADY_PRESENT
    • UPSERT:若不存在则走插入;若存在则转入“部分列更新”的 Upsert 逻辑;
    • INSERT_IGNORE / UPSERT_IGNORE:在存在冲突时按“忽略”策略不报错(计入影响行数/结果标记)。
  • 执行:InsertOrUpsertUnlocked(state, row_op)(在已持有行锁与 schema 快照前提下)
    • 定位目标 MemRowSet 并分配新 row id(插入场景);
    • 将行构造为 base row 追加到 MemRowSet
    • 对于 UPSERT 存在场景,仅对指定列做“若存在则更新”的轻量变更(实现可能复用 mutate 写入 delta)。
  • 结果:
    • RowOpResultPB 中记录 INSERT/UPSERT 的结果状态和(可选)新行位置/版本信息;
    • 计数器:累计 num_inserts/num_upserts 等,用于响应与 metrics。

Update / Delete 路径(增量写入 DeltaMemStore)

  • 分派条件:
    • UPDATE:要求 BulkCheckPresence 标记为“存在”,否则 per-row 错误 NOT_FOUND
    • DELETE:同样要求存在。
  • 执行:MutateRowUnlocked(state, row_op)
    • 在目标 rowset 中根据 key/row id 定位老版本;
    • 将列变更或删除标记写入 DeltaMemStore(Redo delta),保持 base 行不变;
    • 按 schema 约束拒绝不合法的列更新(如不可变列),映射成 per-row 错误类型(IMMUTABLE_COLUMN/MISMATCHED_SCHEMA 等)。
  • 结果:
    • RowOpResultPB 记录 UPDATE/DELETE 的成功与否;
    • 计数器:累计 num_updates/num_deletes
    • 后续由 compaction 将 base 与 delta 归并,读路径通过 MvccSnapshot 与列读取器按需“合成”可见版本。

per-row 结果与错误映射

  • 每条 RowOp 都对应一个 RowOpResultPB:成功/失败、失败原因(ALREADY_PRESENTNOT_FOUNDMISMATCHED_SCHEMAIMMUTABLE_COLUMN 等),这些被回填到 WriteResponsePB.per_row_errors
  • IGNORE 变体不将“可忽略冲突”计为错误,但会反映在影响行数计数中,以便客户端理解“部分未变更”。
  • 文档连贯:与第 3.1 节 Prepare(权限/合法性初筛)呼应——一些结构性错误在 Prepare 即被拒绝;行级冲突更多发生在 Apply 的存在性与实际写入阶段。

与 MVCC、WAL、OpDriver 的衔接

  • MVCC:Apply 期间处于 APPLYING;结束时由 WriteOp::Finish() 决定 APPLIED/ABORTED(见第 5 节)。
  • WAL:ApplyTask() 构造/填充 CommitMsg(包含 commited_op_id、时间戳、TxResultPB 摘要等),log_->AsyncAppendCommit() 异步持久化,支持恢复时的重放(见第 9 节)。
  • OpDriver:在非 COMMIT_WAIT 模式下于 ApplyAsync() 前可能调用 AdjustNewOpLowerBound(ts) 提前放宽读的下界;COMMIT_WAIT 模式则在 WAL 追加后 CommitWait() 再响应(见第 4.5 节)。

性能与并发注意点

  • BulkCheckPresence 减少重复索引/布隆过滤器命中,提升批量写的吞吐;
  • 行锁在 Prepare 阶段即获取,Apply 尽量短路径执行,降低锁持有时间;
  • Upsert 合并路径避免“先查再写”的重复代价,通过存在性标记直接分派;
  • DeltaMemStore 的增量写入使 UPDATE/DELETE 具备更好的随机写特性,后续由 compaction 吸收成本。

WAL 与恢复

写入路径构建 ReplicateMsg(含原始请求、timestamp),Apply 阶段生成 CommitMsg(含操作影响的内存结构位置、TxResultPB)。WAL 持久化顺序:Replicate -> Commit,恢复时:

  1. 重放 Replicate 生成 in-flight 操作集合。
  2. 根据 Commit 信息重建插入/变更所处的 RowSets / DeltaStores。

相关:TabletBootstrap 中对历史操作的重放逻辑(非事务路径在 bootstrap 恢复行的应用)。

非事务 vs 事务写差异(简述)

  • 非事务:TabletServiceImpl::Write() 直接 SubmitWrite();Prepare 允许多种 RowOperations;行锁后立即复制。提交即可见。
  • 事务:需要 txn_id。调度器 TxnOpDispatcher 控制并发;只允许 INSERT 类;需要额外的 partition lock;最终通过事务协调 BEGIN_COMMIT / FINALIZE_COMMIT 序列将 uncommitted rowsets 转换到 committed。本文不深入(后续可单独分析)。

错误与拒绝路径

  • 权限不足:NOT_AUTHORIZED。
  • Schema 不匹配 / 解码失败:MISMATCHED_SCHEMA。
  • 行冲突:ALREADY_PRESENT / NOT_FOUND(行级错误在 per_row_errors 中返回)。
  • 队列过载 / 内存压力:THROTTLED。
  • 共识领导性检查失败:客户端重试。
  • Prepare 队列超时:ops_timed_out_in_prepare_queue 计数 + 超时返回。

小结

非事务写路径由 TabletServiceImpl 入口到 OpDriver 状态机驱动,分离 Prepare/Replicate/Apply/Finish,借助 MVCC + 行锁保证并发与可见性;通过 WAL + CommitMsg 保障数据落盘与恢复。事务特性在非事务路径中被绕过,确保单次写延迟最小化与实现简洁。

关键源码索引

阶段入口/实现说明
RPCtablet_service.cc:TabletServiceImpl::Write请求校验/构造 WriteOpState
提交tablet_replica.cc:TabletReplica::SubmitWrite创建 Leader OpDriver
Preparewrite_op.cc:WriteOp::Prepare / tablet.cc:DecodeWriteOperations解码、锁、权限
RowLockstablet.cc:AcquireRowLocks / write_op.cc:AcquireRowLocks行粒度排他锁
Timestampop_driver.cc:Preparetime_manager()->AssignTimestamp分配 MVCC 时间戳
Replicateop_driver.cc:PrepareRaftConsensus::ReplicateRaft 复制开始
ReplicationFinishedop_driver.cc:ReplicationFinished进入 Apply 或 Abort
Applywrite_op.cc:WriteOp::Apply / tablet.cc:ApplyRowOperations实际内存写入
CommitRaftConsensus::Commit + WALWAL 持久化 commit 元数据
Finishwrite_op.cc:WriteOp::FinishMVCC 可见 + metrics
MVCCmvcc.h / Tablet::StartOp / FinishApplyingOrAbort状态转换

关键实现概要

1
2
virtual void Write(const WriteRequestPB* req, WriteResponsePB* resp,
                   rpc::RpcContext* context) OVERRIDE;

PBs

1
2
3
4
5
6
7
message WriteRequestPB {
  required bytes tablet_id = 1;
  optional SchemaPB schema = 2;      // 可能过期,但会自动映射
  optional RowOperationsPB row_operations = 3;   // insert/update/delete
  optional ExternalConsistencyMode external_consistency_mode = 4 [default = CLIENT_PROPAGATED];
  optional fixed64 propagated_timestamp = 5;
}
1
2
3
4
5
6
7
8
message RowOperationsPB {
  // 1 bit: operation type
  // n bits: column isset bitmap   (n is column count)
  // n bits: null bitmap           (n is column count)
  // variable: column data, For each column which is set and not NULL
  optional bytes rows = 2 [(kudu.REDACT) = true];
  optional bytes indirect_data = 3 [(kudu.REDACT) = true];
}

关键流程

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
void TabletServiceImpl::Write(const WriteRequestPB* req,
                              WriteResponsePB* resp,
                              RpcContext* context)
  得到TabletReplica或responseLookupRunningTabletReplicaOrRespond
  ACL,若 FLAGS_tserver_enforce_access_control
    VerifyAuthzTokenOrRespond
    CheckMatchingTableIdOrRespond
    得到WritePrivilegesWriteAuthorizationContext
  得到TabletGetTabletRef
  判断是否触发流控,tablet->ShouldThrottleAllow(bytes)
  判断是否触发内存压力,process_memory::SoftLimitExceeded
  打印INFO/WARNING日志, 拒绝请求
  判断server是否支持req指定的外部一致性模型server_->clock()->SupportsExternalConsistencyMode
  判断apply线程是否过载server_->tablet_apply_pool()->QueueOverloaded
  创建写op txnWriteOpState
  判断client请求是否发送了timestamp:
    - : server_->clock()->Update(ts)
  设置op完成回调
    - 非多行事务:
      回调:RpcOpCompletionCallback
      提交到线程池:replica->SubmitWrite      // 见详情(1
    - 多行事务:
      回调:TxnWriteCompletionCallback
      提交到线程池:replica->SubmitTxnWrite   // 见详情

1
Status TabletReplica::SubmitWrite(unique_ptr<WriteOpState> op_state)
  判断tablet replica是否正在运行CheckRunning()
  设置trackerstate->SetResultTracker(result_tracker_)
  新建WriteOp, LEADER模式
  新建OpDriverNewLeaderOpDriver
  异步写,driver->ExecuteAsync()       // 见详情(3

3
void OpDriver::ExecuteAsync()
  是否NOT_REPLICATING(是则表明自己是leader
    - : consensus_->CheckLeadershipAndBindTerm
  /// Prepare ///
  PrepareTask提交到prepare线程池prepare_pool_token_->Submit(... PrepareTask())  // 详情(4

4
void OpDriver::PrepareTask()
  Prepare()
    (Op,如WriteOp)op_->Prepare()  // 详情(5
    更新状态,PREPARED
    根据角色:
    - 如果是FOLLOWER:
      RegisterFollowerOpOnResultTracker
    - LEADER:
      判断该request id是否已经记录,如果是则放弃
    根据复制状态,ReplicationState:
    - NOT_REPLICATING:
      transaction分配时间戳consensus_->time_manager()->AssignTimestamp
      开始transactiontransaction_->Start
      状态 -> REPLICATING
      开始复制,consensus_->Replicate
        确保当前线程可以wait/blockThreadRestrictions::AssertWaitAllowed()
        确保自身状态和msg是可以复制的CheckSafeToReplicateUnlocked
        确保term的正确性round->CheckBoundTerm(CurrentTermUnlocked())
        添加新的round到队列AppendNewRoundToQueueUnlocked
        通知peerpeer_manager_->SignalRequest()
        // 回调: ReplicationFinished()
    - REPLICATING:
      do nothing
    - REPLICATION_FAILED:
    - REPLICATED:
      ApplyTask提交到apply线程池ApplyAsync
        确保op idx是逐个递增的, timestamp是升序的order_verifier_->CheckApply(...)
        调整时间戳,mvcc_manager()->AdjustSafeTime(...)
        /// Apply ///
        提交到apply线程池apply_pool_->Submit([this]() { this->ApplyTask(); })  // 详情(10

5
Status WriteOp::Prepare()
  解析到client_schemaSchemaFromPB
  获取到多行事务?,tablet->AcquireTxnLock(txn_id...)
  解析client请求的opstablet->DecodeWriteOperations
    获取schema锁tx_state->AcquireSchemaLock
    构造decoderRowOperationsPBDecoder
    解码数据,dec.DecodeOperations(ops)  // 写操作时:WRITE_OPS
      构造client-server的schema映射
        构造:ClientServerMapping
        映射:client_schema_->GetProjectionMapping
        验证:mapping.CheckAllRequiredColumnsPresent
      构造一行由默认值组成的数据,SetupPrototypeRowprototype_row_storage
      遍历待处理buffer,若还有数据,则解析这一行数据
        解码typeINSERTUPDATEDELETEUPSERT*_IGNORESPLIT_ROW*_BOUND
        1.根据type解码数据DecodeOp<WRITE_OPS>
          // INSERT, INSERT_IGNORE, UPSERT
          - RowOperationsPBDecoder::DecodeInsertOrUpsert  // (6)
          // UPDATE, UPDATE_IGNORE, DELETE, DELETE_IGNORE
          - RowOperationsPBDecoder::DecodeUpdateOrDelete  // (7)
        =========
        2.根据type解码数据DecodeOp<SPLIT_ROWS>
          - RowOperationsPBDecoder::DecodeSplitRow  // 8
            读取bitmap(isset + null)
            对每一列数据: 根据client请求的数据:
              写到op的split_row上
    // 多行txn校验
    如果是多行txn
    遍历每一行op,如果不是INSERT或INSERT_IGNORE,则报错
    // 权限判断
    无权限,则报错,authz_context->CheckPrivileges()
    // 获取锁,先partition锁,再行锁
    1.partition锁tablet->AcquirePartitionLock(state())
      op_state->AcquirePartitionLock
        得到TxnIdrequest()->txn_id()
        获取partition锁ScopedPartitionLoc
        - TRY_LOCK: manager_->TryAcquirePartitionLock
        - WAIT_FOR_LOCK: manager_->WaitUntilAcquiredPartitionLock
          如果未获取到,则报错
    2.行锁,tablet->AcquireRowLocks(state())
      遍历对每一行
        1.验证是否合法,或标记失败,ValidateOpOrMarkFailed
          已有结果的,直接返回,op->validop->has_result
          验证opValidateOp,不ok的标记
            - INSERTINSERT_IGNOREUPSERT
              ValidateInsertOrUpsertUnlocked
                验证enc_key_size
            - UPDATEUPDATE_IGNOREDELETEDELETE_IGNORE:
              ValidateMutateUnlocked
                不能是reinsert,其他ok
          否则ok
        2.验证属于tabletCheckRowInTablet
          partition_schema.PartitionContainsRow
      获取多行的锁,op_state->AcquireRowLocks
        解析到op的op->key_probe
        再由此获取多行锁,ScopedRowLock

6
Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row_storage,
                                                    const ClientServerMapping& mapping,
                                                    DecodedRowOperation* op)
  读取bitmap(isset + nullable)
  分配空间(row + isset bitmap)
  使用「默认值行数据」拷贝至本行,memcpyprototype_row_storage
  构造ContiguousRow对象
  对每一列数据,根据client请求的数据:
    set时
      col既不是nullable,也没有default时,非法
    set时
      解析是否设置col为nullclient_set_to_null
      - 如果是:
        - col是nullable的,则设置为null
        - 否则,报错
      - 否则:
        解析并拷贝值,ReadColumn
          读取col的sliceGetColumnSlice
            col的类型
            - BINARY
              indirect_data构造数据
            - other
              直接转换
          col的类型
          - BINARY
            memcpy
          - other
            slice.relocate  // 也是memcpy

7
Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& mapping,
                                                    DecodedRowOperation* op)
  读取bitmap(isset + nullable)
  分配空间(key_row)
  构造ContiguousRow对象
  对每一列主键:
    检查column index没有变化
    检查isset必须为真
    检查null必须为假
    解析并拷贝值,ReadColumn
  得到一行数据
  - UPDATE/UPDATE_IGNORE
    构造change list: RowChangeListEncoder
    对每一非主键列:
      检查isset bitmap: 没有set则调过,否则:
        解析是否设置col为nullclient_set_to_null
        - col不是nullable的,报错
        - 否则
          解析并拷贝值,ReadColumn
        添加col的变更到rclrcl_encoder.AddColumnUpdate
    检查change list是否为空: 为空则返回
    分配空间(rcl), 并写到op的changelist
  - DELETE/DELETE_IGNORE
    对每一非主键列:
      检查isset bitmap: set则报错
      写到op的changelist

8
Status RowOperationsPBDecoder::DecodeSplitRow(const ClientServerMapping& mapping,
                                              DecodedRowOperation* op)


10
void OpDriver::ApplyTask()
  判断tablet的运行状态, tablet->HasBeenStopped
  apply到内存(OpWriteOp)op_->Apply  // 详情(9
  得到CommitMsg, 设置op id
  设置response的时间戳
  apply到WALlog_->AsyncAppendCommit
    Log::AsyncAppend
      entry_batch_queue_.BlockingPut(...)
  若外部一致性模式是 COMMIT_WAIT
    CommitWait
  Finalize

9
Status WriteOp::Apply(CommitMsg** commit_msg)
  将操作apply到tablet上tablet->ApplyRowOperations
    StartApplying(tx_state)
      主要是记录mvcc时间戳tx_state->StartApplying()
      设置M/DRStx_state->set_tablet_components(components_)
    批量检测存在性,BulkCheckPresence  // ???
      构造每行数据的key及其indexkeys_and_indexes
      key数据排序std::stable_sort
      去重(因为第一个操作可能改变这个key的存在性)
      (TabletComponents*)comps->rowsets->ForEachRowSetContainingKeys, 检查key的存在性
        ProcessPendingGroup辅助方法,在一个rowset中批量找一批key的存在性
      更新到op_state中每个op所在的rs
    对每行数据:
      apply这行数据ApplyRowOperation
        检查这行opValidateOpOrMarkFailed
        检查tablet状态正常CheckHasNotBeenStopped
        对于没有判断存在性的key:          // 在本次批量中,跟其他op有重复的情况
          获取key所在的rowset集合FindRowSetsToCheck
          检查存在性
            遍历这些rowset: rowset->CheckRowPresent
        对不同类型的op:
        - INSERT/INSERT_IGNORE/UPSERT:
          插入opInsertOrUpsertUnlocked  // 11
        - UPDATE/UPDATE_IGNORE/DELETE/DELETE_IGNORE:
          更新这行数据,MutateRowUnlocked  // 12
    更新metricsmetrics_->AddProbeStats
  对每行操作结果: 记录到结果, 更新metricsUpdatePerRowMetricsAndErrors
  构造并填充回传参数,CommitMsg

11
Status Tablet::InsertOrUpsertUnlocked(const IOContext* io_context,
                                      WriteOpState *op_state,
                                      RowOp* op,
                                      ProbeStats* stats)
  - 若该key已存在:
    - UPSERT:
      ApplyUpsertAsUpdate
        遍历schema的每列, 根据其构造为RowChangeListEncoder
        将这个updates操作apply到DRSrowset->MutateRow
        更新metrics
    - INSERT_IGNORE:
      返回
    - INSERT
      返回错误
  - key不存在:
    - 如果是多行事务
      判断是否在MRS中comps->memrowset->CheckRowPresent
        若在,则返回失败(INSERT_IGNORE时会容错
      插入到txn mrs中txn_rowsets->memrowset->Insert
      返回
    - 非多行事务
      尝试insert到MRS中comps->memrowset->Insert
        编码主键,schema_.EncodeComparableKey
        mutation.Prepare
        - 若已存在:
          - 若已delete:
              Reinsert
          - 否则报错
        - 若不存在:
            生成并拷贝row数据
            mutation.Insert
    - 写入成功:
        op->SetInsertSucceeded
    - 写入失败:
      若是IsAlreadyPresent
      - UPSERT:
        同上,但是写入的是MRSApplyUpsertAsUpdate
      - INSERT_IGNORE:
        返回
      - INSERT:
        返回错误
        更新metrics

12
Status Tablet::MutateRowUnlocked(const IOContext* io_context,
                                 WriteOpState *op_state,
                                 RowOp* op,
                                 ProbeStats* stats)
  根据之前的查找结果, 得到其所在的RowSet(没找到时, 使用MRS)
  修改行,rs_to_attempt->MutateRow
  - MemRowSet::MutateRow
    构造mutationbtree::PreparedMutationprobe.encoded_key_slice
    找到该行数据MRSRow
    如果已删除,则返回错误
    将本地mutation修改到redolist上Mutation::AppendToListAtomic
  - DiskRowSet::MutateRow
    找到该行数据在base data中的indexCFileSet::FindRow
    未找到,则返回错误
    检查数据是否已被删除,DeltaTracker::CheckRowDeleted
      先检查DeltaMemStoredms_->CheckRowDeleted
      再遍历检查所有的DeltaStoreds->CheckRowDeleted
    更新数据到delta trackerDeltaTracker::Update
      DeltaMemStore::Update
        如果没有dms,则构造一个,CreateAndInitDMSUnlocked
        更新到dms上dms_->Update
          - mutation已存在(该行的更新存在相同的timestamp)
            incr seq number并重试(再失败则报错)
          插入mutationmutation.Insert
  - 写入成功:
    op->SetInsertSucceeded
  - 写入失败:
    若是IsNotFound
    - UPDATE_IGNORE/DELETE_IGNORE:
      返回
    - UPDATE/DELETE:
      返回错误