数据结构
MemRowSet 是 Kudu tablet 中用来暂存新写入数据的内存结构。它的底层存储是一棵并发 B-tree(CBTree),每个叶节点条目存放一个 key-value 对:key 是主键的编码形式(字典序 = 主键逻辑序),value 是一个 MRSRow。
为了支持快照一致性,MemRowSet 从不原地更新已插入的行数据——所有后续变更都以 Mutation 节点的形式挂在行的 redo 链表上,相当于每行自带一条"redo log"。当 MemRowSet 被 flush 到磁盘时,所有内存——包括行数据、Mutation 节点、变长列数据——都从 Arena 中统一释放。
MRSRow
MRSRow 是行在 CBTree 中的存储表示。每个 MRSRow 占据一段连续内存,包含一个定长的 Header 和紧随其后的行数据。根据它的构造函数看出其内存布局如下:
| |
Header 有三个字段:insertion_timestamp 记录行的插入时间戳,供 MVCC 读取时判断可见性;redo_head 和 redo_tail 分别指向该行 Mutation 链表的头和尾。新插入的行 redo_head/redo_tail 均为 nullptr——它没有任何变更历史。每当一次 UPDATE 或 DELETE 到来,就会在链表尾部追加一个新的 Mutation 节点:
| |
维护 redo_tail 指针使得追加操作是 O(1) 的——无需遍历整条链表。
row_data 部分的布局与 ConstContiguousRow 完全相同:先是所有列的定长数据(按 Schema 的列偏移排列),后面跟一个 null 位图。具体来说:
| |
对于 STRING/BINARY 这类变长列,定长部分里存的是一个 16 字节的 Slice(指针 + 长度),实际数据存在 Arena 的其他位置。null_bitmap 中每个 bit 对应一列:bit 置 1 表示该列为 NULL。这种布局使得 MRSRow 和外部传入的 ConstContiguousRow 内存表示完全一致,拷贝时只需一次 memcpy。
如果一行的最后一条 mutation 是 DELETE,这行就成了"幽灵行"(ghost row)。IsGhost() 通过解码 redo_tail 的 RowChangeList 类型来判断。
| |
幽灵行的物理条目仍留在 B-tree 中——如果同一主键再次被 INSERT,无需创建新条目,只需在链表尾部追加一条 REINSERT mutation 即可复活它。
MRSRow 的私有方法 CopyRow 负责将外部传入的行数据复制到自身,使 MemRowSet 不依赖调用者缓冲区的生命周期:
| |
第一步 memcpy 很直接——因为 MRSRow 和 ConstContiguousRow 的行数据内存布局完全一致,逐字节复制即可。但 memcpy 之后有一个问题:STRING/BINARY 列的定长部分里存的 Slice 指针仍然指向调用者的缓冲区(比如 RPC 请求的内存)。如果调用者释放了该缓冲区,这些指针就变成悬空指针。RelocateIndirectDataToArena 解决了这个问题——它做两遍扫描:第一遍统计所有 BINARY 类型列的间接数据总大小;第二遍从 Arena 一次性分配全部所需空间,然后逐列 memcpy 数据并将 Slice 的 data_ 指针重定向到 Arena 上的副本。一次性分配(而非逐列分配)减少了多线程场景下对 Arena 的 CAS 竞争。完成后,MRSRow 的所有数据——无论定长还是变长——都安全地活在 Arena 上,随 MemRowSet 的生命周期统一管理。
Mutation
Mutation 表示对一行数据的一次原子变更(UPDATE、DELETE 或 REINSERT)。每个 Mutation 节点是变长对象——定长头部之后紧跟着可变大小的 RowChangeList 编码数据。由于大小在创建后不会改变,Mutation 通过工厂方法 CreateInArena 一次性从 Arena 分配,不可拷贝、不可移动。内存布局:
| |
| |
CreateInArena 计算总大小 sizeof(Mutation) + changelist 字节数,从 Arena 分配一块字对齐的内存(对齐是为了保证 next_ 指针的原子操作安全),然后 placement new 构造对象,最后 memcpy 拷贝 changelist 数据到尾部的柔性数组。
Mutation 链表的并发安全依赖两层机制。写端(Insert/MutateRow)通过 PreparedMutation 锁定 B-tree 叶节点,保证同一行同时只有一个写操作在修改链表。AppendToListAtomic 使用 Release_Store 语义更新 next_ 和 redo_tail 指针,确保新节点的所有内存写入(timestamp、changelist_data 等)在指针更新之前对其他线程可见。读端(Scanner)通过 acquire_next() 和 acquire_redo_head() 以 Acquire_Load 语义读取链表指针,与写端的 Release 形成 happens-before 关系——读者要么看不到新节点(指针还是旧值),要么看到完整的新节点数据,不会看到半初始化状态。
关键逻辑
CODE: 数据写入 MemRowSet::Insert
MemRowSet 的写入入口是 Insert 方法(memrowset.cc)。一次 Insert 经历五个阶段:主键编码、B-tree 定位与加锁、冲突检测、数据拷贝与插入、WAL 锚定。下面逐一展开。
主键编码。 Insert 接收一个 ConstContiguousRow——即按 Schema 排列的连续内存行。它首先把行中的主键列编码为字典序可比较的二进制串:
| |
编码后的 key 使得 CBTree 的默认 lexicographic comparator 就能正确反映主键的逻辑顺序,无需自定义比较器。
B-tree 定位与加锁。 接下来,Insert 用 PreparedMutation 在 CBTree 中定位目标叶节点。PreparedMutation 是 CBTree 提供的"乐观遍历 + 悲观锁"机制:
| |
Prepare() 从根到叶做乐观遍历(每层检查版本号但不加锁),到达叶节点后才对叶节点做 CAS 加锁。整个作用域内叶节点保持锁定;作用域结束时 PreparedMutation 析构自动解锁。这限制了临界区的范围,同时保证后续对行数据的写入是线程安全的。
冲突检测与 Ghost 行复活。 Prepare() 完成后可通过 exists() 判断 key 是否已在 B-tree 中。如果 key 已存在,Insert 需要区分两种情况:
| |
如果行仍然存活(最近的 mutation 不是 DELETE),返回 AlreadyPresent 错误——这是主键唯一性约束。如果行是 ghost(曾被 DELETE 但物理条目仍留在 B-tree 中),则调用 Reinsert 复活它。这个设计意味着一个主键值在 CBTree 中永远只占一个物理条目,所有历史版本(包括 DELETE 和 REINSERT)都记录在该条目的 Mutation 链表中。
Reinsert 的实现进行单独说明。它不创建新的 B-tree 条目,而是在现有行的 Mutation 链表尾部追加一条 REINSERT 类型的 mutation:
| |
SetToReinsert(row) 将行的所有非主键列编码为 RowChangeList(主键不可变,B-tree 条目中的原始主键仍有效)。编码格式为 [0x03(REINSERT 类型标记)] [col_id_1, value_1] [col_id_2, value_2] ...。然后 CreateInArena 在 Arena 上分配一个变长 Mutation 对象,AppendToListAtomic 用 Release 语义的原子操作将其追加到链表尾部。Reinsert 之后,链表变化如下:
| |
数据拷贝与插入。 回到 Insert 的主路径——key 不存在的情况。Insert 在栈上构造一个临时 MRSRow,填入 Header 和行数据,然后通过 mutation.Insert() 将其放入 B-tree:
| |
DEFINE_MRSROW_ON_STACK 宏在栈上分配 sizeof(Header) + row_size(schema) 字节,构造 MRSRow 指向该栈内存。Header 的 insertion_timestamp 记录写入时间戳,redo_head/redo_tail 初始化为空——新行没有任何变更历史。
CopyRow 做两件事:
memcpy 将输入行数据复制到栈上的 MRSRow;
将 STRING/BINARY 等变长列的间接数据(原来的指针指向调用者的缓冲区)重新分配到 Arena 并更新指针。这确保 MemRowSet 不依赖外部缓冲区的生命周期。
最后 mutation.Insert(mrsrow_slice) 将栈上数据复制到 CBTree 的 Arena 存储中,作为叶节点的 value。因为 Prepare() 已确认 key 不存在且叶节点已加锁,这里的插入必定成功。
WAL 锚定与统计更新。 作用域结束后叶节点解锁,Insert 在临界区之外完成剩余工作:
| |
AnchorIfMinimum 记录此操作的 WAL log index。LogAnchor 机制确保该 index 之前的 WAL segment 不会被 GC——只要 MemRowSet 未被 flush,它引用的最早的 WAL 条目就不会被回收。live_row_count_ 是存活行的精确计数,用于后续 flush 决策和 tablet 统计。
CODE: 数据更新 MemRowSet::MutateRow
MutateRow 处理对已有行的 UPDATE 和 DELETE 操作(memrowset.cc)。与 Insert 相比,MutateRow 更简单:它不需要编码行数据,只需定位行、追加一个 Mutation 节点。
定位行。 MutateRow 接收一个 RowSetKeyProbe 而非原始行。RowSetKeyProbe 是一个缓存结构,构造时一次性完成主键编码和 Bloom Filter 探测的预计算,供多个 RowSet 复用。MutateRow 用其中的 encoded_key_slice() 在 CBTree 中查找:
| |
与 Insert 一样,通过 PreparedMutation 定位并锁定叶节点。如果 key 不存在,返回 NotFound——调用层会继续在其他 RowSet(DiskRowSet)中查找。
Ghost 行检查。 key 存在并不意味着行是活跃的:
| |
IsGhost() 解码 redo 链表尾节点的 RowChangeList,判断最后一条 mutation 是否为 DELETE。如果是 ghost 行同样返回 NotFound。
追加 Mutation。 行存活时,将变更编码为 Mutation 并追加到链表尾部:
| |
delta 参数是一个已编码好的 RowChangeList(由调用层准备),可能是 UPDATE(部分列修改)或 DELETE。CreateInArena 在 Arena 上分配 sizeof(Mutation) + changelist 字节数 的连续内存,placement new 构造对象,memcpy 拷贝 changelist 数据。AppendToListAtomic 使用 Release 语义的原子存储把新节点挂到链表尾部——这确保并发读者通过 Acquire_Load 读取链表时,一定能看到新 Mutation 的完整数据。
记录结果和善后。 在临界区内,MutateRow 还在 OperationResultPB 中记录变更发生在哪个 MRS:
| |
这个信息会随 RPC 响应返回给客户端和 WAL,用于后续的副本同步和一致性校验。临界区结束后,类似 Insert,MutateRow 锚定 WAL 并更新统计计数。如果变更是 DELETE,存活行计数减一:
| |
注意 MutateRow 对 DELETE 的处理与 Insert 对 ghost 行的处理形成对称:DELETE 把 live_row_count_ 减一并留下 ghost 条目,后续的 Insert(如果同一 key 再次写入)检测到 ghost 行后 Reinsert 再把计数加回来。这种设计避免了从 B-tree 物理删除条目的开销——物理清理推迟到 MemRowSet flush 时统一处理。
CODE: 数据读取 Iterator
MemRowSet::Iterator 是 MemRowSet 的读取路径,负责按主键顺序遍历 B-tree 中的行,把每行投影为客户端请求的列子集,并根据 MVCC 快照将 redo 链表中的变更"回放"到行数据上,最终输出某一时间点的行快照。
创建与初始化。 Iterator 由 MemRowSet::NewIterator(opts) 创建。构造函数接收三个参数:MRS 的 shared_ptr(保证迭代期间 MRS 不被析构)、CBTree 的底层迭代器、以及包含 projection Schema 和 MVCC 快照的扫描选项。构造时立即创建行投影器(projector)和 delta 投影器(delta_projector),并将 B-tree 迭代器定位到起始位置。
| |
Init(ScanSpec*) 完成第二阶段初始化:如果 ScanSpec 指定了扫描下界(如 WHERE key >= X),将 B-tree 迭代器 Seek 到不小于该下界的第一行;如果指定了上界,保存到 exclusive_upper_bound_ 供后续每行检查时用。之后将状态从 kUninitialized 切换为 kScanning。
逐批读取:NextBlock 与 FetchRows。 上层 Scanner 通过反复调用 NextBlock(RowBlock*) 批量读取行。
| |
NextBlock 是一个薄包装:它把 RowBlock 大小设为最大容量,重置其 Arena(释放上一批变长列数据的临时内存),设置 SelectionVector 全部选中,然后委托给 FetchRows 执行真正的工作。
FetchRows 是一个 do-while 循环,每次迭代处理一行:
| |
循环先从 B-tree 迭代器取出当前行的 value Slice,构造 MRSRow。然后检查主键是否超过上界——如果是,标记 kFinished 并退出。接下来进入 MVCC 过滤的核心逻辑。
MVCC 时间范围过滤。 opts_ 中的两个快照定义了迭代器的时间窗口:
- 普通扫描(只设
snap_to_include):时间范围 = (-∞, snap_to_include),返回该快照时刻之前已提交的所有行。 - 增量扫描(同时设
snap_to_exclude和snap_to_include):时间范围 = [snap_to_exclude, snap_to_include),只返回两个快照之间的差异数据,用于增量备份或 compaction。
对每一行,FetchRows 首先判断其 INSERT 是否在时间窗口内可见。如果行的 insertion_timestamp 已被 snap_to_include 确认(即 IsApplied 返回 true),或者虽已被 snap_to_exclude 排除但后续 mutation 可能仍在窗口内,则需要处理该行。FetchRows 先调用 projector 将 MRS 全列行投影到 dst_row,然后以 Acquire_Load 读取 redo_head 指针,交给 ApplyMutationsToProjectedRow 回放变更。
如果行的 INSERT 太新(snap_to_include 都看不到),整行被跳过——在 SelectionVector 中标记为未选中。
| |
Mutation 回放:ApplyMutationsToProjectedRow。 这是整个读取路径中最复杂的方法。它遍历行的 redo mutation 链表,将时间窗口内的变更逐个应用到已投影的目标行上,同时维护一个存活状态机来检测"不可观测行"。
短路路径很简单:如果 mutation_head == nullptr,即行从未被修改过,返回 NONE_APPLIED。
对于有 mutation 的行,方法用两个布尔变量 is_deleted_start 和 is_deleted_end 跟踪行在时间窗口两端的存活状态:
is_deleted_end的更新很直观——遇到 DELETE 翻转为 true,遇到 REINSERT 翻转为 false。is_deleted_start的确定则比较 tricky:如果 INSERT 在窗口内,行在窗口起点之前不存在,等价于已删除,初始化为 true;如果 INSERT 在窗口之前,则延迟到遇到第一个窗口内 mutation 时,用当时is_deleted_end的值(反映了窗口起点时所有旧 mutation 的累积效果)来修正。
主循环通过 acquire_next() 遍历 mutation 链表(与写端的 Release_Store 形成 happens-before 关系),对每个 mutation 执行以下步骤:
时间上界裁剪:如果 mutation 的 timestamp 超过
snap_to_include,后续所有 mutation 也必然更新(链表按时间戳单调递增),直接 break。窗口内判断与状态标记:如果 mutation 在 [snap_to_exclude, snap_to_include) 窗口内,标记
local_apply_status = APPLIED_AND_PRESENT。首次进入窗口时还要修正is_deleted_start。应用变更:对 DELETE,只翻转
is_deleted_end;对 UPDATE 或 REINSERT,逐列将变更应用到dst_row。REINSERT 额外翻转is_deleted_end(ghost → 活跃)。
| |
在逐列应用变更的实现中:对于每个投影列,方法创建一个新的 RowChangeListDecoder(因为 decoder 是单次遍历的),在 changelist 中查找该列的变更并写入 dst_row 的对应位置。这意味着每个 mutation 对每个投影列都做一次 changelist 扫描——时间复杂度 O(投影列数 × changelist 中的列数)。这里实际上可以通过反向索引的方式优化时间复杂度为 O(n)。
| |
循环结束后,方法根据 is_deleted_start 和 is_deleted_end 确定最终的 ApplyStatus:
| |
APPLIED_AND_UNOBSERVABLE:行在窗口两端都是死的——它的整个生命周期是窗口的子集,外部观察者在任何一端都看不到它存活,应从结果中排除。APPLIED_AND_DELETED:行在窗口终点已被删除,但窗口起点时是存活的。是否返回取决于opts_.include_deleted_rows。NONE_APPLIED:没有窗口内的 mutation 被应用。若insert_excluded为 true(INSERT 太旧且无窗口内变更),该行也应排除。
回到 FetchRows,它根据 ApplyStatus 决定是否在 SelectionVector 中取消选中该行:
| |
如果查询请求了 IS_DELETED 虚拟列,则在选中行的该列位置写入 apply_status == APPLIED_AND_DELETED 的布尔值。
Flush/Compaction 路径:GetCurrentRow(五参数重载)。 与面向客户端的 FetchRows 路径不同,Flush/Compaction 使用五参数版本的 GetCurrentRow。它不做 MVCC roll-forward,而是将行的基线数据和 redo mutation 链表分别输出,由下游的 DiskRowSet 分别写入 base data 和 delta store。
如果 MRS Schema 与 projection Schema 不同(!delta_projector_.is_identity()),需要对 redo 链表中每个 mutation 的 changelist 做列投影——保留 projection 中存在的列变更,丢弃不存在的。投影后为空的 mutation(所有变更列都不在 projection 中)直接跳过。投影后的 mutation 节点从 mutation_arena 分配,组成一条全新的链表返回给调用者。
附录
相关源码文件索引
| 文件 | |
|---|---|
src/kudu/tablet/memrowset.h | MemRowSet、MRSRow、Iterator 类定义 |
src/kudu/tablet/memrowset.cc | Insert、MutateRow、FetchRows、ApplyMutations 实现 |
src/kudu/tablet/concurrent_btree.h | CBTree、LeafNode、InternalNode、PreparedMutation、CBTreeIterator |
src/kudu/tablet/mutation.h | Mutation 类(变长对象、Arena 分配、链表操作) |
src/kudu/tablet/mutation.cc | AppendToListAtomic、StringifyMutationList |
src/kudu/util/memory/arena.h | ArenaBase、ThreadSafeMemoryTrackingArena |
src/kudu/util/memory/memory.h | MemoryTrackingBufferAllocator、BufferAllocator |
src/kudu/tablet/rowset.h | RowSetKeyProbe、ProbeStats |
src/kudu/common/schema.h | Schema::EncodeComparableKey |
src/kudu/tablet/memrowset-test.cc | 单元测试 |
针对 memrowset 尝试的优化
- [Merged][tablet] optimize MemRowSet scan to single-pass changelist application https://gerrit.cloudera.org/c/24158/