数据结构

MemRowSet 是 Kudu tablet 中用来暂存新写入数据的内存结构。它的底层存储是一棵并发 B-tree(CBTree),每个叶节点条目存放一个 key-value 对:key 是主键的编码形式(字典序 = 主键逻辑序),value 是一个 MRSRow。

为了支持快照一致性,MemRowSet 从不原地更新已插入的行数据——所有后续变更都以 Mutation 节点的形式挂在行的 redo 链表上,相当于每行自带一条"redo log"。当 MemRowSet 被 flush 到磁盘时,所有内存——包括行数据、Mutation 节点、变长列数据——都从 Arena 中统一释放。

MRSRow

MRSRow 是行在 CBTree 中的存储表示。每个 MRSRow 占据一段连续内存,包含一个定长的 Header 和紧随其后的行数据。根据它的构造函数看出其内存布局如下:

1
2
3
4
5
传入的 Slice s 指向的连续内存:
|<---------- s.size() ---------->|
|  Header (24B)  |   row_data   |
   ↑                   ↑
   header_              row_slice_ (remove_prefix 后)

Header 有三个字段:insertion_timestamp 记录行的插入时间戳,供 MVCC 读取时判断可见性;redo_headredo_tail 分别指向该行 Mutation 链表的头和尾。新插入的行 redo_head/redo_tail 均为 nullptr——它没有任何变更历史。每当一次 UPDATE 或 DELETE 到来,就会在链表尾部追加一个新的 Mutation 节点:

1
2
3
4
5
6
MRSRow::Header
  ├── redo_head ──→ Mutation(t=10, UPDATE col1=5)
  │                   └── next_ ──→ Mutation(t=20, UPDATE col2=8)
  │                                   └── next_ ──→ Mutation(t=30, DELETE)
  │                                                   └── next_ = nullptr
  └── redo_tail ──→ Mutation(t=30, DELETE)  // 直接指向尾节点

维护 redo_tail 指针使得追加操作是 O(1) 的——无需遍历整条链表。

row_data 部分的布局与 ConstContiguousRow 完全相同:先是所有列的定长数据(按 Schema 的列偏移排列),后面跟一个 null 位图。具体来说:

1
2
3
row_data 内存布局:
|<-- schema.byte_size() -->|<-- BitmapSize(num_cols) -->|
|  col0 | col1 | ... | colN |        null_bitmap        |

对于 STRING/BINARY 这类变长列,定长部分里存的是一个 16 字节的 Slice(指针 + 长度),实际数据存在 Arena 的其他位置。null_bitmap 中每个 bit 对应一列:bit 置 1 表示该列为 NULL。这种布局使得 MRSRow 和外部传入的 ConstContiguousRow 内存表示完全一致,拷贝时只需一次 memcpy。

如果一行的最后一条 mutation 是 DELETE,这行就成了"幽灵行"(ghost row)。IsGhost() 通过解码 redo_tail 的 RowChangeList 类型来判断。

1
2
3
4
5
6
7
8
bool MRSRow::IsGhost() const {
  const Mutation *mut_tail = header_->redo_tail;
  // ...
  RowChangeListDecoder decoder(mut_tail->changelist());
  Status s = decoder.Init();
  // ...
  return decoder.is_delete();
}

幽灵行的物理条目仍留在 B-tree 中——如果同一主键再次被 INSERT,无需创建新条目,只需在链表尾部追加一条 REINSERT mutation 即可复活它。

MRSRow 的私有方法 CopyRow 负责将外部传入的行数据复制到自身,使 MemRowSet 不依赖调用者缓冲区的生命周期:

1
2
3
4
Status CopyRow(const ConstContiguousRow& row, ArenaType *arena) {
  memcpy(row_slice_.mutable_data(), row.row_data(), row_slice_.size());
  return kudu::RelocateIndirectDataToArena(this, arena);
}

第一步 memcpy 很直接——因为 MRSRow 和 ConstContiguousRow 的行数据内存布局完全一致,逐字节复制即可。但 memcpy 之后有一个问题:STRING/BINARY 列的定长部分里存的 Slice 指针仍然指向调用者的缓冲区(比如 RPC 请求的内存)。如果调用者释放了该缓冲区,这些指针就变成悬空指针。RelocateIndirectDataToArena 解决了这个问题——它做两遍扫描:第一遍统计所有 BINARY 类型列的间接数据总大小;第二遍从 Arena 一次性分配全部所需空间,然后逐列 memcpy 数据并将 Slice 的 data_ 指针重定向到 Arena 上的副本。一次性分配(而非逐列分配)减少了多线程场景下对 Arena 的 CAS 竞争。完成后,MRSRow 的所有数据——无论定长还是变长——都安全地活在 Arena 上,随 MemRowSet 的生命周期统一管理。

Mutation

Mutation 表示对一行数据的一次原子变更(UPDATE、DELETE 或 REINSERT)。每个 Mutation 节点是变长对象——定长头部之后紧跟着可变大小的 RowChangeList 编码数据。由于大小在创建后不会改变,Mutation 通过工厂方法 CreateInArena 一次性从 Arena 分配,不可拷贝、不可移动。内存布局:

1
2
3
4
5
6
class Mutation {
    Timestamp timestamp_;          // 8 bytes — 此变更的操作时间戳
    Mutation *next_;               // 8 bytes — 下一个变更节点
    uint32_t changelist_size_;     // 4 bytes — RowChangeList 数据长度
    char changelist_data_[0];      // 变长   — 实际的 RowChangeList 编码数据
};
1
2
3
4
5
6
7
Arena 分配的 Mutation 对象内存布局(连续分配):
                    sizeof(Mutation)                  changelist_size_
              ◄──────────────────────────►     ◄─────────────────────────►
┌───────────┬──────────┬─────────────────┬─────────────────────────────────┐
│ timestamp │  next_   │ changelist_size_│      changelist_data_[]         │
│   8 bytes │ 8 bytes  │    4 bytes      │      ( N bytes)                 │
└───────────┴──────────┴─────────────────┴─────────────────────────────────┘

CreateInArena 计算总大小 sizeof(Mutation) + changelist 字节数,从 Arena 分配一块字对齐的内存(对齐是为了保证 next_ 指针的原子操作安全),然后 placement new 构造对象,最后 memcpy 拷贝 changelist 数据到尾部的柔性数组。

Mutation 链表的并发安全依赖两层机制。写端(Insert/MutateRow)通过 PreparedMutation 锁定 B-tree 叶节点,保证同一行同时只有一个写操作在修改链表。AppendToListAtomic 使用 Release_Store 语义更新 next_redo_tail 指针,确保新节点的所有内存写入(timestamp、changelist_data 等)在指针更新之前对其他线程可见。读端(Scanner)通过 acquire_next()acquire_redo_head()Acquire_Load 语义读取链表指针,与写端的 Release 形成 happens-before 关系——读者要么看不到新节点(指针还是旧值),要么看到完整的新节点数据,不会看到半初始化状态。

关键逻辑

CODE: 数据写入 MemRowSet::Insert

MemRowSet 的写入入口是 Insert 方法(memrowset.cc)。一次 Insert 经历五个阶段:主键编码、B-tree 定位与加锁、冲突检测、数据拷贝与插入、WAL 锚定。下面逐一展开。

主键编码。 Insert 接收一个 ConstContiguousRow——即按 Schema 排列的连续内存行。它首先把行中的主键列编码为字典序可比较的二进制串:

1
2
3
faststring enc_key_buf;
schema_.EncodeComparableKey(row, &enc_key_buf);
Slice enc_key(enc_key_buf);

编码后的 key 使得 CBTree 的默认 lexicographic comparator 就能正确反映主键的逻辑顺序,无需自定义比较器。

B-tree 定位与加锁。 接下来,Insert 用 PreparedMutation 在 CBTree 中定位目标叶节点。PreparedMutation 是 CBTree 提供的"乐观遍历 + 悲观锁"机制:

1
2
btree::PreparedMutation<MSBTreeTraits> mutation(enc_key);
mutation.Prepare(&tree_);

Prepare() 从根到叶做乐观遍历(每层检查版本号但不加锁),到达叶节点后才对叶节点做 CAS 加锁。整个作用域内叶节点保持锁定;作用域结束时 PreparedMutation 析构自动解锁。这限制了临界区的范围,同时保证后续对行数据的写入是线程安全的。

冲突检测与 Ghost 行复活。 Prepare() 完成后可通过 exists() 判断 key 是否已在 B-tree 中。如果 key 已存在,Insert 需要区分两种情况:

1
2
3
4
5
6
7
if (mutation.exists()) {
    MRSRow ms_row(this, mutation.current_mutable_value());
    if (!ms_row.IsGhost()) {
        return Status::AlreadyPresent("key already present");
    }
    return Reinsert(timestamp, row, &ms_row);
}

如果行仍然存活(最近的 mutation 不是 DELETE),返回 AlreadyPresent 错误——这是主键唯一性约束。如果行是 ghost(曾被 DELETE 但物理条目仍留在 B-tree 中),则调用 Reinsert 复活它。这个设计意味着一个主键值在 CBTree 中永远只占一个物理条目,所有历史版本(包括 DELETE 和 REINSERT)都记录在该条目的 Mutation 链表中。

Reinsert 的实现进行单独说明。它不创建新的 B-tree 条目,而是在现有行的 Mutation 链表尾部追加一条 REINSERT 类型的 mutation:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Status MemRowSet::Reinsert(Timestamp timestamp,
                           const ConstContiguousRow& row, MRSRow *ms_row) {
    faststring buf;
    RowChangeListEncoder encoder(&buf);
    encoder.SetToReinsert(row);
    Mutation *mut = Mutation::CreateInArena(arena_.get(), timestamp,
                                           encoder.as_changelist());
    mut->AppendToListAtomic(&ms_row->header_->redo_head,
                            &ms_row->header_->redo_tail);
    live_row_count_.Increment();
    return Status::OK();
}

SetToReinsert(row) 将行的所有非主键列编码为 RowChangeList(主键不可变,B-tree 条目中的原始主键仍有效)。编码格式为 [0x03(REINSERT 类型标记)] [col_id_1, value_1] [col_id_2, value_2] ...。然后 CreateInArena 在 Arena 上分配一个变长 Mutation 对象,AppendToListAtomic 用 Release 语义的原子操作将其追加到链表尾部。Reinsert 之后,链表变化如下:

1
2
3
4
5
6
7
Reinsert 前(ghost 行):
  redo_head ──→ [DELETE@T1] ──→ NULL
  redo_tail ──→ [DELETE@T1]

Reinsert 后(行复活):
  redo_head ──→ [DELETE@T1] ──→ [REINSERT@T2(全部非 key 列数据)] ──→ NULL
  redo_tail ──→ [REINSERT@T2]

数据拷贝与插入。 回到 Insert 的主路径——key 不存在的情况。Insert 在栈上构造一个临时 MRSRow,填入 Header 和行数据,然后通过 mutation.Insert() 将其放入 B-tree:

1
2
3
4
5
6
DEFINE_MRSROW_ON_STACK(this, mrsrow, mrsrow_slice);
mrsrow.header_->insertion_timestamp = timestamp;
mrsrow.header_->redo_head = nullptr;
mrsrow.header_->redo_tail = nullptr;
RETURN_NOT_OK(mrsrow.CopyRow(row, arena_.get()));
CHECK(mutation.Insert(mrsrow_slice));

DEFINE_MRSROW_ON_STACK 宏在栈上分配 sizeof(Header) + row_size(schema) 字节,构造 MRSRow 指向该栈内存。Header 的 insertion_timestamp 记录写入时间戳,redo_head/redo_tail 初始化为空——新行没有任何变更历史。

CopyRow 做两件事:

  1. memcpy 将输入行数据复制到栈上的 MRSRow;

  2. 将 STRING/BINARY 等变长列的间接数据(原来的指针指向调用者的缓冲区)重新分配到 Arena 并更新指针。这确保 MemRowSet 不依赖外部缓冲区的生命周期。

最后 mutation.Insert(mrsrow_slice) 将栈上数据复制到 CBTree 的 Arena 存储中,作为叶节点的 value。因为 Prepare() 已确认 key 不存在且叶节点已加锁,这里的插入必定成功。

WAL 锚定与统计更新。 作用域结束后叶节点解锁,Insert 在临界区之外完成剩余工作:

1
2
3
RETURN_NOT_OK(anchorer_.AnchorIfMinimum(op_id.index()));
debug_insert_count_++;
live_row_count_.Increment();

AnchorIfMinimum 记录此操作的 WAL log index。LogAnchor 机制确保该 index 之前的 WAL segment 不会被 GC——只要 MemRowSet 未被 flush,它引用的最早的 WAL 条目就不会被回收。live_row_count_ 是存活行的精确计数,用于后续 flush 决策和 tablet 统计。

CODE: 数据更新 MemRowSet::MutateRow

MutateRow 处理对已有行的 UPDATE 和 DELETE 操作(memrowset.cc)。与 Insert 相比,MutateRow 更简单:它不需要编码行数据,只需定位行、追加一个 Mutation 节点。

定位行。 MutateRow 接收一个 RowSetKeyProbe 而非原始行。RowSetKeyProbe 是一个缓存结构,构造时一次性完成主键编码和 Bloom Filter 探测的预计算,供多个 RowSet 复用。MutateRow 用其中的 encoded_key_slice() 在 CBTree 中查找:

1
2
3
4
5
btree::PreparedMutation<MSBTreeTraits> mutation(probe.encoded_key_slice());
mutation.Prepare(&tree_);
if (!mutation.exists()) {
    return Status::NotFound("not in memrowset");
}

与 Insert 一样,通过 PreparedMutation 定位并锁定叶节点。如果 key 不存在,返回 NotFound——调用层会继续在其他 RowSet(DiskRowSet)中查找。

Ghost 行检查。 key 存在并不意味着行是活跃的:

1
2
3
4
MRSRow row(this, mutation.current_mutable_value());
if (row.IsGhost()) {
    return Status::NotFound("not in memrowset (ghost)");
}

IsGhost() 解码 redo 链表尾节点的 RowChangeList,判断最后一条 mutation 是否为 DELETE。如果是 ghost 行同样返回 NotFound

追加 Mutation。 行存活时,将变更编码为 Mutation 并追加到链表尾部:

1
2
Mutation *mut = Mutation::CreateInArena(arena_.get(), timestamp, delta);
mut->AppendToListAtomic(&row.header_->redo_head, &row.header_->redo_tail);

delta 参数是一个已编码好的 RowChangeList(由调用层准备),可能是 UPDATE(部分列修改)或 DELETE。CreateInArena 在 Arena 上分配 sizeof(Mutation) + changelist 字节数 的连续内存,placement new 构造对象,memcpy 拷贝 changelist 数据。AppendToListAtomic 使用 Release 语义的原子存储把新节点挂到链表尾部——这确保并发读者通过 Acquire_Load 读取链表时,一定能看到新 Mutation 的完整数据。

记录结果和善后。 在临界区内,MutateRow 还在 OperationResultPB 中记录变更发生在哪个 MRS:

1
2
MemStoreTargetPB* target = result->add_mutated_stores();
target->set_mrs_id(id_);

这个信息会随 RPC 响应返回给客户端和 WAL,用于后续的副本同步和一致性校验。临界区结束后,类似 Insert,MutateRow 锚定 WAL 并更新统计计数。如果变更是 DELETE,存活行计数减一:

1
2
3
4
5
RETURN_NOT_OK(anchorer_.AnchorIfMinimum(op_id.index()));
debug_update_count_++;
if (delta.is_delete()) {
    live_row_count_.IncrementBy(-1);
}

注意 MutateRow 对 DELETE 的处理与 Insert 对 ghost 行的处理形成对称:DELETE 把 live_row_count_ 减一并留下 ghost 条目,后续的 Insert(如果同一 key 再次写入)检测到 ghost 行后 Reinsert 再把计数加回来。这种设计避免了从 B-tree 物理删除条目的开销——物理清理推迟到 MemRowSet flush 时统一处理。

CODE: 数据读取 Iterator

MemRowSet::Iterator 是 MemRowSet 的读取路径,负责按主键顺序遍历 B-tree 中的行,把每行投影为客户端请求的列子集,并根据 MVCC 快照将 redo 链表中的变更"回放"到行数据上,最终输出某一时间点的行快照。

创建与初始化。 Iterator 由 MemRowSet::NewIterator(opts) 创建。构造函数接收三个参数:MRS 的 shared_ptr(保证迭代期间 MRS 不被析构)、CBTree 的底层迭代器、以及包含 projection Schema 和 MVCC 快照的扫描选项。构造时立即创建行投影器(projector)和 delta 投影器(delta_projector),并将 B-tree 迭代器定位到起始位置。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
MemRowSet::Iterator::Iterator(const shared_ptr<const MemRowSet>& mrs,
                              MemRowSet::MSBTIter* iter,
                              RowIteratorOptions opts)
    : memrowset_(mrs), iter_(iter), opts_(std::move(opts)),
      projector_(GenerateAppropriateProjector(&mrs->schema_nonvirtual(),
                                              opts_.projection)),
      delta_projector_(&mrs->schema_nonvirtual(), opts_.projection),
      ...
      state_(kUninitialized) {
  iter_->SeekToStart();
}

Init(ScanSpec*) 完成第二阶段初始化:如果 ScanSpec 指定了扫描下界(如 WHERE key >= X),将 B-tree 迭代器 Seek 到不小于该下界的第一行;如果指定了上界,保存到 exclusive_upper_bound_ 供后续每行检查时用。之后将状态从 kUninitialized 切换为 kScanning

逐批读取:NextBlock 与 FetchRows。 上层 Scanner 通过反复调用 NextBlock(RowBlock*) 批量读取行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Status MemRowSet::Iterator::NextBlock(RowBlock *dst) {
  if (PREDICT_FALSE(!iter_->IsValid())) {
    dst->Resize(0);
    return Status::NotFound("end of iter");
  }

  if (PREDICT_FALSE(state_ != kScanning)) {
    dst->Resize(0);
    return Status::OK();
  }

  if (PREDICT_FALSE(dst->row_capacity() == 0)) {
    return Status::OK();
  }

  dst->Resize(dst->row_capacity());
  if (dst->arena()) {
    dst->arena()->Reset();
  }

  dst->selection_vector()->SetAllTrue();
  size_t fetched;

  RETURN_NOT_OK(FetchRows(dst, &fetched));

  dst->Resize(fetched);

  return Status::OK();
}

NextBlock 是一个薄包装:它把 RowBlock 大小设为最大容量,重置其 Arena(释放上一批变长列数据的临时内存),设置 SelectionVector 全部选中,然后委托给 FetchRows 执行真正的工作。

FetchRows 是一个 do-while 循环,每次迭代处理一行:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
do {
    RowBlockRow dst_row = dst->row(*fetched);
    MRSRow row(memrowset_.get(), iter_->GetCurrentValue());

    if (has_upper_bound() && out_of_bounds(iter_->GetCurrentKey())) {
        state_ = kFinished;
        break;
    }
    // ... MVCC 过滤 + 投影 + mutation 回放 ...
    ++*fetched;
} while (iter_->Next() && *fetched < dst->nrows());

循环先从 B-tree 迭代器取出当前行的 value Slice,构造 MRSRow。然后检查主键是否超过上界——如果是,标记 kFinished 并退出。接下来进入 MVCC 过滤的核心逻辑。

MVCC 时间范围过滤。 opts_ 中的两个快照定义了迭代器的时间窗口:

  • 普通扫描(只设 snap_to_include):时间范围 = (-∞, snap_to_include),返回该快照时刻之前已提交的所有行。
  • 增量扫描(同时设 snap_to_excludesnap_to_include):时间范围 = [snap_to_exclude, snap_to_include),只返回两个快照之间的差异数据,用于增量备份或 compaction。

对每一行,FetchRows 首先判断其 INSERT 是否在时间窗口内可见。如果行的 insertion_timestamp 已被 snap_to_include 确认(即 IsApplied 返回 true),或者虽已被 snap_to_exclude 排除但后续 mutation 可能仍在窗口内,则需要处理该行。FetchRows 先调用 projector 将 MRS 全列行投影到 dst_row,然后以 Acquire_Load 读取 redo_head 指针,交给 ApplyMutationsToProjectedRow 回放变更。

如果行的 INSERT 太新(snap_to_include 都看不到),整行被跳过——在 SelectionVector 中标记为未选中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
    // The snapshots in 'opts_' represent a time range that this iterator must
    // respect. There are two possible cases:
    //
    // 1. 'snap_to_exclude' is unset but 'snap_to_include' is set. The time
    //    range is [INF, snap_to_include).
    // 2. Both 'snap_to exclude' and 'snap_to_include' are set. The time range
    //    is [snap_to_exclude, snap_to_include).
    //
    // If a non-transactional  row's insertion timestamp is applied in
    // 'snap_to_exclude' or a transactional row's commit timestamp is committed
    // in 'snap_to_exclude', the insertion was outside this iterator's time
    // range (i.e. the insert was "excluded").  However, subsequent mutations
    // may be inside the time range, so we must still project the row and walk
    // its mutation list.
    const auto& txn_meta = memrowset_->txn_metadata();
    bool insert_excluded = opts_.snap_to_exclude &&
        // TODO(awong): if we find this to be too slow, we should be able to
        // compute IsCommitted() once per iterator at construction time.
        (txn_meta ? opts_.snap_to_exclude->IsCommitted(*txn_meta.get()) :
                    opts_.snap_to_exclude->IsApplied(row.insertion_timestamp()));
    bool unset_in_sel_vector;
    ApplyStatus apply_status;
    if (insert_excluded ||
        (txn_meta ? opts_.snap_to_include.IsCommitted(*txn_meta.get()) :
                    opts_.snap_to_include.IsApplied(row.insertion_timestamp()))) {
      RETURN_NOT_OK(projector_->ProjectRowForRead(row, &dst_row, dst->arena()));

      // Roll-forward MVCC for committed updates.
      Mutation* redo_head = reinterpret_cast<Mutation*>(
          base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(&row.header_->redo_head)));
      RETURN_NOT_OK(ApplyMutationsToProjectedRow(
          redo_head, &dst_row, dst->arena(), insert_excluded, &apply_status));
      unset_in_sel_vector = (apply_status == APPLIED_AND_DELETED && !opts_.include_deleted_rows) ||
                            (apply_status == NONE_APPLIED && insert_excluded) ||
                            (apply_status == APPLIED_AND_UNOBSERVABLE);
    } else {
      // The insertion is too new; the entire row should be omitted.
      unset_in_sel_vector = true;
    }

Mutation 回放:ApplyMutationsToProjectedRow。 这是整个读取路径中最复杂的方法。它遍历行的 redo mutation 链表,将时间窗口内的变更逐个应用到已投影的目标行上,同时维护一个存活状态机来检测"不可观测行"。

短路路径很简单:如果 mutation_head == nullptr,即行从未被修改过,返回 NONE_APPLIED

对于有 mutation 的行,方法用两个布尔变量 is_deleted_startis_deleted_end 跟踪行在时间窗口两端的存活状态:

  • is_deleted_end 的更新很直观——遇到 DELETE 翻转为 true,遇到 REINSERT 翻转为 false。

  • is_deleted_start 的确定则比较 tricky:如果 INSERT 在窗口内,行在窗口起点之前不存在,等价于已删除,初始化为 true;如果 INSERT 在窗口之前,则延迟到遇到第一个窗口内 mutation 时,用当时 is_deleted_end 的值(反映了窗口起点时所有旧 mutation 的累积效果)来修正。

主循环通过 acquire_next() 遍历 mutation 链表(与写端的 Release_Store 形成 happens-before 关系),对每个 mutation 执行以下步骤:

  1. 时间上界裁剪:如果 mutation 的 timestamp 超过 snap_to_include,后续所有 mutation 也必然更新(链表按时间戳单调递增),直接 break。

  2. 窗口内判断与状态标记:如果 mutation 在 [snap_to_exclude, snap_to_include) 窗口内,标记 local_apply_status = APPLIED_AND_PRESENT。首次进入窗口时还要修正 is_deleted_start

  3. 应用变更:对 DELETE,只翻转 is_deleted_end;对 UPDATE 或 REINSERT,逐列将变更应用到 dst_row。REINSERT 额外翻转 is_deleted_end(ghost → 活跃)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  // In order to find unobservable rows, we need to track row liveness at the
  // start and end of the time range. If a row was dead at both ends, its
  // lifespan must have been a subset of the time range and it should be
  // excluded from the results.
  //
  // Finding 'is_deleted_end' is relatively straight-forward: we use each
  // relevant mutation to drive a liveness state machine, and after we're done
  // applying, 'is_deleted_end' is just the final value of that state machine.
  //
  // Finding 'is_deleted_start' is trickier. If the insertion was inside the
  // time range, we know the value is true because the row was dead prior to the
  // insertion and the insertion happened after the start of the time range.
  // However, if the insertion was excluded from the time range, the value is
  // going to be whatever the value of the liveness state machine was at the
  // start of the time range.
  bool is_deleted_start = !insert_excluded;
  bool is_deleted_end = false;

  for (const Mutation *mut = mutation_head;
       mut != nullptr;
       mut = mut->acquire_next()) {
    if (!opts_.snap_to_include.IsApplied(mut->timestamp_)) {
      // This mutation is too new and should be omitted.
      //
      // All subsequent mutations are also too new because their timestamps are
      // guaranteed to be equal to or greater than this mutation's timestamp.
      break;
    }

    // If the mutation is too old, we still need to apply it (so that the column
    // values are correct if we see a relevant mutation later), but it doesn't
    // count towards the overall "application status".
    if (!opts_.snap_to_exclude ||
        !opts_.snap_to_exclude->IsApplied(mut->timestamp_)) {

      // This is the first mutation within the time range, so we may use it to
      // initialize 'is_deleted_start'.
      if (local_apply_status == NONE_APPLIED && insert_excluded) {
        is_deleted_start = is_deleted_end;
      }
      local_apply_status = APPLIED_AND_PRESENT;
    }

在逐列应用变更的实现中:对于每个投影列,方法创建一个新的 RowChangeListDecoder(因为 decoder 是单次遍历的),在 changelist 中查找该列的变更并写入 dst_row 的对应位置。这意味着每个 mutation 对每个投影列都做一次 changelist 扫描——时间复杂度 O(投影列数 × changelist 中的列数)。这里实际上可以通过反向索引的方式优化时间复杂度为 O(n)

1
2
3
4
5
6
7
8
      for (const RowProjector::ProjectionIdxMapping& mapping : projector_->base_cols_mapping()) {
        RowChangeListDecoder decoder(mut->changelist());
        RETURN_NOT_OK(decoder.Init());
        ColumnBlock dst_col = dst_row->column_block(mapping.first);
        RETURN_NOT_OK(decoder.ApplyToOneColumn(dst_row->row_index(), &dst_col,
                                               memrowset_->schema_nonvirtual(),
                                               mapping.second, dst_arena));
      }

循环结束后,方法根据 is_deleted_startis_deleted_end 确定最终的 ApplyStatus

1
2
3
4
5
if (opts_.snap_to_exclude && is_deleted_start && is_deleted_end) {
    local_apply_status = APPLIED_AND_UNOBSERVABLE;
} else if (is_deleted_end && local_apply_status == APPLIED_AND_PRESENT) {
    local_apply_status = APPLIED_AND_DELETED;
}
  • APPLIED_AND_UNOBSERVABLE:行在窗口两端都是死的——它的整个生命周期是窗口的子集,外部观察者在任何一端都看不到它存活,应从结果中排除。
  • APPLIED_AND_DELETED:行在窗口终点已被删除,但窗口起点时是存活的。是否返回取决于 opts_.include_deleted_rows
  • NONE_APPLIED:没有窗口内的 mutation 被应用。若 insert_excluded 为 true(INSERT 太旧且无窗口内变更),该行也应排除。

回到 FetchRows,它根据 ApplyStatus 决定是否在 SelectionVector 中取消选中该行:

1
2
3
4
unset_in_sel_vector =
    (apply_status == APPLIED_AND_DELETED && !opts_.include_deleted_rows) ||
    (apply_status == NONE_APPLIED && insert_excluded) ||
    (apply_status == APPLIED_AND_UNOBSERVABLE);

如果查询请求了 IS_DELETED 虚拟列,则在选中行的该列位置写入 apply_status == APPLIED_AND_DELETED 的布尔值。

Flush/Compaction 路径:GetCurrentRow(五参数重载)。 与面向客户端的 FetchRows 路径不同,Flush/Compaction 使用五参数版本的 GetCurrentRow。它不做 MVCC roll-forward,而是将行的基线数据和 redo mutation 链表分别输出,由下游的 DiskRowSet 分别写入 base data 和 delta store。

如果 MRS Schema 与 projection Schema 不同(!delta_projector_.is_identity()),需要对 redo 链表中每个 mutation 的 changelist 做列投影——保留 projection 中存在的列变更,丢弃不存在的。投影后为空的 mutation(所有变更列都不在 projection 中)直接跳过。投影后的 mutation 节点从 mutation_arena 分配,组成一条全新的链表返回给调用者。


附录

相关源码文件索引

文件
src/kudu/tablet/memrowset.hMemRowSet、MRSRow、Iterator 类定义
src/kudu/tablet/memrowset.ccInsert、MutateRow、FetchRows、ApplyMutations 实现
src/kudu/tablet/concurrent_btree.hCBTree、LeafNode、InternalNode、PreparedMutation、CBTreeIterator
src/kudu/tablet/mutation.hMutation 类(变长对象、Arena 分配、链表操作)
src/kudu/tablet/mutation.ccAppendToListAtomic、StringifyMutationList
src/kudu/util/memory/arena.hArenaBase、ThreadSafeMemoryTrackingArena
src/kudu/util/memory/memory.hMemoryTrackingBufferAllocator、BufferAllocator
src/kudu/tablet/rowset.hRowSetKeyProbe、ProbeStats
src/kudu/common/schema.hSchema::EncodeComparableKey
src/kudu/tablet/memrowset-test.cc单元测试

针对 memrowset 尝试的优化