序
本文记录 kudu 源码阅读笔记,记录了 kudu tserver 端非事务写入的完整流程。
写入流程总览
Kudu 的一次“非事务”写(普通单/批行 Insert/Upsert/Update/Delete,不带 txn_id)在 tserver 侧需要经历以下阶段:
- Client 组装
WriteRequestPB并通过 RPC 发送 (tserver_service.proto: Write). - RPC 入口:
TabletServiceImpl::Write()做权限、资源(内存 / 线程池)与节流检查,构造WriteOpState。 - 提交给
TabletReplica::SubmitWrite(),创建WriteOp与OpDriver(LEADER 模式)。 OpDriver::ExecuteAsync()进入 Prepare 阶段:解码行、锁定 schema、行级锁 / 分区锁(非事务写仅行锁)、权限校验。- (LEADER)向 RAFT 复制:给
ReplicateMsg分配 timestamp,RaftConsensus::Replicate()开始复制;Follower 侧由共识回放直接创建 FollowerOpDriver并 Start。 - RAFT 提交(
ReplicationFinished())后进入 Apply 阶段:WriteOp::Apply()调用Tablet::ApplyRowOperations()执行实际插入/更新/删除(写入 MemRowSet / DeltaMemStore)。 - 生成
CommitMsg写入 WAL;WriteOp::Finish()更新 MVCC 状态与各种 metrics;释放行锁、schema 锁。 - RPC 回调发送
WriteResponsePB给客户端(含 per-row errors、资源 metrics、timestamp 等)。
流程总览图如下:
| |
写入流程的时序图:
| |
流程关键特点:
- 采用 MVCC (
MvccManager) 保证读写隔离与快照语义;写在获取 timestamp 后进入 in-flight 集合,Apply 完成后可见。 - 写路径分 Prepare / Replicate / Apply / Commit 四个大的阶段,由
OpDriver状态机驱动。 - 非事务写不涉及
TxnRowSets与 partition lock(除启用FLAGS_enable_txn_partition_lock时 follower 列表差异),无提交/回滚二阶段。
下文将对一些关键的阶段进行分析。
RPC 入口与初始校验
入口文件/方法:src/kudu/tserver/tablet_service.cc: TabletServiceImpl::Write()
核心逻辑:
- Tablet 查找:
LookupRunningTabletReplicaOrRespond()。 - 访问控制:可选 Token -> privileges(INSERT / UPDATE / DELETE)。
- 外部一致性模式检查:
clock()->SupportsExternalConsistencyMode()。 - 节流:
tablet->ShouldThrottleAllow(bytes)、进程软内存限制process_memory::SoftLimitExceeded()、应用队列是否过载tablet_apply_pool()->QueueOverloaded()。 - 构造
WriteOpState:携带请求、响应、以及可选授权上下文。 - 区分非事务/事务:无
txn_id->SubmitWrite();有txn_id-> 走事务前置调度SubmitTxnWrite()。
WriteOpState 与 WriteOp
文件:src/kudu/tablet/ops/write_op.h / write_op.cc
关键数据结构与持有关系:
WriteOpState:保存原始请求、解析后的RowOp列表、行锁集合ScopedRowLock、MVCCScopedOp、schema 快照、metrics 计数器。WriteOp:以成员std::unique_ptr<WriteOpState> state_持有WriteOpState(参见write_op.h,class WriteOp中的state_),并对其进行阶段驱动(Prepare/Start/Apply/Finish)。- 生命周期:
WriteOp在构造时接管WriteOpState的所有权;阶段结束时通过Finish()触发WriteOpState::FinishApplyingOrAbort(),释放行锁、schema 锁与 MVCC 事务对象。
- 生命周期:
阶段职责:
以下结合源码,具体分析 WriteOp 四个核心阶段的实现与与 OpDriver 的耦合:
WriteOp 关键阶段分析
Prepare(解析与锁定)
- 入口:
WriteOp::Prepare()(write_op.cc)。由OpDriver::Prepare()调用。 - 主要步骤:
- Schema 快照与解码:
Tablet::DecodeWriteOperations(state)根据请求中的RowOperationsPB解析为内部RowOp列表,并捕获当前 schema 快照(避免并发更改影响)。 - 行锁获取:
Tablet::AcquireRowLocks(state)为每个待操作的 key 申请ScopedRowLock(排他),批量方式减少争用;支持UPSERT_IGNORE等在存在冲突时的特殊处理策略。 - 权限与合法性检查:结合
TabletServiceImpl::Write()已做的令牌校验,进一步在Prepare()中检查请求类型(INSERT/UPDATE/DELETE)与目标表的约束(如不可变列更新)。 - 事务差异:非事务写不获取分区/事务锁;事务写(本文略)在此阶段可能受限仅允许 INSERT 类,并需额外锁定。
- 行定址与列投影准备:为后续
ApplyRowOperations()的批量存在性检查与列写入准备必要的 key probe 与列投影上下文,降低 Apply 阶段的重复计算开销。
- Schema 快照与解码:
- 输出:
prepare_state_ = PREPARED(由OpDriver设置)。- 行锁已持有;
RowOp已解码;错误(如 schema mismatch、行不存在/已存在)在 per-row 层面记录,或整体失败返回。
- 与 OpDriver 的耦合:若此时复制尚未触发,Leader 在随后分配 ts 并
Start();若复制已完成(回调先到),OpDriver会立即尝试进入ApplyAsync()。
Start(登记 MVCC 与时间戳)
- 入口:
WriteOp::Start()(write_op.cc)。Leader 在OpDriver::Prepare()完成解码与锁定后调用;Follower 在OpDriver::Init()早期调用(回放路径)。 - 主要步骤:
- 时间戳设置:Leader 路径由
OpDriver::Prepare()调用time_manager()->AssignTimestamp()后,将 ts 写入ReplicateMsg与WriteOpState;Follower 路径从回放的ReplicateMsg取 ts。 - MVCC RESERVED 登记:
Tablet::StartOp(state)->MvccManager::StartOp(ts)将本写操作标记为RESERVED,进入 in-flight 集合。
- 时间戳设置:Leader 路径由
- 目的:确保在复制与准备并行的情况下,读的安全时间不会越过尚未提交的写;并为后续
APPLYING/APPLIED状态转换做好铺垫(详见第 5 节)。
Apply(实际行写入与 CommitMsg 构建)
- 入口:
WriteOp::Apply(CommitMsg* msg)(write_op.cc)。由OpDriver::ApplyTask()调用。 - 前置:
WriteOpState::StartApplying()->MvccManager::StartApplyingOp(ts)将状态从RESERVED转为APPLYING。 - 主要步骤:
- 批量存在性检查:
Tablet::ApplyRowOperations(state)内部首先进行BulkCheckPresence对多行键进行存在性探测,减少逐行冲突代价。 - 逐行应用:
ApplyRowOperation(state, row_op):- Insert/Upsert:
InsertOrUpsertUnlocked()将数据写入当前MemRowSet,必要时分配新 row id;UPSERT_IGNORE在存在冲突时按忽略策略处理。 - Mutate(Update/Delete):
MutateRowUnlocked()写入DeltaMemStore形成列增量或删除标记。
- Insert/Upsert:
- 结果与错误:每行生成
RowOpResultPB(成功与错误类型,如ALREADY_PRESENT、NOT_FOUND、MISMATCHED_SCHEMA等),累计到WriteResponsePB的per_row_errors。 - CommitMsg 构建:填充
CommitMsg(含commited_op_id、影响的 RowSets/DeltaStores 引用、时间戳等),供 WALAsyncAppendCommit()持久化。
- 批量存在性检查:
- 与外部一致性:
OpDriver::ApplyTask()在非COMMIT_WAIT模式下可能提前推进AdjustNewOpLowerBound(ts);COMMIT_WAIT则在 WAL 追加后执行CommitWait()再响应。
Finish(完成或中止,释放与统计)
- 入口:
WriteOp::Finish(Op::ApplyResult result)(write_op.cc)。由OpDriver::Finalize()调用。 - 主要步骤:
- 成功路径:
FinishApplyingOrAbort(success=true)->MvccManager::FinishApplyingOp(ts)将写标记为APPLIED(可见点);释放行锁、schema 锁;更新计数与 metrics(如写入行数、延迟统计)。 - 失败路径:在复制失败、准备失败或应用失败场景下,调用
FinishApplyingOrAbort(success=false)->AbortOp(ts),确保读视图不受污染,并释放所有持有资源。 - 回调与清理:通过
completion_callback将WriteResponsePB返回客户端;op_tracker_->Release(this)结束对该OpDriver的跟踪。
- 成功路径:
- 与 WAL/恢复:成功路径对应 WAL 已写入
CommitMsg;恢复时由tablet_bootstrap.cc的重放逻辑将该 ts 的操作重建为已应用状态。
错误处理与幂等性
- Prepare 级错误:解码/权限/锁定失败,若尚未复制成功则安全失败返回;若复制已完成则触发 FATAL(避免一致性破坏)。
- 复制失败:在
OpDriver::ReplicationFinished()标记失败后,ApplyAsync()路径统一HandleFailure(op_status_)并在Finish()中AbortOp(ts)。 - 应用失败:逐行错误通过
per_row_errors返回;不可恢复错误导致整体AbortOp(ts)。 - Follower 幂等响应:若需要返回客户端(重试附着),在
Init()/Prepare()中注册FollowerOpCompletionCallback;失败统一返回ERROR_SERVER_TOO_BUSY促使客户端重试附着到 Leader。
重要源码位置
src/kudu/tablet/ops/write_op.cc:Prepare()、Start()、Apply()、Finish()。src/kudu/tablet/tablet.cc:DecodeWriteOperations()、AcquireRowLocks()、ApplyRowOperations()、ApplyRowOperation()、InsertOrUpsertUnlocked()、MutateRowUnlocked()。src/kudu/tablet/ops/op_driver.cc:驱动Prepare()/ApplyTask()/Finalize()、时间戳分配与外部一致性处理(AdjustNewOpLowerBound、CommitWait)。src/kudu/tablet/mvcc.h:StartOp()、StartApplyingOp()、FinishApplyingOp()、AbortOp();读快照MvccSnapshot。
OpDriver 状态机
源码位置:src/kudu/tablet/ops/op_driver.h / op_driver.cc
OpDriver 负责把一个逻辑写操作(WriteOp)从“构造”推进到“复制提交”“应用”“完成”,内部以两个正交状态变量跟踪进度:
状态变量
replication_state_(复制相关):NOT_REPLICATING:尚未进入 Raft 复制(Leader 写在执行 Prepare 前的初始状态)。REPLICATING:复制请求已发起(Leader 调用RaftConsensus::Replicate()后;Follower 在Init()中调用Start()后即设为 REPLICATING)。REPLICATION_FAILED:复制阶段失败(发送或共识失败;Leader 在 Replicate 返回非 OK,或回调ReplicationFinished()中 status 非 OK)。REPLICATED:共识层确认提交(本操作在多数副本持久化 ReplicateMsg,对应回调ReplicationFinished()成功路径)。
prepare_state_(准备相关):NOT_PREPARED:尚未完成本地 Prepare(还没解码行/锁定)。PREPARED:op_->Prepare()成功执行(已完成 schema 解码、行锁获取、权限与合法性检查)。
两个状态必须同时满足:replication_state_ == REPLICATED 且 prepare_state_ == PREPARED 才能进入 Apply(ApplyAsync())。失败则走 Abort/HandleFailure。
主要方法与状态流转
以下描述 Leader 普通写操作(非事务)完整路径:
- 构造:
TabletReplica::SubmitWrite()-> 创建WriteOp+NewLeaderOpDriver()->OpDriver::Init():- 检查 tablet 未停止。
- 复制状态初始
NOT_REPLICATING,准备状态初始NOT_PREPARED。 - 如果是 FOLLOWER 分支(重放),这里会直接:设置
op_id_copy_、replication_state_ = REPLICATING、调用op_->Start()(立即把 MVCC 置为 RESERVED),并在结果跟踪需要时设置 Follower 回调。
- 调度:
ExecuteAsync():- Leader:调用
consensus_->CheckLeadershipAndBindTerm()绑定当前 term。 - 提交
PrepareTask()到 prepare 线程池(串行 token 保证准备顺序)。
- Leader:调用
- 准备:
PrepareTask():- Deadline 校验(准备队列等待超时直接
HandleFailure(Status::TimedOut))。 - 调用
Prepare()真正执行:op_->Prepare()-> 解码请求、锁 row/schema、权限校验。- 设置
prepare_state_ = PREPARED(加锁保护)。 - Follower:在此注册结果跟踪
RegisterFollowerOpOnResultTracker(),避免竞态。 - Leader:检查自己仍是 Driver(去重重复请求)。
- 分支:
- 若复制尚未触发(
replication_state_ == NOT_REPLICATING):- 分配时间戳:
time_manager()->AssignTimestamp()-> 写入 replicate_msg。 - 调用
op_->Start()(创建 MVCC ScopedOp,进入 RESERVED)。 - 设置
replication_state_ = REPLICATING,调用RaftConsensus::Replicate()。
- 分配时间戳:
- 若复制已完成(
REPLICATED或REPLICATION_FAILED,可能回调先到):直接尝试ApplyAsync()。
- 若复制尚未触发(
- Deadline 校验(准备队列等待超时直接
- 复制完成:
ReplicationFinished(status)(Raft 回调线程):- 赋值
op_id_copy_(共识生成的真实 OpId)。 - 若
status.ok():replication_state_ = REPLICATED;否则:replication_state_ = REPLICATION_FAILED并调用op_->AbortPrepare()(防止继续准备逻辑)。 - 如果此时已
prepare_state_ == PREPARED->ApplyAsync();否则等待准备线程设置 PREPARED 后进入 Apply。
- 赋值
- 应用调度:
ApplyAsync():- 必须在锁下看到:
prepare_state_ == PREPARED。 - 若复制失败:
HandleFailure(op_status_)-> Abort。 - 正常:验证顺序(
order_verifier_->CheckApply());若外部一致性模式不是 COMMIT_WAIT,则提前推进 MVCC 新操作下界:AdjustNewOpLowerBound(timestamp)。 - 将
ApplyTask()提交到 apply 线程池。
- 必须在锁下看到:
- Apply 阶段:
ApplyTask():- 再次校验 tablet 未停止。
- 调用
op_->Apply(&commit_msg):内部执行Tablet::ApplyRowOperations()。 - 设置
commit_msg.commited_op_id(拼写保留源码)为op_id_copy_;写响应 timestamp。 - 异步 WAL 追加:
log_->AsyncAppendCommit()。 - 外部一致性模式为 COMMIT_WAIT 时执行
CommitWait():等待本地时钟前进到事务时间戳之后。 - 调用
Finalize()。
- Finalize:
op_->Finish(Op::APPLIED)-> MVCC FinishApplying,释放锁,更新 metrics。- 调用 completion callback 发送 RPC 响应。
op_tracker_->Release(this)清除跟踪。
Follower 操作差异(重放):
- 在
Init()即:replication_state_ = REPLICATING并调用op_->Start(),因为 replicate 消息已含 timestamp,保证进入 MVCC 早于 safe time 推进。 - 准备与复制回调可能顺序互换;两者任一先到都只设置自己的状态,最后由后到达的一方检测到另一方已就绪再触发
ApplyAsync()。 - 结果跟踪:只有需要返回客户端(幂等重试)时注册
FollowerOpCompletionCallback,失败统一报告 TOO_BUSY 让客户端重试重新附着 Leader 正常路径。
回调注册与触发(Leader/Follower)
- Leader 回调注册(复制完成):
OpDriver::Init()在创建共识轮次时注册复制完成回调,触发后进入OpDriver::ReplicationFinished():- 源码:
src/kudu/tablet/ops/op_driver.cc中mutable_state()->set_consensus_round(consensus_->NewRound(std::move(replicate_msg), [wp](const Status& s){ /* sp->ReplicationFinished(s) */ }));
- 源码:
- Follower 回调注册(复制完成):
TabletReplica::StartFollowerOp()在 round 上设置回调,同样进入ReplicationFinished():- 源码:
src/kudu/tablet/tablet_replica.ccstate->consensus_round()->SetConsensusReplicatedCallback([wp](const Status& s){ /* sp->ReplicationFinished(s) */ });
- 源码:
- 复制触发非阻塞:Leader 在
Prepare()中调用consensus_->Replicate(mutable_state()->consensus_round())发起复制;该调用不等待共识完成,失败则立即将replication_state_置为REPLICATION_FAILED并返回错误;成功则待 Raft 层稍后通过上述回调通知ReplicationFinished()。 - 结果跟踪(Follower):若需要幂等响应,
Init()中为 Follower 设置FollowerOpCompletionCallback,失败统一返回ERROR_SERVER_TOO_BUSY促使客户端重试并附着到 Leader 执行路径。
状态机(线程并发视角)
| |
状态转移表(Leader 写)
| 事件 | 前状态 (replication, prepare) | 后状态 | 触发代码 | 说明 |
|---|---|---|---|---|
| Init | (NOT_REPLICATING, NOT_PREPARED) | 同上 | Init() | 构造完成 |
| Prepare 成功且未复制 | (NOT_REPLICATING, NOT_PREPARED) | (REPLICATING, PREPARED) | Prepare() | 分配 ts + Start + Replicate |
| Replicate 回调成功先于 Prepare | (REPLICATING, NOT_PREPARED) | (REPLICATED, NOT_PREPARED) | ReplicationFinished() | 等待 Prepare 完成 |
| Replicate 回调成功后 Prepare 完成 | (REPLICATED, PREPARED) | (REPLICATED, PREPARED) | Prepare()/ReplicationFinished() | 进入 ApplyAsync |
| Replicate 回调失败 | (REPLICATING, *) | (REPLICATION_FAILED, *) | ReplicationFinished() | 若已 PREPARED 后续 ApplyAsync 里走失败分支 |
| Prepare 队列超时 | (NOT_REPLICATING, NOT_PREPARED) | (REPLICATION_FAILED, NOT_PREPARED) | PrepareTask()/HandleFailure() | 直接失败响应 |
| ApplyAsync 启动 | (REPLICATED, PREPARED) | (REPLICATED, PREPARED) | ApplyAsync() | 提交 ApplyTask |
| ApplyTask 成功 + 非 COMMIT_WAIT | (REPLICATED, PREPARED) | (REPLICATED, PREPARED) | ApplyTask()/Finalize() | 提前推进 MVCC 下界 |
| ApplyTask 成功 + COMMIT_WAIT | (REPLICATED, PREPARED) | (REPLICATED, PREPARED) | CommitWait()/Finalize() | 等待时钟后 Finalize |
| Finalize | (REPLICATED, PREPARED) | 终结 | Finalize() | 完成回调释放资源 |
异常与失败处理
- 复制失败:
ReplicationFinished()设置REPLICATION_FAILED,若已准备则ApplyAsync()中走HandleFailure(op_status_)-> abort。 - 准备阶段失败(解码/锁/权限):
Prepare()返回非 OK ->HandleFailure():- 若已复制(REPLICATING/REPLICATED)且 tablet 未停止 -> FATAL(保证一致性)。
- 否则:标记 ABORTED,回调错误,释放 op。
- 超时:在排队准备超过 deadline -> 统计
ops_timed_out_in_prepare_queue后HandleFailure(Status::TimedOut)。 - Tablet 停止:
ApplyTask()或Init()检查到tablet->HasBeenStopped()->HandleFailure(IllegalState)。 - COMMIT_WAIT 等待失败(时钟问题)当前实现
CHECK_OK(CommitWait()),失败视为崩溃(防止不一致)。
外部一致性模式影响
- 非 COMMIT_WAIT:复制完成后即可推进
AdjustNewOpLowerBound(timestamp),增强后续读的可见范围。 - COMMIT_WAIT:在 WAL commit 之后等待本地时钟单调前进到提交时间戳之后再回复客户端(保证线性化顺序)。
并发与顺序保证
- Prepare 队列串行 token:确保同一 tablet 的准备顺序与 MVCC timestamp 分配顺序一致(避免交叉写入破坏逻辑时序)。
order_verifier_->CheckApply():在 Apply 前核对提交顺序(OpId index 与准备物理时间戳)避免乱序应用。- 双状态竞态解决:复制回调与准备线程各自只写自己相关状态,在任一完成后检查另一标志是否就绪,若都就绪立刻触发 Apply。
调试与观测
- 状态字符串:
StateString()输出简写:NR|R|RF|RD+NP|P(例如R-NP,RD-P)。 - Metrics:
replication_duration(复制耗时)在ReplicationFinished()增量。ops_timed_out_in_prepare_queue在超时路径增量。commit_wait_duration_usec在 COMMIT_WAIT 模式等待结束后记录。
- Trace:
TRACE_EVENT_FLOW_BEGIN/END包裹 ExecuteAsync/PrepareTask/ApplyTask 便于 Chrome tracing 分析链路。
Follower 特殊点小结
- Timestamp 已在 replicate 消息中;不再调用
AssignTimestamp()。 op_->Start()在 Init 时立即执行,保证其进入 MVCC 保留区防止 safe time 提前。- 失败统一使用 FOLLOWER 回调给客户端(若跟踪)返回 TOO_BUSY 促使重试走 Leader。
综上,OpDriver 通过两个独立状态使复制与准备并行化,减少等待,同时确保只有在“本地准备完成 + 共识提交”后才进入不可回滚的 Apply 阶段,从而实现写操作的高并发与严格一致性。
MVCC 管理(与写流程的关键耦合)
核心文件/类型:src/kudu/tablet/mvcc.h: class MvccManager、src/kudu/tablet/tablet.cc、src/kudu/tablet/ops/op_driver.cc
本节聚焦 MVCC 在“非事务写流程”的每个阶段中的作用与状态演进,并说明它如何与外部一致性(COMMIT_WAIT/非 COMMIT_WAIT)、WAL、读快照共同协作,保证线性化或快照一致的可见性。
MVCC 状态与时间戳
- 时间戳分配(Leader):在
OpDriver::Prepare()中,Leader 通过time_manager()->AssignTimestamp()为本次写分配单调递增的 MVCC 时间戳,并写入ReplicateMsg;随后调用WriteOp::Start()->Tablet::StartOp()->MvccManager::StartOp(ts),将该操作置为RESERVED(在-flight)。Follower 路径直接从回放的ReplicateMsg中读取时间戳,并在OpDriver::Init()早期调用op_->Start()完成RESERVED登记。 - 状态序列:MVCC 为一次写操作维护以下典型状态:
RESERVED(占位,尚未可见)→APPLYING(正在应用)→APPLIED(已应用,成为可见的一致性点)或ABORTED(失败终止)。 - 进入 APPLYING:在
ApplyAsync()触发后,WriteOpState::StartApplying()调用MvccManager::StartApplyingOp(ts),把该写从RESERVED转为APPLYING,这保证在并发读写下,读不会提前看到半成品。 - 完成/中止:
WriteOp::Finish()内部经由WriteOpState::FinishApplyingOrAbort()调MvccManager::FinishApplyingOp(ts)(成功)或AbortOp(ts)(失败)。成功路径将该时间戳推进为正式可见的提交点;失败路径则让读视图忽略该操作。
源码指引:
mvcc.h:StartOp()、StartApplyingOp()、FinishApplyingOp()、AbortOp()以及快照MvccSnapshot的定义。tablet.cc:Tablet::StartOp()、Tablet::ApplyRowOperations()中进入/退出 APPLYING 的时机。op_driver.cc:Prepare()分配时间戳与调用op_->Start();ApplyAsync()前后的StartApplying()/FinishApplyingOrAbort()。
与 OpDriver 阶段的精确耦合
- Prepare 阶段:Leader 分配时间戳并登记
RESERVED;Follower 使用回放时间戳并在Init()中即刻登记RESERVED。此举确保在复制与准备并行的竞态下,写操作已经被 MVCC 记录为“在-flight”,读在安全时间推进时不会越过它。 - Replicate/Commit(共识提交)后:当
ReplicationFinished()置为REPLICATED且prepare_state_ == PREPARED,进入ApplyAsync()。在ApplyTask()内部构建CommitMsg并异步 WAL 追加后,若外部一致性为COMMIT_WAIT,执行CommitWait()等本地时钟前进到该时间戳之后;随后Finalize()中将 MVCC 置为APPLIED。 - 非 COMMIT_WAIT 模式的下界推进:Leader 在
ApplyAsync()前调用tablet_->mvcc_manager()->AdjustNewOpLowerBound(ts)(参见op_driver.cc),提前推进“新操作的可见下界”;这使得后续读在安全时间判断上更早“看见”该写,从而降低尾延迟。若启用COMMIT_WAIT,则改由CommitWait()保证线性化,AdjustNewOpLowerBound不提前。
读快照与可见性
- MvccSnapshot:读路径构造
MvccSnapshot(snapshot_ts),仅返回< snapshot_ts且状态为APPLIED的写入结果;对RESERVED/APPLYING及ABORTED的操作不可见。 - 安全时间(safe time)与读:Leader 维护安全时间推进,确保读不会越过尚未
APPLIED的写。Follower 在Init()先行RESERVED登记的设计,防止其安全时间推进导致读“跳过”该写(避免可见性反常)。 - 外部一致性影响:在
COMMIT_WAIT模式下,RPC 响应在CommitWait()之后返回,确保客户端在读回本写时已满足线性化顺序;非COMMIT_WAIT下,响应可能早于本地时钟完全推进,但通过 MVCC 下界/安全时间控制,外部读取仍遵循快照一致。
失败/中止与一致性
- Prepare 失败:若在
Prepare()解码/锁/权限失败,在未复制成功情况下,直接AbortOp(ts)(或未登记则无需 Abort);若复制已成功(不应发生正常路径),系统会以 FATAL 保障一致性,避免出现“复制提交但本地未应用”的状态。 - 复制失败:
ReplicationFinished()置REPLICATION_FAILED,ApplyAsync()走HandleFailure(),随后FinishApplyingOrAbort()选择AbortOp(ts)。该写不会进入可见集,读视图不受污染。 - Apply 失败:在
Apply()阶段遇到不可恢复错误,最终也会经FinishApplyingOrAbort()走AbortOp(ts),保持读视图纯净。
与 WAL/恢复的协作
- 持久化顺序:WAL 先记录
ReplicateMsg(含 ts),后记录CommitMsg(写入位置与结果)。MVCC 的APPLIED对应已经写入内存结构并在 WAL 有相应 commit 记录。 - 恢复(Bootstrap):在
tablet_bootstrap.cc重放时,先根据ReplicateMsg重建 in-flight 集合(这些操作初始等价于RESERVED),再根据CommitMsg逐步将其应用与完成(相当于进入APPLYING→APPLIED)。缺失CommitMsg的 in-flight 操作会被视为未完成,最终在恢复结束时按AbortOp(ts)处理,确保一致性。
代码位置(快速索引)
src/kudu/tablet/mvcc.h:StartOp()、StartApplyingOp()、FinishApplyingOp()、AbortOp()、AdjustNewOpLowerBound()、MvccSnapshot。src/kudu/tablet/ops/op_driver.cc:Prepare()(Leader 赋 ts)、ApplyAsync()(非 COMMIT_WAIT 下界推进)、ApplyTask()(构建 commit 与 WAL 追加)、Finalize()(FinishApplying)。src/kudu/tablet/tablet.cc:Tablet::StartOp()、Tablet::ApplyRowOperations()、WriteOpState::StartApplying()/FinishApplyingOrAbort()调用栈。src/kudu/tablet/tablet_bootstrap.cc:恢复流程对 replicate/commit 的重放与 MVCC 状态重建。
综上,MVCC 贯穿非事务写的全链路:在 Prepare/Replicate 期间以 RESERVED 防止读越界,在 Apply 前转入 APPLYING 隔离半成品,在 Finalize 时标记为 APPLIED 或 ABORTED 决定可见性;与外部一致性模式、WAL 提交顺序、安全时间推进共同确保 Kudu 的强一致写入与可预测读视图。
状态流
| |
锁与并发控制
组件:
- Schema 锁:
Tablet::DecodeWriteOperations()中AcquireSchemaLock(),防止并发 schema 变更影响解码与应用。 - 行锁:
Tablet::AcquireRowLocks()-> 为每行 key 构造RowSetKeyProbe后批量申请ScopedRowLock(排他)。 - 分区锁 / 事务锁:仅事务写使用;非事务写忽略或按 flag 控制。
- MVCC 保证提交顺序与可见性;WAL Commit 序列化恢复信息。
Schema 快照与解码(写路径的入口准备)
核心文件/方法:src/kudu/tablet/tablet.cc: Tablet::DecodeWriteOperations()、src/kudu/common/schema.h、src/kudu/common/wire_protocol.proto: RowOperationsPB、src/kudu/tablet/tablet.cc: AcquireSchemaLock()。
本节系统化梳理“Schema 快照与解码”的具体流程,它是第 3.1 节 Prepare 的首要工作,对第 8 节的行级应用与错误映射直接提供输入,且在并发 Alter Table 时保证写路径的结构稳定。
为什么需要 Schema 快照
- 并发安全:通过
AcquireSchemaLock()获取读锁,冻结当前Schema视图,避免写在解码与应用过程中被并发的 schema 变更破坏(列增删、类型变更等)。 - 一致性语义:同一批
RowOp在 Prepare/Apply 期间使用同一个 schema 快照;读路径通过 MVCC 快照与 schema 版本配合,保证结果的可解释性。 - 性能考量:快照后的列投影与类型映射可以被重用,减少 Apply 阶段的重复计算与分支判断。
解码管线(RowOperationsPB → RowOp)
- 入口:
Tablet::DecodeWriteOperations(state)。 - 步骤:
- 读取
WriteRequestPB.row_operations中的RowOperationsPB,根据操作类型(INSERT/UPSERT/UPDATE/DELETE/及 IGNORE 变体)逐一解析; - 将客户端列编码映射到内部列 ID 与类型(
Schema提供列查找与类型校验),对缺失必填列、未知列、类型不匹配进行早期拦截; - 构造内部
RowOp:包含操作类型、主键投影(用于后续行锁与路由)、列值投影(用于插入或增量更新),并绑定到WriteOpState的row_ops容器; - 捕获本次解码使用的
Schema快照并保存在WriteOpState,供 Apply 阶段的行定址与列写入使用。
- 读取
- 错误映射:
- 结构性错误:
MISMATCHED_SCHEMA(类型错误/列不存在/缺列)、IMMUTABLE_COLUMN(不可变列写入)等在解码时直接判定; - 行级错误:主键缺失或非法在此阶段即可形成 per-row 错误占位,或推迟到存在性检查阶段完善(见第 8 节)。
- 结构性错误:
主键与列投影
- 主键处理:基于
Schema的主键列生成用于锁与路由的 key 投影(配合第 8 节的BulkCheckPresence与RowSetKeyProbe)。 - 列投影:为 INSERT/UPSERT 构造完整或部分列写入投影;为 UPDATE/DELETE 构造变更列投影与删除标记;
- 默认值/空值:按
Schema默认值与列约束补全缺失列,或记录错误返回(取决于表的约束与操作类型)。
与 Prepare/Apply/MVCC 的关系
- 与 Prepare(第 3.1 节):解码与行锁获取是 Prepare 的核心;解码产生的
RowOp、主键/列投影会直接进入行锁与后续存在性检查; - 与 Apply(第 8 节):
ApplyRowOperations()依赖解码出的RowOp与列投影进行批量存在性检查与分派到InsertOrUpsertUnlocked/MutateRowUnlocked; - 与 MVCC(第 5 节):解码不直接改变 MVCC,但其产出的批次与行集合在
StartApplying()→FinishApplying所覆盖的“可见性窗口”内一致生效。
典型错误与返回路径
MISMATCHED_SCHEMA:客户端提供的列与当前快照不一致(列名/类型);IMMUTABLE_COLUMN:尝试更新不可变列;ALREADY_PRESENT/NOT_FOUND:通常在 Apply 阶段的存在性检查与实际写入中形成,但部分前置条件(主键为空等)可在解码时直接标记;- 所有 per-row 错误最终汇入
WriteResponsePB.per_row_errors,整体失败则由OpDriver::HandleFailure()统一处理。
代码位置(快速索引)
src/kudu/tablet/tablet.cc:AcquireSchemaLock()、DecodeWriteOperations();src/kudu/common/schema.h/.cc:Schema定义、列查找与类型校验、默认值处理;src/kudu/common/wire_protocol.proto:RowOperationsPB(客户端传输格式);- 关联:第 3.1 节 Prepare、第 8 节 Row 应用逻辑、第 5 节 MVCC 管理。
总结:Schema 快照与解码是写路径的“入口准备”,它将不稳定的客户端行操作转换为受当前 schema 约束的内部 RowOp 序列,并与行锁、存在性检查、列写入、per-row 错误生成紧密衔接,是保证并发安全与可读可恢复性的关键步骤。
Row 应用逻辑(解码后的逐行执行与结果生成)
核心文件/方法:src/kudu/tablet/tablet.cc 中 Tablet::ApplyRowOperations()、ApplyRowOperation()、InsertOrUpsertUnlocked()、MutateRowUnlocked(),以及 BulkCheckPresence()。
本节承接第 3 节(Prepare 产出 RowOp + 行锁)与第 5 节(进入 APPLYING 状态),具体说明一次写如何将解码后的每条 RowOp 应用到内存结构、生成 per-row 结果,并为 WAL 的 CommitMsg 填充必要信息。
应用阶段总览
| |
数据流总览(Apply 阶段内部)
- 进入
Apply()前,WriteOpState::StartApplying()已切换 MVCC 状态至APPLYING(见第 5 节)。 Tablet::ApplyRowOperations(state):
- 预扫描:对同批次的 row keys 执行
BulkCheckPresence(state),统一查询当前RowSetTree(覆盖内存与磁盘 rowset)来判断是否“存在”。 - 遍历
state->row_ops():逐条调用ApplyRowOperation(state, row_op),依据类型(INSERT/UPSERT/UPDATE/DELETE/其 IGNORE 变体)分派到具体实现。 - 聚合 per-row 结果、累加行数计数器,并填入
TxResultPB以便CommitMsg写入 WAL。
批量存在性检查 BulkCheckPresence
- 目的:避免每条 row 重复进行昂贵的 rowset 路由与存在性探测,降低锁持有时间与索引/布隆过滤器查询开销。
- 实现思路(概念):
- 将待操作的主键集合批量投影成
RowSetKeyProbe,在RowSetTree中并行/批量地判断是否已存在; - 将“已存在/不存在”标记回写到对应的
RowOp上,供后续分派快速分支。
- 将待操作的主键集合批量投影成
- 连贯性:与第 3.1 节 Prepare 中的“行锁已经获取”配合,保证在 Apply 时该批次上的存在性判断与实际写入不会被其他写打破不变量。
Insert / Upsert 路径
- 分派条件:
- INSERT:要求 BulkCheckPresence 标记为“不存在”,否则产生 per-row 错误
ALREADY_PRESENT; - UPSERT:若不存在则走插入;若存在则转入“部分列更新”的 Upsert 逻辑;
- INSERT_IGNORE / UPSERT_IGNORE:在存在冲突时按“忽略”策略不报错(计入影响行数/结果标记)。
- INSERT:要求 BulkCheckPresence 标记为“不存在”,否则产生 per-row 错误
- 执行:
InsertOrUpsertUnlocked(state, row_op)(在已持有行锁与 schema 快照前提下)- 定位目标
MemRowSet并分配新 row id(插入场景); - 将行构造为 base row 追加到
MemRowSet; - 对于 UPSERT 存在场景,仅对指定列做“若存在则更新”的轻量变更(实现可能复用 mutate 写入 delta)。
- 定位目标
- 结果:
RowOpResultPB中记录 INSERT/UPSERT 的结果状态和(可选)新行位置/版本信息;- 计数器:累计
num_inserts/num_upserts等,用于响应与 metrics。
Update / Delete 路径(增量写入 DeltaMemStore)
- 分派条件:
- UPDATE:要求 BulkCheckPresence 标记为“存在”,否则 per-row 错误
NOT_FOUND; - DELETE:同样要求存在。
- UPDATE:要求 BulkCheckPresence 标记为“存在”,否则 per-row 错误
- 执行:
MutateRowUnlocked(state, row_op)- 在目标 rowset 中根据 key/row id 定位老版本;
- 将列变更或删除标记写入
DeltaMemStore(Redo delta),保持 base 行不变; - 按 schema 约束拒绝不合法的列更新(如不可变列),映射成 per-row 错误类型(
IMMUTABLE_COLUMN/MISMATCHED_SCHEMA等)。
- 结果:
RowOpResultPB记录 UPDATE/DELETE 的成功与否;- 计数器:累计
num_updates/num_deletes; - 后续由 compaction 将 base 与 delta 归并,读路径通过
MvccSnapshot与列读取器按需“合成”可见版本。
per-row 结果与错误映射
- 每条
RowOp都对应一个RowOpResultPB:成功/失败、失败原因(ALREADY_PRESENT、NOT_FOUND、MISMATCHED_SCHEMA、IMMUTABLE_COLUMN等),这些被回填到WriteResponsePB.per_row_errors。 - IGNORE 变体不将“可忽略冲突”计为错误,但会反映在影响行数计数中,以便客户端理解“部分未变更”。
- 文档连贯:与第 3.1 节 Prepare(权限/合法性初筛)呼应——一些结构性错误在 Prepare 即被拒绝;行级冲突更多发生在 Apply 的存在性与实际写入阶段。
与 MVCC、WAL、OpDriver 的衔接
- MVCC:Apply 期间处于
APPLYING;结束时由WriteOp::Finish()决定APPLIED/ABORTED(见第 5 节)。 - WAL:
ApplyTask()构造/填充CommitMsg(包含commited_op_id、时间戳、TxResultPB 摘要等),log_->AsyncAppendCommit()异步持久化,支持恢复时的重放(见第 9 节)。 - OpDriver:在非
COMMIT_WAIT模式下于ApplyAsync()前可能调用AdjustNewOpLowerBound(ts)提前放宽读的下界;COMMIT_WAIT模式则在 WAL 追加后CommitWait()再响应(见第 4.5 节)。
性能与并发注意点
BulkCheckPresence减少重复索引/布隆过滤器命中,提升批量写的吞吐;- 行锁在 Prepare 阶段即获取,Apply 尽量短路径执行,降低锁持有时间;
- Upsert 合并路径避免“先查再写”的重复代价,通过存在性标记直接分派;
- DeltaMemStore 的增量写入使 UPDATE/DELETE 具备更好的随机写特性,后续由 compaction 吸收成本。
WAL 与恢复
写入路径构建 ReplicateMsg(含原始请求、timestamp),Apply 阶段生成 CommitMsg(含操作影响的内存结构位置、TxResultPB)。WAL 持久化顺序:Replicate -> Commit,恢复时:
- 重放 Replicate 生成 in-flight 操作集合。
- 根据 Commit 信息重建插入/变更所处的 RowSets / DeltaStores。
相关:TabletBootstrap 中对历史操作的重放逻辑(非事务路径在 bootstrap 恢复行的应用)。
非事务 vs 事务写差异(简述)
- 非事务:
TabletServiceImpl::Write()直接SubmitWrite();Prepare 允许多种 RowOperations;行锁后立即复制。提交即可见。 - 事务:需要
txn_id。调度器TxnOpDispatcher控制并发;只允许 INSERT 类;需要额外的 partition lock;最终通过事务协调 BEGIN_COMMIT / FINALIZE_COMMIT 序列将 uncommitted rowsets 转换到 committed。本文不深入(后续可单独分析)。
错误与拒绝路径
- 权限不足:NOT_AUTHORIZED。
- Schema 不匹配 / 解码失败:MISMATCHED_SCHEMA。
- 行冲突:ALREADY_PRESENT / NOT_FOUND(行级错误在 per_row_errors 中返回)。
- 队列过载 / 内存压力:THROTTLED。
- 共识领导性检查失败:客户端重试。
- Prepare 队列超时:
ops_timed_out_in_prepare_queue计数 + 超时返回。
小结
非事务写路径由 TabletServiceImpl 入口到 OpDriver 状态机驱动,分离 Prepare/Replicate/Apply/Finish,借助 MVCC + 行锁保证并发与可见性;通过 WAL + CommitMsg 保障数据落盘与恢复。事务特性在非事务路径中被绕过,确保单次写延迟最小化与实现简洁。
附
关键源码索引
| 阶段 | 入口/实现 | 说明 |
|---|---|---|
| RPC | tablet_service.cc:TabletServiceImpl::Write | 请求校验/构造 WriteOpState |
| 提交 | tablet_replica.cc:TabletReplica::SubmitWrite | 创建 Leader OpDriver |
| Prepare | write_op.cc:WriteOp::Prepare / tablet.cc:DecodeWriteOperations | 解码、锁、权限 |
| RowLocks | tablet.cc:AcquireRowLocks / write_op.cc:AcquireRowLocks | 行粒度排他锁 |
| Timestamp | op_driver.cc:Prepare 调 time_manager()->AssignTimestamp | 分配 MVCC 时间戳 |
| Replicate | op_driver.cc:Prepare 调 RaftConsensus::Replicate | Raft 复制开始 |
| ReplicationFinished | op_driver.cc:ReplicationFinished | 进入 Apply 或 Abort |
| Apply | write_op.cc:WriteOp::Apply / tablet.cc:ApplyRowOperations | 实际内存写入 |
| Commit | RaftConsensus::Commit + WAL | WAL 持久化 commit 元数据 |
| Finish | write_op.cc:WriteOp::Finish | MVCC 可见 + metrics |
| MVCC | mvcc.h / Tablet::StartOp / FinishApplyingOrAbort | 状态转换 |
关键实现概要
| |
PBs
| |
| |
关键流程
| |