Block 抽象接口

操作系统把磁盘抽象成文件,Kudu 则在文件之上再加了一层抽象——Block。在 Kudu 中,一列数据、一个 BloomFilter、一份主键索引,最终都变成一个或多个 Block 写入磁盘。Block 是 Kudu 存储引擎与本地文件系统之间的分界线:上层组件只需面对 Block 接口的 Append / Read,不必关心底层是一个独立文件(FileBlockManager)还是日志容器中的一段字节区间(LogBlockManager)。

这种隔离方式与 Unix 的设备抽象思路一致——Unix 内核用 struct file 加上 read/write 函数指针屏蔽底层设备差异;Kudu 用 Block 基类加上虚函数屏蔽底层文件系统的组织差异。

BlockId:Block 的身份标识

每个 Block 都有一个全局唯一的身份标识 BlockIdsrc/kudu/fs/block_id.h),本质上是一个 64 位无符号整数:

1
2
3
4
class BlockId {
 private:
  uint64_t id_;
};

BlockId 可以序列化到 protobuf(CopyToPB / FromPB),也可以打印成 16 位十六进制字符串供调试。它是不透明的——上层代码不应假设 ID 的分配规则或数值含义,只需把它当作一把钥匙,用来在 BlockManager 中取回对应的数据。

Block 基类

Block 是所有 Block 的基类,接口极为简洁——只有一个方法:

1
2
3
4
5
class Block {
 public:
  virtual ~Block() = default;
  virtual const BlockId& id() const = 0;
};

id() 返回该 Block 的 BlockId。这足以让上层代码通过 ID 引用任何 Block,而不必关心它是可读的还是可写的。Block 的两个子类——WritableBlockReadableBlock——分别定义了写路径和读路径的完整接口。

WritableBlock:写入一个 Block

WritableBlock 对应一个正在写入的 Block。它的设计遵循三条规则:

  • 只能追加(append-only);
  • 单线程写入
  • 写入后不可变。

这些约束并非偶然——它们直接服务于 Kudu 的列式存储模型:一次 flush 或 compaction 产生的每列数据从头到尾顺序写入一个 Block,写完就封存,此后只读不改。

状态机

一个 WritableBlock 从创建到关闭,经历四个状态:

1
2
   Append()        Finalize()       Close()
CLEAN ------→ DIRTY --------→ FINALIZED ------→ CLOSED
  • CLEAN:刚创建,还没有写入任何数据。
  • DIRTY:已调用 Append(),内存中存在尚未落盘的脏数据。
  • FINALIZED:已调用 Finalize(),不再接受新写入,但数据还未保证持久化。
  • CLOSED:已调用 Close(),数据和元数据均已持久化到磁盘,Block 不可再操作。

这个状态机的关键在于 FINALIZED 这个中间态。如果没有它,写入路径只有两步:写数据、然后 Close() 刷盘。但 Close() 是一个昂贵的操作——它不仅要把脏数据 flush 到内核缓冲区,还要 fsync 确保数据落盘。Finalize() 的存在使得刷盘可以分两步完成:先异步 flush,再同步 fsync,中间留出时间窗口让 CPU 做其他工作。

核心方法

Append(data)AppendV(data_array) 将数据追加到 Block 末尾。AppendV 是向量化版本,一次传入多段数据,减少虚函数调用次数。两者都不保证持久性——数据可能只停留在用户态缓冲区或内核 page cache 中。

Finalize() 通知 Block 不再接受写入。当 gflag block_manager_preflush_control 设为 "finalize"(默认值)时,它还会启动一次异步 flush——把数据从用户态推到内核 page cache,但不等待磁盘确认。这与操作系统的 readahead 思路相同:提前发起 I/O,用后续的计算时间来掩盖磁盘延迟。

Close() 是最终确认。它等待所有未完成的 I/O,然后 fsync 数据和元数据,保证 Block 完整落盘。成功返回后,这个 Block 就是持久的——即使进程崩溃、机器断电,数据也不会丢失。

Abort()Close() 的反面——它放弃这个 Block,丢弃已写入的数据。如果一个 WritableBlock 被析构时还没有调用 Close(),析构函数会自动调用 Abort()。这是一种 RAII 保证:没有显式提交的写入会被安静地丢弃,而非留下半成品数据。

真实使用模式

CFileWriter 在写完一个列文件后的调用序列展示了典型的写入模式(src/kudu/cfile/cfile_writer.cc):

1
2
3
4
5
// 1. 先前已通过多次 Append() 写入列数据和索引...
// 2. 写完后 Finalize,启动异步刷盘
RETURN_NOT_OK(block_->Finalize());
// 3. 将 Block 移入创建事务,后续由事务统一 Close
transaction->AddCreatedBlock(std::move(block_));

单个 Block 也可以不走事务,直接走完生命周期(src/kudu/fs/block_manager-test.cc):

1
2
3
4
5
6
unique_ptr<WritableBlock> block;
bm->CreateBlock(opts, &block);     // 创建
block->Append(data);               // 写入
block->Finalize();                 // 异步刷盘
block->Close();                    // fsync 落盘
BlockId id = block->id();          // 记住 ID,后续用它来读

Pre-flush 控制

block_manager.cc 中定义了一个 gflag block_manager_preflush_control,控制异步刷盘的触发时机:

取值行为适用场景
"finalize"Finalize() 时启动异步 flush默认值,多盘吞吐优先
"close"事务 CommitCreatedBlocks() 时启动异步 flush中间策略
"never"从不异步 flush,Close() 时同步完成所有 I/O少盘/低延迟优先

默认选择 "finalize" 的逻辑很明确:Kudu 集群通常有多块数据盘,每块盘上同时有不同列的 Block 在写入。如果每列 Block 在 Finalize() 时就开始异步 flush,那么多块盘的 I/O 可以并行推进。等到最后统一 Close() 时,大部分数据已经落盘,fsync 只需等待少量残留 I/O。这正是 Unix 中 write() + fsync() 分离的经典优化——先让多个写入各自推进,再统一同步。

ReadableBlock:读取一个 Block

ReadableBlock 代表一个可读取的 Block。由于 Block 在写入后不可变(immutable),读操作天然线程安全——多个线程可以并发读取同一个 ReadableBlock 对象,无需加锁。同一个逻辑 Block 也可以被多次 OpenBlock() 打开,产生多个独立的 ReadableBlock 实例。

核心方法

Size(sz) 返回 Block 在磁盘上的大小(字节数)。

Read(offset, result) 从 Block 的 offset 位置开始,精确读取 result.size 个字节。调用者需预先分配好 result 指向的缓冲区。如果请求的字节超过 Block 大小,返回错误。

ReadV(offset, results) 是向量化版本(scatter read):从 offset 开始,依次填充 results 中每个 Slice 所指向的缓冲区。这避免了多次 Read() 调用的虚函数开销,且底层实现可以合并为一次系统调用。CFileReader 在解析文件头时就使用了这种模式——将 header 和 checksum 放进一个 Slice 数组,一次 ReadV 读完:

1
2
3
4
5
vector<Slice> results = { header };
if (do_verify_checksum()) {
  results.push_back(checksum);
}
block_->ReadV(kMagicAndLengthSize, results);  // 一次读取 header + checksum

Close() 释放内存中的 Block 对象。注意这里与 WritableBlock::Close() 的含义截然不同——读 Block 的 Close() 不涉及任何磁盘 I/O,只是释放内存资源(如释放文件描述符、从缓存中移除引用等)。磁盘上的数据不受影响。

memory_footprint() 返回此对象的内存占用(含对象本身),供 MemTracker 统计。

BlockManager:Block 的管辖者

BlockManager 统一管理所有 Block 的创建、打开、删除和生命周期。它相当于一个专门为 Block 设计的"文件系统"——上层组件通过它来分配新 Block、按 ID 检索已有 Block、批量删除过期 Block。所有方法都是线程安全的。

1
2
3
4
5
class BlockManager : public RefCountedThreadSafe<BlockManager> {
 public:
  static const std::set<std::string>& block_manager_types();
  // ...
};

block_manager_types() 返回当前平台支持的 BlockManager 实现类型。在 Linux 上有三种:"file"(每个 Block 一个独立文件)、"log"(多个 Block 合并到日志式容器文件)和 "logr"(基于 RocksDB 的元数据管理的日志式容器)。非 Linux 平台只支持 "file"

核心操作:

  • CreateBlock(opts, block):创建一个新 Block 并打开写入。Block ID 由 BlockManager 内部自动分配。CreateBlockOptions 携带 tablet_id,用于将 Block 放到正确的磁盘目录组中。
  • OpenBlock(block_id, block):按 ID 打开一个已有 Block 进行读取。
  • FindBlockPath(block_id, path):查找 Block 数据所在的物理文件路径。
  • GetAllBlockIds(block_ids):列出所有受管理的 Block ID(无序,非确定性),主要用于一致性检查工具 ksck。
  • NotifyBlockId(block_id):告知 BlockManager 某个 Block ID 的存在,防止顺序 ID 分配器把已被外部引用的 ID 重新分配给新 Block(例如数据目录故障后恢复时)。

BlockManager 继承了 RefCountedThreadSafe,生命周期通过引用计数管理。这使得多个组件(如多个 tablet)可以安全地持有同一个 BlockManager 的引用。

批量事务:BlockCreationTransaction 与 BlockDeletionTransaction

单独操作一个 Block 虽然可行,但在 flush 和 compaction 这类一次产生/删除大量 Block 的场景中效率不高——每个 Block 单独 fsync 意味着大量独立的磁盘屏障操作。Kudu 用两种事务接口来解决这个问题。

BlockCreationTransaction 将多个 WritableBlock 的创建编组为一个批次。使用方式很直接:逐个创建 Block 并写入数据,然后调用 AddCreatedBlock() 将它们注册到事务中,最后一次 CommitCreatedBlocks() 统一关闭和刷盘。好处有二:一是多个 Block 的 fsync 可以合并(group commit),减少磁盘屏障次数;二是等待未完成 I/O 时可以并行等待,而非逐个串行。

一次典型的 flush 操作展示了这个模式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 创建事务
unique_ptr<BlockCreationTransaction> transaction = bm->NewCreationTransaction();

// 对每一列:创建 Block → 写入列数据 → Finalize → 注册到事务
for (each column) {
  unique_ptr<WritableBlock> block;
  bm->CreateBlock(opts, &block);
  cfile_writer->WriteColumnData(block.get());  // 内部多次 Append
  block->Finalize();                           // 异步刷盘
  transaction->AddCreatedBlock(std::move(block));
}

// 一次性提交所有 Block:触发 group fsync
transaction->CommitCreatedBlocks();

Finalize()CommitCreatedBlocks() 之间,每列 Block 的异步 flush 在各自的磁盘上并行推进。到 Commit 时,大量 I/O 已经完成,最终的 fsync 只需等待少量残余。这就是 Finalize() + 批量事务组合带来的吞吐优化。

BlockDeletionTransaction 做类似的事情,但方向相反:将多个 Block 的删除操作编组。调用 AddDeletedBlock(block_id) 添加待删除的 Block ID,然后 CommitDeletedBlocks(deleted) 提交。实际删除是延迟执行的——每个 Block 会在最后一个读/写者关闭后才真正释放磁盘空间,类似 Unix 中 unlink() 后文件直到最后一个 fd 关闭才释放 inode 的行为。deleted 参数返回实际成功删除的 ID 列表。

删除事务返回 shared_ptr(而非创建事务的 unique_ptr),因为同一批待删除的 Block 可能被多个组件引用——例如 compaction 线程决定要删除哪些旧 Block,而 tablet 的元数据更新也需要知道哪些 Block 已经删除。

两种事务都不是线程安全的,调用方需自行保证单线程操作或加锁。

配置选项

CreateBlockOptionsBlockManagerOptions 两个结构体控制 Block 和 BlockManager 的行为:

1
2
3
4
5
6
7
8
9
struct CreateBlockOptions {
  const std::string tablet_id;  // 用于 DataDirGroup 路由,决定 Block 放到哪组磁盘
};

struct BlockManagerOptions {
  scoped_refptr<MetricEntity> metric_entity;       // 指标实体,为 NULL 则不上报指标
  std::shared_ptr<MemTracker> parent_mem_tracker;  // 内存追踪器父节点
  bool read_only = false;                          // 只读模式(打开已有数据但不创建新 Block)
};

CreateBlockOptions 目前只有 tablet_id 一个字段,用于将 Block 放到正确的 DataDirGroup 中(每个 tablet 的数据分散在专属的一组磁盘目录上)。注释中提到未来可能扩展为按 Block 类型选择目录——例如将 BloomFilter Block 优先放到 SSD 上。

具体实现

block_manager.h 中定义的全是纯虚接口。真正的实现在三个 BlockManager 子类中:

类型实现类WritableBlockReadableBlock特点
"file"FileBlockManagerFileWritableBlockFileReadableBlock每个 Block 一个独立文件,简单但文件数可能极多
"log"LogBlockManagerLogWritableBlockLogReadableBlock多个 Block 共享一个容器文件,减少文件数和 inode 消耗
"logr"LogBlockManagerRdb用 RocksDB 管理元数据的日志式容器(Linux 专属)

Block 接口层的设计使得上层代码(CFileWriter、DiskRowSetWriter 等)完全不感知底层使用的是哪种 BlockManager。这种解耦让 Kudu 可以在不同部署场景下灵活切换存储策略——开发测试用简单的 FileBlockManager,生产环境用高效的 LogBlockManager。

小结

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
                      Block(纯虚基类)
                       ├── id()
            ┌──────────┴──────────┐
            │                     │
     WritableBlock          ReadableBlock
     ├── Append(data)       ├── Read(offset, result)
     ├── AppendV(data[])    ├── ReadV(offset, results[])
     ├── Finalize()         ├── Size(sz)
     ├── Close()  [fsync]   ├── Close()  [释放内存]
     ├── Abort()            └── memory_footprint()
     └── BytesAppended()

     BlockManager(管辖者)
     ├── CreateBlock() → WritableBlock
     ├── OpenBlock()   → ReadableBlock
     ├── NewCreationTransaction() → BlockCreationTransaction
     └── NewDeletionTransaction() → BlockDeletionTransaction

整个 Block 接口层的设计遵循几条清晰的原则:追加写入消除了随机写的复杂性;写入后不可变使得读操作无需加锁;Finalize + 批量事务把磁盘同步的开销从单点分摊到流水线中;虚接口隔离让存储策略可以独立演进。这些原则贯穿 Kudu 存储引擎的每一层——从 Block 到 CFile 到 DiskRowSet,思路一脉相承。


LogBlockManager 容器模型

FileBlockManager 为每个 Block 创建一个独立文件。这种做法概念简单,但文件数量与 Block 数量线性增长——一个生产集群上动辄产生数百万 Block,导致海量小文件淹没文件系统的 inode 表和目录索引。LogBlockManager 的核心思想是把多个 Block 聚合到少量大文件中,这些大文件在 Kudu 中称为 container

Container 的物理结构

每个 container 在磁盘上由两个文件组成:

1
2
<dir>/<uuid>.data        — 所有 Block 的实际数据,顺序追加写入
<dir>/<uuid>.metadata    — 元数据日志,记录每个 Block 的创建和删除

.data 文件是一个追加写的大文件,多个 Block 的数据紧挨着按写入顺序排列。每个 Block 在文件中占据一段连续的字节区间,起始偏移按文件系统块大小对齐:

1
2
3
4
5
6
7
8
9
block-beta
  columns 5
  A["Block_A\noff=0\nlen=4096"]:1
  B["Block_B\noff=4096\nlen=8192"]:1
  C["Block_C\noff=12288\nlen=8192"]:1
  H["(hole)\n已 punch\nBlock_D 删除"]:1
  E["Block_E\noff=20480\nlen=4096"]:1

  style H fill:#555,stroke:#333,color:#999,stroke-dasharray: 5 5

.metadata 文件是一个日志结构的 protobuf 容器文件,每条记录是一个 BlockRecordPB

1
2
3
4
5
6
7
message BlockRecordPB {
  required BlockIdPB block_id = 1;       // Block 的唯一 ID
  required BlockRecordType op_type = 2;  // CREATE 或 DELETE
  required uint64 timestamp_us = 3;      // 操作时间戳
  optional int64 offset = 4;             // 在 .data 文件中的偏移(CREATE 必填)
  optional int64 length = 5;             // 数据长度(CREATE 必填)
}

一份典型的 .metadata 文件内容像这样:

1
2
3
4
5
6
.metadata 日志:
  [CREATE] block_id=1, offset=0,     length=4096
  [CREATE] block_id=2, offset=4096,  length=8192
  [CREATE] block_id=3, offset=12288, length=8192
  [DELETE] block_id=3                               ← 逻辑删除
  [CREATE] block_id=5, offset=20480, length=4096

启动时,LogBlockManager 逐条回放 .metadata 中的记录来重建内存索引。新建一条 CREATE 记录就是"文件存在",后面跟一条 DELETE 就是"文件已删除"。回放结束后,只有未被 DELETE 的 Block 留在内存 map 中。

对齐放置是一个重要的设计约束。UpdateNextBlockOffset 将每个 Block 的结束位置向上对齐到文件系统块大小:

1
2
3
int64_t new_next_block_offset = KUDU_ALIGN_UP(
    block_offset + block_length,
    instance()->filesystem_block_size_bytes());

这保证了后续 hole punch 能以文件系统块为粒度真正回收磁盘空间——如果 Block 跨越了文件系统块的边界但没有完整覆盖最后一个块,punch 只能零填充而无法释放物理空间。按块对齐消除了这种问题。

核心对象

LogBlockManager 的内部由五个类协作完成所有工作:

角色生命周期
LogBlockManager全局管理者,持有 block map 和 container 池进程级
LogBlockContainer物理容器,管理 .data 文件的读写与空间回收引用计数
LogBlockContainerNativeMetaContainer 子类,通过 .metadata PB 文件管理元数据引用计数
LogBlock已持久化 Block 的内存索引条目(container + offset + length)引用计数
LogWritableBlock正在写入的 Block,Close 后才创建 LogBlock随 WritableBlock 所有权

它们之间的关系:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
classDiagram
  class LogBlockManager {
    managed_block_shards_ : BlockId → LogBlock (分片 hash map)
    all_containers_by_name_ : name → LogBlockContainer
    available_containers_by_data_dir_ : Dir → deque~Container~(空闲池)
  }

  class LogBlockContainer {
    <<基类>>
    data_file_ : shared_ptr~RWFile~ ← .data 文件句柄
    next_block_offset_ : atomic~int64_t~ ← 写游标
    live_blocks_, live_bytes_ ← 统计计数
    纯虚元数据接口()*
  }

  class LogBlockContainerNativeMeta {
    <<子类>>
    metadata_file_ : WritablePBContainerFile ← .metadata 文件
  }

  class LogBlock {
    container_ : LogBlockContainerRefPtr ← 住在哪个 container
    block_id_ : BlockId
    offset_ : int64_t ← 在 .data 中的起始偏移
    length_ : int64_t ← 数据长度
  }

  class LogWritableBlock {
    container_ : LogBlockContainerRefPtr
    block_id_ : BlockId
    block_offset_ : int64_t ← 创建时确定
    block_length_ : int64_t ← 随 Append 增长
    state_ : CLEAN → DIRTY → FINALIZED → CLOSED
  }

  LogBlockManager "1" --> "*" LogBlock : managed_block_shards_
  LogBlockManager "1" --> "*" LogBlockContainer : all_containers_by_name_
  LogBlockContainer <|-- LogBlockContainerNativeMeta : 继承
  LogBlock --> LogBlockContainer : container_
  LogWritableBlock --> LogBlockContainer : container_

在写入阶段,LogWritableBlock 拥有 container 引用并独占写权限,没有对应的 LogBlock 对象。只有 Close() 成功后,才会创建 LogBlock 并注册到全局 block map。这个规则保证了内存索引永远不会超前于磁盘状态。

CODE: 写入路径 —— 从 CreateBlock 到 Close

一个 Block 的完整写入生命周期经历四个阶段:分配 container 和 BlockId、追加写入数据、Finalize 释放 container、Close 持久化并注册。

分配 Container 和 BlockId。 LogBlockManager::CreateBlock 是写入的入口。它首先根据 CreateBlockOptions 中的 tablet_id 从对应的数据目录组中获取一个可用 container:

1
2
LogBlockContainerRefPtr container;
RETURN_NOT_OK(GetOrCreateContainer(opts, &container));

GetOrCreateContainer 先尝试从空闲池 available_containers_by_data_dir_ 的 front 取出一个 container。如果空闲池为空,则调用 CreateContainer 新建一个——生成 UUID、创建 .data.metadata 两个物理文件、初始化 PB 文件头。取出后 container 从空闲池移除,进入"独占使用"状态,其他写者无法同时向这个 container 追加数据。

然后分配一个全局唯一的 BlockId

1
2
3
4
BlockId new_block_id;
do {
  new_block_id.SetId(next_block_id_++);
} while (!TryUseBlockId(new_block_id));

next_block_id_ 是一个原子计数器,每次自增产生候选 ID。TryUseBlockId 在分片锁内检查该 ID 是否已被占用(blocks_by_block_id 中已存在或 open_block_ids 中已预占),如果可用则将 ID 插入 open_block_ids 表示"正在使用中"。循环通常只执行一次,除非遇到早期版本遗留的非连续 ID 碰撞。

最后构造 LogWritableBlock,其 block_offset_ 取自 container->next_block_offset()——当前 container 的写游标位置。此时既未写入任何数据,也未分配 LogBlock

追加写入数据。 LogWritableBlock::AppendV 负责实际的数据写入:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
Status LogWritableBlock::AppendV(ArrayView<const Slice> data) {
  size_t data_size = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
                                [&](int sum, const Slice& curr) {
                                  return sum + curr.size();
                                });
  int64_t cur_block_offset = block_offset_ + block_length_;
  RETURN_NOT_OK(container_->EnsurePreallocated(cur_block_offset, data_size));
  RETURN_NOT_OK(container_->WriteVData(cur_block_offset, data));
  block_length_ += data_size;
  state_ = DIRTY;
  return Status::OK();
}

三步依次进行。第一步 EnsurePreallocated:如果本次写入会超出已预分配的空间范围,调用 fallocate 预分配一大块空间(默认 32 MB,参数可调),避免逐次小分配导致的文件碎片和系统调用开销。第二步 WriteVData:委托 container 的 RWFile::WriteV(底层 pwritev)将数据写到 .data 文件的指定偏移处。第三步更新 block_length_ 并切换到 DIRTY 状态(已有脏数据未持久化)。

预分配的思路一次性向文件系统申请一大块空间比频繁小量申请高效得多。preallocated_offset_ 记录了预分配的上界,只有当写入超过这个上界时才触发新一轮预分配。

Finalize:释放 Container。 当上层写完数据后调用 Finalize()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
Status LogWritableBlock::Finalize() {
  if (state_ == FINALIZED) {
    return Status::OK();
  }
  SCOPED_CLEANUP({
    container_->FinalizeBlock(block_offset_, block_length_);
    state_ = FINALIZED;
  });
  if (state_ == DIRTY &&
      FLAGS_block_manager_preflush_control == "finalize") {
    RETURN_NOT_OK(FlushDataAsync());
  }
  return Status::OK();
}

SCOPED_CLEANUP 保证无论函数正常返回还是因 FlushDataAsync 失败而提前返回,都会调用 FinalizeBlock。这是一个重要的 RAII 保证——如果不归还 container,它会永远停留在"独占使用"状态,再也不会被分配给其他写者。

FinalizeBlock 做三件事:

  1. 调用 UpdateNextBlockOffset 将 container 写游标向前推进到当前 Block 结束位置(按文件系统块对齐),如果超过 FLAGS_log_container_max_size(默认 10 GB)则标记 container 为 full;

  2. 如果 container 已满,截断 .data 文件到写游标位置,回收最后一次预分配但未使用的尾部空间;

  3. 调用 MakeContainerAvailable 将 container 放回空闲池的 front,供后续写者使用。

如果 gflag block_manager_preflush_control 配置为 "finalize"(默认值),则在释放 container 之前还会调用 FlushDataAsync,异步地把脏数据从用户态缓冲区推到内核 page cache(底层是 sync_file_rangeSYNC_FILE_RANGE_WRITE)。这不保证持久化,但能让后续 fsync 时大部分数据已经抵达磁盘,从而减少同步等待时间。

整个 Finalize 环节体现了一个关键的设计选择:container 的独占期尽可能短。一个 container 只在 CreateBlock ↔ Finalize 之间被独占,Finalize 后立刻归还。但数据要等到 Close 时才真正持久化。这意味着多个写者可以交替地向同一 container 追加不同的 Block,提高了 container 的空间利用率。

Close:持久化与注册。 最终的持久化由 LogBlockContainer::DoCloseBlocks 完成。它既可以由单个 Block 的 Close() 调用(此时 blocks 数组只有一个元素),也可以由 LogBlockCreationTransaction::CommitCreatedBlocks() 批量调用(同一 container 下多个 Block 共享一次 fsync)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Status LogBlockContainer::DoCloseBlocks(const vector<LogWritableBlock*>& blocks,
                                        SyncMode mode) {
  auto sync_blocks = [&]() {
    if (mode == SYNC) {
      RETURN_NOT_OK(SyncData());                          // 第 1 步
    }
    RETURN_NOT_OK(AddBlockIdsToMetadata(blocks));         // 第 2 步
    if (mode == SYNC) {
      RETURN_NOT_OK(SyncMetadata());                      // 第 3 步
    }
    RETURN_NOT_OK(block_manager()->SyncContainer(*this)); // 第 4 步
    for (LogWritableBlock* block : blocks) {
      block->DoClose();                                   // 第 5 步
    }
    return Status::OK();
  };

  Status s = sync_blocks();
  if (!s.ok()) {
    SetReadOnly(s);
  }
  return s;
}

五步流水线:

  1. SyncData()fdatasync .data 文件,确保所有 Block 的数据字节持久化。
  2. AddBlockIdsToMetadata(blocks):为每个 Block 追加一条 CREATE BlockRecordPB.metadata 文件。这一步必须在数据 fsync 之后——否则元数据可能先于数据落盘,崩溃后会指向不存在的数据。
  3. SyncMetadata()fdatasync .metadata 文件,确保 CREATE 记录持久化。
  4. SyncContainer()fsync 容器所在目录,确保新创建的文件条目可见(仅在 container 首次创建时才实际 fsync,通过 dirty_dirs_ 集合去重)。
  5. DoClose():对每个 Block,创建不可变的 LogBlock 对象,注册到全局 block map,更新 metrics。

操作顺序的精心安排使得崩溃在任何步骤后都是安全的:

  • 如果在第 1 步(数据 fsync)之前崩溃:数据可能未落盘,但元数据也没写——没有孤儿,启动后一切正常。
  • 如果在第 2 步(写元数据)之前崩溃:数据已落盘但没有对应的 CREATE 记录——产生孤儿数据,占据磁盘空间但不可见,下次 GC 会清理。
  • 如果在第 3 步(元数据 fsync)之前崩溃:CREATE 记录可能未落盘,效果与上一种情况相同。
  • 如果在第 5 步之前崩溃:block 持久化成功但未注册到内存——重启时回放 .metadata 会恢复。

如果任一步骤失败(例如磁盘 I/O 错误),SetReadOnly(s) 将整个 container 标记为只读,禁止后续任何写入——因为 .data.metadata 可能处于不完整状态,继续写入只会加剧不一致。

DoClose 内部的关键代码是创建 LogBlock 并注册:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void LogWritableBlock::DoClose() {
  if (state_ == CLOSED) return;
  if (state_ == CLEAN || state_ == DIRTY) {
    container_->FinalizeBlock(block_offset_, block_length_);
  }
  LogBlockRefPtr lb = container_->block_manager()->CreateAndAddLogBlock(
      container_, block_id_, block_offset_, block_length_);
  CHECK(lb);
  container_->BlockCreated(lb);
  state_ = CLOSED;
}

CreateAndAddLogBlock 构造一个 LogBlock(四元组:container、block_id、offset、length),将其插入分片 block map(通过 block_id 低 4 位选择 shard 以减少锁竞争),同时从 open_block_ids 中移除预占标记。BlockCreated 更新 container 的统计计数(total_bytes_live_bytes_live_blocks_ 等)。从此刻起,该 Block 对 OpenBlock() 可见,可以被读取。

CODE: 批量创建事务

单个 Block 调用 Close() 即触发一次完整的 data-fsync + metadata-write + metadata-fsync 流水线。在 RowSet flush 这类一次产生十几个 Block(每列一个、外加 BloomFilter 和 ad-hoc index)的场景中,逐个 fsync 效率低下。LogBlockCreationTransaction 解决了这个问题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
Status LogBlockCreationTransaction::CommitCreatedBlocks() {
  // 按 container 分组
  unordered_map<LogBlockContainer*, vector<LogWritableBlock*>> created_block_map;
  for (const auto& block : created_blocks_) {
    if (FLAGS_block_manager_preflush_control == "close") {
      RETURN_NOT_OK(block->FlushDataAsync());
    }
    created_block_map[block->container()].emplace_back(block.get());
  }
  // 逐组调用 DoCloseBlocks,每组只做一次 fsync
  for (const auto& entry : created_block_map) {
    RETURN_NOT_OK(entry.first->DoCloseBlocks(entry.second,
                                             LogBlockContainer::SyncMode::SYNC));
  }
  created_blocks_.clear();
  return Status::OK();
}

关键优化:同一 container 内的 N 个 Block 共享一次 SyncData() 和一次 SyncMetadata(),将 2N 次 fsync 缩减为常数次。因为 fdatasync 的开销主要在等待磁盘完成旋转写入——合并后只需等有一次。同时事务可以跨 container 并行:不同 container 位于不同磁盘,它们的 Finalize + FlushDataAsync 阶段可以交错进行,最终 Commit 时各自独立 fsync。

CODE: 读取路径

Block 的读取通过 LogBlockManager::OpenBlock 发起:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
Status LogBlockManager::OpenBlock(const BlockId& block_id,
                                  unique_ptr<ReadableBlock>* block) {
  LogBlockRefPtr lb;
  {
    auto index = block_id.id() & kBlockMapMask;
    std::lock_guard l(*managed_block_shards_[index].lock);
    lb = FindPtrOrNull(*managed_block_shards_[index].blocks_by_block_id, block_id);
  }
  if (!lb) {
    return Status::NotFound("Can't find block", block_id.ToString());
  }
  block->reset(new internal::LogReadableBlock(std::move(lb)));
  return Status::OK();
}

通过 block_id 低 4 位选择 shard,在分片锁内查找 LogBlock 引用。找到后构造 LogReadableBlock,后者持有 LogBlockRefPtr(引用计数 +1),保证在读取期间 LogBlock 不会被析构。

LogReadableBlock::ReadV 将逻辑偏移转换为物理偏移后委托 container 读取:

1
2
3
4
5
6
7
8
Status LogReadableBlock::ReadV(uint64_t offset, ArrayView<Slice> results) const {
  uint64_t read_offset = log_block_->offset() + offset;
  if (log_block_->length() < offset + read_length) {
    return Status::IOError("Out-of-bounds read", ...);
  }
  RETURN_NOT_OK(log_block_->container()->ReadVData(read_offset, results));
  return Status::OK();
}

read_offset = log_block_->offset() + offset——这就是 LogBlock 四元组中 offset_ 字段的作用。上层代码感知的是 Block 内部的逻辑偏移(0 到 length),LogReadableBlock 负责加上 container 内的物理偏移后转发给 RWFile::ReadV(底层 preadv)。

由于 Block 写入后不可变,多个线程可以并发读取同一个 LogReadableBlock 对象,无需加锁——preadv 不修改文件位置指针,且底层数据不会被改变。

CODE: 删除路径 —— 逻辑删除与物理回收

Block 删除分为三个阶段:从内存 map 移除、写 DELETE 元数据记录、hole punch 回收磁盘空间。前两个阶段在 CommitDeletedBlocks 中同步完成,第三个阶段延迟到删除事务析构时异步执行。

从内存 map 移除。 LogBlockDeletionTransaction::CommitDeletedBlocks 首先调用 RemoveLogBlocks 批量操作:

1
2
3
4
5
6
7
8
9
Status LogBlockDeletionTransaction::CommitDeletedBlocks(vector<BlockId>* deleted) {
  vector<LogBlockRefPtr> log_blocks;
  Status first_failure = lbm_->RemoveLogBlocks(deleted_blocks_, &log_blocks, deleted);
  for (const auto& lb : log_blocks) {
    lb->RegisterDeletion(transaction);
    AddBlock(lb);
  }
  return first_failure;
}

RemoveLogBlocks 内部逐个 block 从分片 map 中 erase,使该 block 对 OpenBlock() 不再可见。移除后按 container 分组,对每组调用 RemoveBlockIdsFromMetadata.metadata 文件追加 DELETE 记录,并调用 BlockDeleted 递减 container 的 live_blocks_live_bytes_ 计数。

一个重要的设计选择是DELETE 记录不做 fsync。注释中解释了原因:即使 fsync 了 DELETE 记录,崩溃恢复时仍然需要处理"孤儿 block"(有 CREATE 无 DELETE)的情况——所以不 fsync 并不引入新的不一致类别,但节省了一次磁盘同步开销。最坏情况下,崩溃后重启会重新看到已被逻辑删除的 block,它们会在下次 GC 中被清理。

Hole Punch 回收磁盘空间。 LogBlockDeletionTransaction 析构时执行物理空间回收:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
LogBlockDeletionTransaction::~LogBlockDeletionTransaction() {
  for (auto& entry : deleted_interval_map_) {
    LogBlockContainer* container = entry.first.get();
    if (container->check_death_condition()) {
      if (container->TrySetDead()) {
        lbm_->RemoveDeadContainer(container->ToString());
      }
      continue;
    }
    CHECK_OK(CoalesceIntervals<int64_t>(&entry.second));
    for (const auto& interval : entry.second) {
      container->ExecClosure([self, interval]() {
        self->ContainerDeletionAsync(interval.first,
                                     interval.second - interval.first);
      });
    }
  }
}

析构函数对每个 container 做两层判断。如果 container 满足"死亡条件"(full + live_blocks == 0 + blocks_being_written == 0),通过 CAS 原子操作标记为 dead,从全局 map 移除——后续 container 析构时直接删除 .data.metadata 文件,比逐 block punch 高效得多。

如果 container 还活着,则通过 CoalesceIntervals 合并同一 container 内相邻或重叠的删除区间(例如 [0,4096) 和 [4096,12288) 合并为 [0,12288)),然后异步提交到数据目录的线程池执行 PunchHolePunchHole 底层调用 fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, length)——在文件中"打洞",让文件系统释放对应的物理磁盘块,但保持文件的逻辑大小不变。这与 Unix 稀疏文件的机制一致:一个 10 GB 的 .data 文件在 punch 掉大量已删除 block 后,实际占用的磁盘空间可能远小于 10 GB。

Container 的并发模型

LogBlockManager 对 container 的并发访问采用"取走/归还"(checkout/checkin)策略:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
sequenceDiagram
  participant Pool as Container 空闲池
  participant C as Container
  participant A as Writer_A
  participant B as Writer_B

  A->>Pool: 取 container
  Pool-->>A: container ref
  A->>C: Append Block_A
  A->>C: Finalize(归还到空闲池)
  C-->>Pool: 回到空闲池

  B->>Pool: 取 container(同一个)
  Pool-->>B: container ref
  B->>C: Append Block_B
  B->>C: Finalize(归还到空闲池)
  C-->>Pool: 回到空闲池

  Note over A,C: A 此时才 Commit: data fsync + metadata write
  A->>C: Commit (fsync)
  Note over B,C: B 随后 Commit
  B->>C: Commit (fsync)

写入阶段(CreateBlock → Finalize)是独占的:container 从空闲池取出后不再可用,其他写者必须等待或新建 container。但 Finalize 一完成,container 就立刻回到空闲池,下一个写者可以获取。而先前的写者还没有 Close——数据尚未 fsync。这意味着同一 container 的 .data 文件中可能同时有多个尚未持久化的 Block。

这种设计带来的风险是:如果某个写者的 SyncData 失败,container 会被标记为只读(SetReadOnly),其他已经写入但未 Close 的 Block 也会受到影响——它们的 DoCloseBlocks 将因为 read_only_status() 检查而失败。头文件注释中坦率地指出了这个权衡:

having concurrent writers grants better utilization for each container; however a failure to sync by any of the writers will cause the others to fail and potentially corrupt the underlying container.

Container 的生命周期与死亡判定

Container 通过 RefCountedThreadSafe 管理引用计数。正常生命周期是:新建 → 反复使用 → 被标记为 full → 其中所有 block 逐渐被删除 → 满足死亡条件 → 析构时删除磁盘文件。

死亡判定在 check_death_condition 中:

1
2
3
4
bool check_death_condition() const {
  return (full() && live_blocks() == 0 && blocks_being_written() == 0 &&
          FLAGS_log_block_manager_delete_dead_container);
}

四个条件必须同时满足。其中 blocks_being_written() == 0 这个条件看似多余——如果 full() 为 true,不应该还有块在写入。但实际上 Finalize 时才标记 full,而 DoClose 中的 BlockCreated 递增 live_blocks_ 发生在 Finalize 之后。如果一个写者刚 Finalize(此时 container 被标记 full 且 live_blocks 尚未递增),另一个线程触发了删除事务并把 container 中最后一个旧 block 删除,此时 live_blocks() == 0。如果没有 blocks_being_written 保护,这个 container 就会被误判为 dead——但实际上有一个新 block 即将被 Close。blocks_being_written_LogWritableBlock 构造时递增、析构时递减,精确地覆盖了这个窗口期。

NativeMeta 子类扩展了 full() 的判定——除了 .data 文件写满之外,.metadata 文件超过 FLAGS_log_container_metadata_max_size 也会触发 full:

1
2
3
4
5
6
7
bool LogBlockContainerNativeMeta::full() const override {
  if (LogBlockContainer::full()) return true;
  if (FLAGS_log_container_metadata_max_size <= 0) return false;
  shared_lock<RWMutex> l(metadata_compact_lock_, std::try_to_lock);
  if (!l.owns_lock()) return false;
  return metadata_file_->Offset() >= FLAGS_log_container_metadata_max_size;
}

CODE: 启动恢复 —— Open、LoadContainer 与 Repair

启动恢复是 LogBlockManager 中最复杂的流程,涉及并行 I/O、元数据回放、一致性检查和多种修复操作。整个流程分为三大阶段:并行打开 container加载并回放元数据记录修复不一致

阶段 1:并行打开数据目录。 LogBlockManager::Open 首先为每个数据目录设置 block 数量上限(处理 KUDU-1508 bug),然后为每个健康的数据目录异步提交 OpenDataDir 任务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
for (const auto& dd : dd_manager_->dirs()) {
  // 跳过已失败的目录
  if (dd_manager_->IsDirFailed(uuid_idx)) {
    statuses[i] = Status::IOError("Data directory failed", "", EIO);
    continue;
  }
  dd->ExecClosure([this, dd_raw, results, s, containers_processed, containers_total]() {
    this->OpenDataDir(dd_raw, results, s, containers_processed, containers_total);
  });
}
dd_manager_->WaitOnClosures();

OpenDataDir 列出目录下的所有文件,按后缀(.data / .metadata)提取 container 名称(去重),然后逐个调用 OpenContainer 打开物理文件并构造 LogBlockContainer 对象。每个 container 打开后,又异步提交 LoadContainer 任务到该目录的线程池——这意味着同一数据目录内的多个 container 也是并行加载的:

1
2
3
4
5
6
7
8
9
for (const string& container_name : containers_seen) {
  LogBlockContainerRefPtr container;
  s = OpenContainer(dir, &results->back()->report, container_name, &container);
  // ...
  auto* r = results->back().get();
  dir->ExecClosure([this, dir, container, r]() {
    this->LoadContainer(dir, container, r);
  });
}

并行度取决于每个 Dir 的线程池大小(默认 FLAGS_fs_max_thread_count_per_data_dir = 8)。对于 12 块磁盘的机器,最多可以有 12 × 8 = 96 个 container 同时在加载。

阶段 2:回放元数据记录。 LoadContainer 是单个 container 的核心加载逻辑。它调用 container->ProcessRecords() 逐条读取 .metadata 文件中的 BlockRecordPB,构建 container 本地的数据结构:

1
2
3
4
5
6
7
8
UntrackedBlockMap live_blocks;       // 存活 block 的临时 map
BlockRecordMap live_block_records;   // 存活 block 的原始记录(供后续 compact 使用)
vector<LogBlockRefPtr> dead_blocks;  // 已删除的 block
uint64_t max_block_id = 0;
Status s = container->ProcessRecords(&result->report,
                                     &live_blocks, &live_block_records,
                                     &dead_blocks, &max_block_id,
                                     ProcessRecordType::kReadAndUpdate);

一个重要的设计选择是:回放结果先放在 container 本地 map,不直接插入全局 block map。注释解释了原因——两个 container 可能出现这种情况:container A 有 CREATE <b>,container B 有 CREATE <b>; DELETE <b>。如果逐条插入全局 map,先处理 A 后处理 B 时会误报重复 ID。先在本地 map 中扣除已删除 block,再批量合入全局 map,就能正确处理这种交叉。

ProcessRecords 内部的 NativeMeta 版本打开 .metadata 文件作为 ReadablePBContainerFile,循环读取每条 BlockRecordPB

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
Status LogBlockContainerNativeMeta::ProcessRecords(...) {
  ReadablePBContainerFile pb_reader(std::move(metadata_reader));
  RETURN_NOT_OK_HANDLE_ERROR(pb_reader.Open());
  while (true) {
    BlockRecordPB record;
    read_status = pb_reader.ReadNextPB(&record);
    if (!read_status.ok()) break;
    RETURN_NOT_OK(ProcessRecord(&record, report, live_blocks,
                                live_block_records, dead_blocks,
                                &data_file_size, max_block_id, type));
  }
  if (PREDICT_TRUE(read_status.IsEndOfFile())) {
    return Status::OK();  // 正常读完
  }
  if (read_status.IsIncomplete()) {
    // 发现截断的部分记录——记入 report,Repair 阶段会截断文件
    report->partial_record_check->entries.emplace_back(ToString(), pb_reader.offset());
    return Status::OK();
  }
  return read_status;  // 不可恢复的错误
}

对于每条 CREATE 记录,ProcessRecord 先验证 offset/length 的合法性(非负、不超过 .data 文件实际大小),然后构造 LogBlock 插入本地 live_blocks map。同时调用 UpdateNextBlockOffset 推进 container 写游标、BlockCreated 更新统计计数——即使该 block 后续被 DELETE,其占据的字节区间也不会被复用(这保证了 container 内部不会出现空间重叠):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
case CREATE: {
  if (PREDICT_FALSE(!record->has_offset() || !record->has_length() ||
                    record->offset() < 0 || record->length() < 0)) {
    report->malformed_record_check->entries.emplace_back(ToString(), record);
    break;
  }
  const BlockId block_id(BlockId::FromPB(record->block_id()));
  LogBlockRefPtr lb = new LogBlock(this, block_id, record->offset(), record->length());
  if (!InsertIfNotPresent(live_blocks, block_id, lb)) {
    report->malformed_record_check->entries.emplace_back(ToString(), record);
    break;  // 重复 ID,视为畸形记录
  }
  if (type == ProcessRecordType::kReadAndUpdate) {
    UpdateNextBlockOffset(lb->offset(), lb->length());
    BlockCreated(lb);
  }
  (*live_block_records)[block_id].Swap(record);
  *max_block_id = std::max(*max_block_id, block_id.id());
  break;
}

对于 DELETE 记录,ProcessDeleteRecordlive_blocks 中擦除对应 block,调用 BlockDeleted 递减统计计数,并将 LogBlock 引用放入 dead_blocks 列表:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
void LogBlockContainerNativeMeta::ProcessDeleteRecord(...) {
  const BlockId block_id(BlockId::FromPB(record->block_id()));
  LogBlockRefPtr lb = EraseKeyReturnValuePtr(live_blocks, block_id);
  if (!lb) {
    report->malformed_record_check->entries.emplace_back(ToString(), record);
    return;  // 找不到对应的 CREATE,视为畸形
  }
  if (type == ProcessRecordType::kReadAndUpdate) {
    BlockDeleted(lb);
  }
  CHECK_EQ(1, live_block_records->erase(block_id));
  dead_blocks->emplace_back(std::move(lb));
}

回放结束后,LoadContainer 进行多项检查和分类:

  1. 对齐检查:遍历 live_blocks,标记偏移未按文件系统块大小对齐的 block(KUDU-1793 的遗留问题),记入 misaligned_block_check 报告。

  2. 死亡 container 判定:如果 container 已满(full())且 live_blocks 为 0,整个 container 加入 dead_containers 列表,后续直接删除磁盘文件。

  3. 低存活率 container 判定:如果 container 已满且 live_blocks / total_blocks ≤ FLAGS_log_container_live_metadata_before_compact_ratio(默认 0.5),将其存活记录收集到 low_live_block_containers map,后续进行 metadata compact。

  4. 多余空间检测:对于已满 container,比较 .data 文件实际磁盘占用与理论的 live_bytes_aligned——如果实际占用超过理论值的 (1 + FLAGS_log_container_excess_space_before_cleanup_fraction) 倍,说明有未回收的已删除 block 或预分配尾部空间,将 dead_blocks 加入 need_repunching_blocks 列表:

1
2
3
4
5
6
7
8
9
int64_t cleanup_threshold_size = container->live_bytes_aligned() *
    (1 + FLAGS_log_container_excess_space_before_cleanup_fraction);
if (reported_size > cleanup_threshold_size) {
  report->full_container_space_check->entries.emplace_back(...);
  if (container->live_blocks()) {
    result->need_repunching_blocks.insert(result->need_repunching_blocks.end(),
                                          dead_blocks.begin(), dead_blocks.end());
  }
}
  1. 合入全局 block map:最后,本地 live_blocks 中的每个 LogBlock 通过 AddLogBlock 插入全局分片 map。同时更新全局 next_block_id_(取所有 container 中出现的最大 block_id + 1),确保后续新分配的 ID 不会与已有 ID 冲突。container 自身注册到 all_containers_by_name_ 并放入空闲池。

阶段 3:修复。 所有 container 加载完成后,Open 为每个数据目录提交异步 RepairTaskRepair 在只读模式下跳过,否则按以下顺序执行:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Status LogBlockManager::Repair(Dir* dir, FsReport* report,
    vector<LogBlockRefPtr> need_repunching,
    vector<LogBlockContainerRefPtr> dead_containers,
    const ContainerBlocksByName& low_live_block_containers) {
  if (opts_.read_only) { return Status::OK(); }
  if (report->HasFatalErrors()) { return Status::OK(); }

  // 1. 移除死亡 container
  for (const auto& d : dead_containers) {
    CHECK(d->TrySetDead());
    RemoveDeadContainerUnlocked(d->ToString());
  }
  // 2. 收集需要修复的 container 引用
  FindContainersToRepair(report, low_live_block_containers, &containers_by_name);
  // 3. 释放死亡 container 引用 → 触发析构 → 删除 .data 和 .metadata 文件
  dead_containers.clear();
  // 4. 执行具体修复
  RETURN_NOT_OK(DoRepair(dir, report, low_live_block_containers, containers_by_name));
  // 5. 补做 hole punch
  auto transaction = std::make_shared<LogBlockDeletionTransaction>(this);
  for (const auto& b : need_repunching) {
    b->RegisterDeletion(transaction);
    transaction->AddBlock(b);
  }
  need_repunching.clear();  // 释放引用 → 触发 transaction 析构 → 执行 punch
  return Status::OK();
}

第 1 步通过 TrySetDead() CAS 标记 container 为 dead,从全局 map 移除。第 3 步释放最后的引用触发 ~LogBlockContainerNativeMeta(),在析构函数中删除 .data.metadata 物理文件。

第 4 步 DoRepair 执行三类修复:

  • 截断部分记录:如果 ProcessRecords 发现 .metadata 文件末尾有截断的不完整 PB 记录(崩溃在写入 metadata 过程中),重新打开文件并 Truncate 到最后一条完整记录的偏移,然后 ReopenMetadataWriter 让后续写入从正确位置继续。
  • 删除不完整 container:只有 .metadata 而无 .data(或反之)的孤儿文件直接删除。
  • Metadata compact:对 low_live_block_containers 中的每个 container,并行调用 RewriteMetadataFile 重写 .metadata 文件。

第 5 步复用了运行时的删除事务机制——将 dead_blocks 注册到 LogBlockDeletionTransaction,当 need_repunching 向量清空、transaction 析构时,自动触发 CoalesceIntervals + PunchHole 的异步执行。

Metadata Compact 的详细流程。 RewriteMetadataFile 是 compact 的核心,它采用"写临时文件 + 原子 rename"模式保证崩溃安全:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Status LogBlockManager::RewriteMetadataFile(const LogBlockContainer& container,
                                            const vector<BlockRecordPB>& records,
                                            int64_t* file_bytes_delta) {
  const string metadata_file_name = StrCat(container.ToString(), kContainerMetadataFileSuffix);
  uint64_t old_metadata_size;
  RETURN_NOT_OK(env_->GetFileSize(metadata_file_name, &old_metadata_size));

  // 创建临时文件
  string tmpl = metadata_file_name + kTmpInfix + ".XXXXXX";
  unique_ptr<RWFile> tmp_file;
  string tmp_file_name;
  RETURN_NOT_OK(env_->NewTempRWFile(opts, tmpl, &tmp_file_name, &tmp_file));
  auto tmp_deleter = MakeScopedCleanup([&]() {
    WARN_NOT_OK(env_->DeleteFile(tmp_file_name), ...);
  });

  // 写入只包含存活 block 的 CREATE 记录
  WritablePBContainerFile pb_file(std::move(tmp_file));
  RETURN_NOT_OK(pb_file.CreateNew(BlockRecordPB()));
  for (const auto& r : records) {
    RETURN_NOT_OK(pb_file.Append(r));
  }
  RETURN_NOT_OK(pb_file.Sync());
  RETURN_NOT_OK(pb_file.Close());

  // 原子 rename 覆盖旧文件
  RETURN_NOT_OK(env_->RenameFile(tmp_file_name, metadata_file_name));
  if (PREDICT_TRUE(file_cache_)) {
    file_cache_->Invalidate(metadata_file_name);  // 刷新 FileCache
  }

  tmp_deleter.cancel();
  *file_bytes_delta = old_metadata_size - new_metadata_size;
  return Status::OK();
}

写入前,存活记录已由 SortRecords 按 timestamp 排序(timestamp 相同则按 offset 排序),这使得 compacted 后的文件保持与原始写入顺序一致的排列。

崩溃安全分析:rename 是 POSIX 语义下的原子操作——要么看到旧的完整 .metadata 文件,要么看到新的 compacted 文件,不会出现半截状态。如果在 rename 之前崩溃,临时文件(带 .tmp.XXXXXX 后缀)由 FsManager 启动时的 CleanTmpFiles 清理。如果在 rename 之后、ReopenMetadataWriter 之前崩溃,下次启动时会正常打开 compacted 后的文件。

所有 compact 完成后,统一对数据目录执行一次 SyncDir,确保 rename 的目录条目持久化——如果不做这个 sync,崩溃后可能回到 rename 之前的状态,compacted 文件丢失。

整体流程图:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
flowchart TB
  subgraph Open
    A1["为每个数据目录提交 OpenDataDir"] --> A2["WaitOnClosures()"]
  end
  subgraph OpenDataDir["OpenDataDir (per dir, 并行)"]
    B1["列出 .data/.metadata 文件"] --> B2["去重得到 container 列表"]
    B2 --> B3["逐个 OpenContainer"]
    B3 --> B4["提交 LoadContainer 到线程池"]
  end
  subgraph LoadContainer["LoadContainer (per container, 并行)"]
    C1["ProcessRecords 回放 .metadata"] --> C2["CREATE → 插入本地 live_blocks"]
    C1 --> C3["DELETE → 从 live_blocks 擦除, 放入 dead_blocks"]
    C2 --> C4["对齐检查 + 死亡/低存活率判定"]
    C3 --> C4
    C4 --> C5["合入全局 block map"]
  end
  subgraph Repair["RepairTask (per dir, 并行)"]
    D1["删除死亡 container 文件"] --> D2["截断部分 metadata 记录"]
    D2 --> D3["删除不完整 container 文件"]
    D3 --> D4["Compact 低存活率 container 的 .metadata"]
    D4 --> D5["补做 hole punch"]
  end

  A2 --> Repair

全内存元数据的代价与收益

LogBlockManager 将所有元数据加载到内存。一个 LogBlock 对象包含 container 引用(8 字节指针)、BlockId(8 字节)、offset(8 字节)、length(8 字节),外加引用计数和 shared_ptr 内部开销,实际每条约 64 字节。1000 万 Block 约占 610 MB 内存。

收益是明确的:读写路径完全不需要磁盘元数据 I/O。OpenBlock 只是一次 hash map 查找加引用计数递增,ReadV 只需一次 preadv——没有额外的元数据读盘。

代价同样明确:启动时必须回放所有 .metadata 文件来重建内存索引。对于 container 数量巨大、DELETE 记录堆积的集群,这可能导致启动时间显著增加。metadata compact 机制正是为了缓解这个问题:去除已 DELETE 的记录后,回放量大幅减少。

小结

写入路径:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
flowchart LR
  subgraph CreateBlock
    A1[取 container] --> A2[分配 BlockId]
  end
  subgraph Append x N
    B1[写 .data 文件] --> B2[预分配空间]
  end
  subgraph Finalize
    C1[更新游标] --> C2[归还 container] --> C3[异步 preflush]
  end
  subgraph Close
    D1[data fsync] --> D2[写 .metadata CREATE] --> D3[metadata fsync] --> D4[创建 LogBlock] --> D5[注册到 block map]
  end

  A2 -->|"block"| B1
  B2 -->|"..."| C1
  C3 -->|"..."| D1

删除路径:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
flowchart LR
  subgraph CommitDeletedBlocks
    E1[从 block map 移除] --> E2[写 .metadata DELETE 记录] --> E3[更新 live_blocks 计数]
  end
  subgraph dest["~LogBlockDeletionTransaction"]
    F1[合并相邻删除区间] --> F2{dead container?}
    F2 -->|Yes| F3[删除文件]
    F2 -->|No| F4[hole punch]
  end

  E3 -->|"log_blocks"| F1

读取路径:

1
2
3
4
5
6
7
8
9
flowchart LR
  subgraph OpenBlock
    G1[block map 查找] --> G2[引用计数 +1]
  end
  subgraph ReadV
    H1["逻辑偏移 + block.offset_ = 物理偏移"] --> H2["preadv(.data, 物理偏移, len)"]
  end

  G2 -->|"LogBlock"| H1

LogBlockManager 的设计围绕一个核心洞察:存储引擎产生的 Block 是追加写入、不可变、批量产生和批量删除的。container 模型利用这些特性,把海量小 Block 聚合为少量大文件(减少 inode 耗尽和目录查找开销)、用顺序追加写消除随机写(利于磁盘和 SSD 的写入模式)、用 hole punch 在不重写文件的前提下回收空间(避免 compaction 式的数据搬移)、用全内存元数据消除读写路径的元数据 I/O(以启动时间换运行时性能)。

FsManager 与多目录管理

BlockManager 管理 Block 的读写,但它不决定 Block 存放在哪块磁盘上——这个职责属于 FsManager 和它下辖的 DataDirManager。FsManager 是 Kudu 存储引擎的"文件系统总管",统一抽象了 WAL 目录、元数据目录和数据目录的布局,并为上层提供"按 tablet_id 创建 Block"、“打开 Block"等接口,隐藏了多磁盘管理的全部复杂性。

目录布局

Kudu 的磁盘布局由三类根目录(root)驱动,分别通过 gflag 配置:

gflag用途典型值
--fs_wal_dirWrite-Ahead Log 根目录/data0/kudu/wal
--fs_data_dirsBlock 数据根目录(逗号分隔,可多盘)/data0/kudu/data,/data1/kudu/data,/data2/kudu/data
--fs_metadata_dirTablet 元数据根目录默认使用 WAL 目录或首个数据目录

每个根目录下的真实结构(以数据目录为例):

1
2
3
4
5
6
/data0/kudu/data/
├── instance                         ← InstanceMetadataPB:全局 UUID
└── data/                            ← DataDirManager 管辖
    ├── block_manager_instance       ← DirInstanceMetadataPB:目录集 UUID
    ├── <container-uuid>.data        ← LogBlockManager 容器数据文件
    └── <container-uuid>.metadata    ← LogBlockManager 容器元数据文件

WAL 根目录下有 wals/(按 tablet_id 分子目录存放 WAL 段文件)、tablet-meta/(tablet superblock)和 consensus-meta/(Raft 共识元数据)。

FsManager 在初始化阶段(Init())会对所有配置的根路径做规范化(canonicalize):解析符号链接、去除冗余分隔符,然后去重——如果 WAL 根和某个数据根恰好指向同一物理目录,只保留一份。如果某个路径无法规范化(例如磁盘已损坏),FsManager 会记录错误状态但不立即终止,后续打开时由 DataDirManager 决定是否仍有足够健康目录可用。

实例元数据与 UUID 体系

Kudu 有两层实例元数据,对应两种 protobuf 结构:

InstanceMetadataPB(全局实例文件)——每个根目录下一份 instance 文件,记录该节点的全局 UUID、加密密钥和租户信息。所有根目录的实例文件必须包含相同的 UUID;如果发现不一致,FsManager 拒绝启动并报错。这防止了误将两个不同节点的数据目录混合挂载的致命错误。

1
2
3
4
5
6
message InstanceMetadataPB {
  required bytes uuid = 1;           // 节点全局 UUID
  required string format_stamp = 2;  // 首次格式化时间戳
  optional string server_key = 3;    // 加密密钥(静态加密场景)
  repeated TenantMetadataPB tenants = 6; // 多租户信息
}

DirInstanceMetadataPB(目录实例文件)——每个数据目录下一份 block_manager_instance 文件,记录该目录自身的 UUID 以及整个目录集的 UUID 列表(DirSetPB)。这套机制使得 Kudu 能够检测目录的增减——如果新加了一块磁盘,其目录没有实例文件,DirManager 会为它创建新 UUID 并更新所有已有目录的 all_uuids 列表。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
message DirSetPB {
  required bytes uuid = 1;          // 本目录 UUID
  repeated bytes all_uuids = 2;     // 全部目录 UUID 列表
}

message DirInstanceMetadataPB {
  required DirSetPB dir_set = 1;
  required string dir_type = 2;                   // "log"、"file" 或 "logr"
  optional uint64 filesystem_block_size_bytes = 3; // 文件系统块大小
}

UUID 设计的精妙之处在于它引入了位置无关的目录引用——在磁盘上的 DataDirGroupPB(记录一个 tablet 的数据分布在哪些目录上)中,UUID 代替了路径。这意味着磁盘的挂载点即使变化(例如 /data2 变成 /mnt/disk2),只要 UUID 文件还在,Kudu 就能正确识别。

Dir 与 DirManager 的两层体系

目录管理分为基类 DirManager 和子类 DataDirManager

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
classDiagram
  class Dir {
    env_ : Env*
    dir_ : string(完整路径)
    metadata_file_ : DirInstanceMetadataFile
    pool_ : ThreadPool(Per-Dir 线程池)
    is_full_ : bool
    available_bytes_ : int64_t
    +ExecClosure(task)
    +RefreshAvailableSpace(mode)
  }

  class DirManager {
    dirs_ : vector~unique_ptr Dir~
    dir_by_uuid_idx_ : map
    tablets_by_uuid_idx_map_ : map
    failed_dirs_ : set~int~
    +MarkDirFailed(uuid_idx)
    +FindDirByUuidIndex(idx)
  }

  class DataDirManager {
    group_by_tablet_map_ : tablet_id → DataDirGroup
    +CreateDataDirGroup(tablet_id)
    +GetDirAddIfNecessary(opts, dir)
    +DeleteDataDirGroup(tablet_id)
  }

  DirManager <|-- DataDirManager : 继承
  DirManager "1" *-- "*" Dir : dirs_

Dir 表示单个数据目录。每个 Dir 拥有独立的线程池(默认 FLAGS_fs_max_thread_count_per_data_dir = 8 个线程),用于异步执行 hole punch、临时文件清理等操作。Dir 还维护目录剩余空间的缓存(默认每 10 秒刷新一次),当可用空间低于保留阈值时标记为 is_full_,不再接受新 Block 的创建。保留阈值由 --fs_data_dirs_reserved_bytes 控制,默认为磁盘容量的 1%。

Dir 的文件系统类型也被检测并记录(FsType:EXT、XFS 或 OTHER),因为 Kudu 在 XFS 上会强制 fsync 共识元数据以避免数据损坏(KUDU-2195)。

DirManagerDir 的管辖者。它在启动时加载所有实例文件、校验 UUID 一致性、为缺失的目录创建新 UUID、标记不健康的目录。它维护了一系列索引映射:

1
2
3
4
5
uuid_idx → Dir*           通过 UUID 索引查找 Dir
Dir* → uuid_idx           反向查找
uuid_idx → set<tablet_id> 查找哪些 tablet 使用了该目录
uuid → uuid_idx           UUID 字符串到索引的转换
root_path → uuid          根路径到 UUID 的转换

DataDirManager 在此基础上增加了 DataDirGroup 概念和 Block 放置策略。

DataDirGroup:Tablet 的磁盘分组

每个 tablet 在创建时,DataDirManager 为其分配一个 DataDirGroup——一组数据目录的 UUID 索引列表。这个分组决定了该 tablet 的所有 Block 可以被放置到哪些磁盘上。

分组大小由 --fs_target_data_dirs_per_tablet(默认 3)控制。如果集群有 12 块数据盘,每个 tablet 默认只使用其中 3 块。这个设计基于一个重要的可用性考量:单盘故障时,只有部分 tablet 受影响,而不是全部——如果 Block 均匀分布在所有 12 块盘上,任何一块盘故障都会影响所有 tablet。

分组选择使用了 “The Power of Two Choices” 随机负载均衡算法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
void DataDirManager::GetDirsForGroupUnlocked(int target_size,
                                             vector<int>* group_indices) {
  // ...
  while (group_indices->size() < target_size && !candidate_indices.empty()) {
    shuffle(candidate_indices.begin(), candidate_indices.end(), ...);
    if (candidate_indices.size() == 1) {
      group_indices->push_back(candidate_indices[0]);
    } else {
      // 随机取两个候选,选 tablet 数更少的那个
      int tablets_in_first = tablets_by_uuid_idx_map_[candidate_indices[0]].size();
      int tablets_in_second = tablets_by_uuid_idx_map_[candidate_indices[1]].size();
      // 若 tablet 数相同,选剩余空间更大的
      selected_index = tablets_in_first < tablets_in_second ? 0 : 1;
      group_indices->push_back(candidate_indices[selected_index]);
    }
  }
}

每次随机挑两个候选目录,选择当前承载 tablet 更少的那个;若 tablet 数相同则选剩余空间更大的。这个算法在 O(1) 开销下实现了比纯随机更均匀的负载分布——与操作系统中 SQ(2) 调度策略的思路一致。

Block 放置:从 tablet_id 到物理磁盘

FsManager::CreateNewBlock(opts, &block) 被调用时,完整的路由链路是:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
flowchart LR
  A["CreateNewBlock(tablet_id)"] --> B["block_manager(tenant_id)"]
  B --> C["BlockManager::CreateBlock(opts)"]
  C --> D["dd_manager->GetDirAddIfNecessary(opts)"]
  D --> E["GetDirForBlock(opts)"]
  E --> F{DataDirGroup 是否存在?}
  F -->|Yes| G["在 group 内选目录"]
  F -->|No| H["报错 NotFound"]
  G --> I{所有目录都满?}
  I -->|No| J["Power-of-Two-Choices 选一个"]
  I -->|Yes| K["尝试扩展 group"]
  K --> L["添加新目录到 group"]

GetDirForBlock 在 tablet 的 DataDirGroup 内选择目标目录时,会逐个检查目录健康状态和剩余空间。如果组内所有目录都满了(ENOSPC),GetDirAddIfNecessary 会尝试将新目录加入该 tablet 的 group——这是运行时的动态扩展,无需重启。

磁盘故障处理

Kudu 对磁盘故障采用标记-隔离策略,而非立即崩溃:

  1. 检测:BlockManager 的 I/O 操作返回 DiskFailure 时,通过 FsErrorManager 的回调通知 DataDirManager。
  2. 标记MarkDirFailed(uuid_idx) 将该目录加入 failed_dirs_ 集合。后续所有操作都会跳过该目录。
  3. 传播:上层(如 TabletServer)通过同一回调得知哪些 tablet 受影响,可以选择关闭这些 tablet 或触发 re-replication。
1
2
3
4
error_manager_->SetErrorNotificationCb(
    ErrorHandlerType::DISK_ERROR, [this](const string& uuid, const string& tenant_id) {
      this->dd_manager(tenant_id)->MarkDirFailedByUuid(uuid);
    });

只有当所有数据目录都故障时,AreAllDirsFailed() 返回 true,TabletServer 才会被迫停机。只要还有健康磁盘,服务就继续运行——只是那些数据完全位于故障盘上的 tablet 变得不可用。

这种设计选择本质上是 partial failure tolerance——与 RAID 控制器在单盘故障时继续服务的思路一致,但在软件层面以 tablet 粒度实现。每个 tablet 仅使用少量磁盘(默认 3 块),单盘故障只影响那些恰好使用了该盘的 tablet。

CODE: FsManager::Open() 的启动流程

FsManager::Open() 是系统启动时最关键的初始化步骤,串联了所有组件的打开。函数签名携带多个可选的进度追踪参数(Timeratomic<int> 计数器),供上层 UI 展示启动进度:

1
2
3
4
5
Status FsManager::Open(FsReport* report,
                       Timer* read_instance_metadata_files,
                       Timer* read_data_directories,
                       std::atomic<int>* containers_processed,
                       std::atomic<int>* containers_total);

完整流程分为八个阶段,下面逐一展开。

阶段 1:PartialOpen —— 加载实例元数据并验证 UUID。 这是最先执行的步骤,目的是在产生任何副作用之前尽早发现配置错误:

1
2
CanonicalizedRootsList missing_roots;
RETURN_NOT_OK(PartialOpen(&missing_roots));

PartialOpen 内部首先调用 Init() 完成路径规范化。Init() 对所有配置的根路径(WAL、数据、元数据)执行 Canonicalize——解析符号链接、去除冗余分隔符,然后去重。如果某个路径无法规范化(例如磁盘不可达),Init() 记录 DiskFailure 状态但不终止,只有 WAL 根和元数据根规范化失败才是致命错误:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
Status FsManager::Init() {
  // ...
  for (const string& root : all_roots) {
    string canonicalized;
    Status s = GetEnv()->Canonicalize(DirName(root), &canonicalized);
    if (PREDICT_FALSE(!s.ok())) {
      if (s.IsNotFound() || s.IsDiskFailure()) {
        canonicalized = DirName(root);  // 保留原始路径,记录错误
      } else {
        return s.CloneAndPrepend(...);
      }
    }
    canonicalized = JoinPathSegments(canonicalized, BaseName(root));
    InsertOrDie(&canonicalized_roots, root, { canonicalized, s });
  }
  // WAL 根和元数据根必须成功
  RETURN_NOT_OK_PREPEND(canonicalized_wal_fs_root_.status, ...);
  RETURN_NOT_OK_PREPEND(canonicalized_metadata_fs_root_.status, ...);
  // ...
}

Init() 还处理元数据目录的选择逻辑:如果 --fs_metadata_dir 未指定,检查首个数据目录下是否已有 tablet-meta/ 子目录——如果有则使用数据目录,否则退回到 WAL 目录。这保证了升级兼容性。

路径规范化完成后,PartialOpen 遍历所有根目录,逐个读取 instance 文件(InstanceMetadataPB)。第一个成功读取的实例文件成为参考,后续每个实例文件的 UUID 必须与参考一致——如果发现不匹配,立即返回 Corruption 错误并报告具体的路径和 UUID 值,防止误将不同节点的数据目录混合挂载:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
for (auto& root : canonicalized_all_fs_roots_) {
  if (!root.status.ok()) continue;
  unique_ptr<InstanceMetadataPB> pb(new InstanceMetadataPB);
  Status s = pb_util::ReadPBContainerFromPath(GetEnv(),
      GetInstanceMetadataPath(root.path), pb.get(), pb_util::NOT_SENSITIVE);
  if (PREDICT_FALSE(!s.ok())) {
    if (s.IsNotFound()) {
      if (missing_roots) missing_roots->emplace_back(root);
      continue;   // 缺失的根目录记录下来,后续可能创建
    }
    if (s.IsDiskFailure()) {
      root.status = s.CloneAndPrepend("Failed to open instance file");
      continue;   // 磁盘故障的目录标记状态,不终止
    }
    return s;
  }
  if (!metadata_) {
    metadata_.reset(pb.release());            // 第一个作为参考
    reference_instance_path = root.path;
  } else if (pb->uuid() != metadata_->uuid()) {
    return Status::Corruption(Substitute(
        "Mismatched UUIDs across filesystem roots: "
        "$0 contains UUID $1 vs. $2 contains UUID $3",
        reference_instance_path, metadata_->uuid(), root.path, pb->uuid()));
  }
}
if (!metadata_) {
  return Status::NotFound("could not find a healthy instance file");
}

最后,PartialOpen 检测元数据目录的文件系统类型——是否在 XFS 上。这个信息影响后续的 fsync 行为:如果 --cmeta_fsync_override_on_xfs 为 true,XFS 检测失败将是致命错误;否则仅发出警告。这与 KUDU-2195(XFS 上某些情况下不 fsync 可能导致共识元数据损坏)有关。

阶段 2:验证辅助目录。 确认 WAL、tablet 元数据和共识元数据三个关键子目录存在且确实是目录(而非同名文件):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
vector<string> ancillary_dirs = { GetWalsRootDir(),
                                  GetTabletMetadataDir(),
                                  GetConsensusMetadataDir() };
for (const auto& d : ancillary_dirs) {
  bool is_dir;
  RETURN_NOT_OK_PREPEND(GetEnv()->IsDirectory(d, &is_dir),
                        Substitute("could not verify required directory $0", d));
  if (!is_dir) {
    return Status::Corruption(
        Substitute("Required directory $0 exists but is not a directory", d));
  }
}

阶段 3:ScopedCleanup 保护与创建缺失根目录。 从这里开始可能产生磁盘副作用(创建目录和文件),因此先注册一个 ScopedCleanup——如果后续任何步骤失败,已创建的文件和目录会被回滚删除:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
vector<string> created_dirs;
vector<string> created_files;
auto deleter = MakeScopedCleanup([&]() {
  // 先删文件,再逆序删目录(子目录在父目录之后添加,逆序删保证子先于父)
  for (const auto& f : created_files) {
    WARN_NOT_OK(GetEnv()->DeleteFile(f), "Could not delete file " + f);
  }
  for (auto it = created_dirs.rbegin(); it != created_dirs.rend(); it++) {
    WARN_NOT_OK(GetEnv()->DeleteDir(*it), "Could not delete dir " + *it);
  }
});

如果 PartialOpen 阶段收集到了 missing_roots(存在于配置中但磁盘上没有实例文件的根目录),且当前不是只读模式,则调用 CreateFileSystemRoots 为这些目录创建实例文件。这是在线扩盘的核心路径——运维添加新磁盘后只需修改 --fs_data_dirs 参数并重启,FsManager 会自动为新目录写入与现有 UUID 一致的 instance 文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
if (!opts_.read_only &&
    opts_.update_instances != UpdateInstanceBehavior::DONT_UPDATE) {
  Status s = CreateFileSystemRoots(
      missing_roots, *metadata_, &created_dirs, &created_files);
  if (opts_.update_instances == UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES) {
    WARN_NOT_OK(s, kUnableToCreateMsg);   // 仅警告,尝试继续
  } else if (opts_.update_instances == UpdateInstanceBehavior::UPDATE_AND_ERROR_ON_FAILURE) {
    RETURN_NOT_OK_PREPEND(s, kUnableToCreateMsg);  // 失败则终止
  }
}

CreateFileSystemRoots 内部对每个缺失根目录做两件事:(1)如果目录不存在则 CreateDirIfMissing,(2)写入与已有节点相同 UUID 的 InstanceMetadataPB。写入前会检查目录是否为空——如果非空则拒绝创建,防止误覆盖已有数据。

UpdateInstanceBehavior 枚举控制了错误处理策略:UPDATE_AND_IGNORE_FAILURES 在创建失败时仅发出警告并继续尝试打开——如果已有健康目录足够,服务仍可启动;UPDATE_AND_ERROR_ON_FAILURE 则严格要求所有目录可用。

阶段 4:解密密钥处理。 如果启用了静态加密(--encrypt_data_at_rest),需要通过 KeyProvider 解密存储在实例文件中的密钥。这里有多租户和单租户两条路径:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
string decrypted_key;
bool tenants_exist = is_tenants_exist();
if (tenants_exist && key_provider_) {
  // 多租户模式:tenant key 优先级高于 server key
  if (!FLAGS_enable_multi_tenancy) {
    return Status::IllegalState(
        "The '--enable_multi_tenancy' should set for the existed tenants.");
  }
  RETURN_NOT_OK(key_provider_->DecryptEncryptionKey(
      this->tenant_key(fs::kDefaultTenantID),
      this->tenant_key_iv(fs::kDefaultTenantID),
      this->tenant_key_version(fs::kDefaultTenantID),
      &decrypted_key));
} else if (!server_key().empty() && key_provider_) {
  // 单租户模式:使用 server key
  if (FLAGS_enable_multi_tenancy) {
    return Status::IllegalState(
        "--enable_multi_tenancy is set, but no tenants exist.");
  }
  RETURN_NOT_OK(key_provider_->DecryptEncryptionKey(
      this->server_key(), this->server_key_iv(),
      this->server_key_version(), &decrypted_key));
} else if (server_key().empty() && FLAGS_encrypt_data_at_rest) {
  return Status::IllegalState(
      "--encrypt_data_at_rest is set, but no server key found.");
}

解密后,将密钥设置到 Env 层,后续所有文件 I/O 自动进行加解密:

1
2
3
4
5
if (!decrypted_key.empty()) {
  GetEnv()->SetEncryptionKey(
      reinterpret_cast<const uint8_t*>(a2b_hex(decrypted_key).c_str()),
      decrypted_key.length() * 4);  // hex/2=bytes, bytes*8=bits
}

阶段 5:打开 DataDirManager。 委托 DataDirManager::OpenExisting 加载所有数据目录的实例文件、校验目录集 UUID 一致性、建立 UUID ↔ 目录的双向索引:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
Status FsManager::OpenDataDirManager(const string& tenant_id) {
  if (!dd_manager(tenant_id)) {
    scoped_refptr<DataDirManager> ddm = nullptr;
    DataDirManagerOptions dm_opts;
    dm_opts.metric_entity = opts_.metric_entity;
    dm_opts.read_only = opts_.read_only;
    dm_opts.dir_type = opts_.block_manager_type;
    dm_opts.update_instances = opts_.update_instances;
    dm_opts.tenant_id = tenant_id;
    LOG_TIMING(INFO, "opening directory manager") {
      RETURN_NOT_OK(DataDirManager::OpenExisting(GetEnv(tenant_id),
          get_canonicalized_data_fs_roots(tenant_id), dm_opts, &ddm));
    }
    RETURN_NOT_OK(AddDataDirManager(ddm, tenant_id));
  }
  return Status::OK();
}

dm_opts.dir_type 传入了 block_manager_type"log""logr""file"),DataDirManager 据此决定目录实例文件中记录的类型标记——不同类型的 BlockManager 不能混用同一组数据目录。

阶段 6:清理临时文件与检查权限。 只在非只读模式下执行,且必须在 DataDirManager 成功打开之后——因为打开过程中会获取目录的排他锁(exclusive lock),确保只有一个进程在操作这些目录。在获得锁之后再删除临时文件是安全的:

1
2
3
4
if (!opts_.read_only) {
  CleanTmpFiles();
  CheckAndFixPermissions();
}

CleanTmpFiles 递归删除 WAL、tablet-meta、consensus-meta 目录下的 .tmp 文件——这些是上次运行中未完成的临时写入。BlockManager 目录下的临时文件由 BlockManager 自己在启动阶段清理,不在这里处理。

CheckAndFixPermissions 遍历所有根目录,调用 EnsureFileModeAdheresToUmask 检查并修正文件权限——确保目录权限符合当前 umask 设置,避免因权限过宽导致安全风险。

阶段 7:设置磁盘错误回调。 注册全局的磁盘错误处理器,将 BlockManager 运行时遇到的 I/O 错误传导到 DataDirManager 的目录标记机制:

1
2
3
4
5
error_manager_->SetErrorNotificationCb(
    ErrorHandlerType::DISK_ERROR,
    [this](const string& uuid, const string& tenant_id) {
      this->dd_manager(tenant_id)->MarkDirFailedByUuid(uuid);
    });

这是"标记-隔离"故障处理策略的连线点——BlockManager 层检测到磁盘错误后通过 FsErrorManager 发出通知,回调函数调用 MarkDirFailedByUuid 将对应目录加入 failed_dirs_ 集合,后续所有操作跳过该目录。

阶段 8:初始化并打开 BlockManager。 这是整个启动流程中最耗时的阶段。如果 opts_.skip_block_manager 为 true(某些工具模式不需要 Block 访问能力),则跳过此步。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
if (!opts_.skip_block_manager) {
  if (read_data_directories) read_data_directories->Start();
  RETURN_NOT_OK(InitAndOpenBlockManager(report,
      containers_processed, containers_total,
      fs::kDefaultTenantID, BlockManager::MergeReport::REQUIRED));
  if (read_data_directories) {
    read_data_directories->Stop();
    if (opts_.metric_entity && FsManager::IsLogType(opts_.block_manager_type)) {
      METRIC_log_block_manager_containers_processing_time_startup.Instantiate(
          opts_.metric_entity, read_data_directories->TimeElapsed().ToMilliseconds());
    }
  }
}

InitBlockManager 根据 block_manager_type 配置创建具体的 BlockManager 实现——"log" 对应 LogBlockManagerNativeMeta"logr" 对应 LogBlockManagerRdbMeta(RocksDB 存储元数据的变体)、"file" 对应 FileBlockManager

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
scoped_refptr<BlockManager> FsManager::InitBlockManager(const string& tenant_id) {
  // ...
  if (opts_.block_manager_type == "log") {
    block_manager.reset(new LogBlockManagerNativeMeta(
        GetEnv(tenant_id), dd_manager(tenant_id), error_manager_,
        opts_.file_cache, std::move(bm_opts), tenant_id));
  } else if (opts_.block_manager_type == "logr") {
    block_manager.reset(new LogBlockManagerRdbMeta(
        GetEnv(tenant_id), dd_manager(tenant_id), error_manager_,
        opts_.file_cache, std::move(bm_opts), tenant_id));
  } else if (opts_.block_manager_type == "file") {
    block_manager.reset(new FileBlockManager(...));
  }
  CHECK_OK(block_manager->Start());
  // 注册到 block_manager_map_
}

随后调用 block_manager->Open(report, ...)——对于 LogBlockManager,这一步会并行打开所有数据目录下的 container,逐条回放 .metadata 文件重建内存索引,是启动时间的主要瓶颈(详见前文"启动恢复"一节)。containers_processedcontainers_total 两个原子计数器在回放过程中被递增,上层可以据此显示进度百分比。启动耗时被记录到 log_block_manager_containers_processing_time_startup metric 中。

收尾:同步与日志。 所有组件成功打开后,如果 --enable_data_block_fsync 为 true,对所有新创建的目录和文件的父目录执行 fsync,确保目录条目持久化:

1
2
3
4
if (FLAGS_enable_data_block_fsync) {
  WARN_NOT_OK(env_util::SyncAllParentDirs(GetEnv(), created_dirs, created_files),
              "could not sync newly created fs roots");
}

最后打印成功日志,列出所有根目录和实例元数据摘要,并取消 ScopedCleanup——不再回滚任何已创建的内容:

1
2
3
4
LOG(INFO) << "Opened local filesystem: " <<
    JoinStrings(DataDirManager::GetRootNames(canonicalized_all_fs_roots_), ",")
    << std::endl << SecureDebugString(*metadata_);
deleter.cancel();

崩溃安全分析。 整个 Open() 流程的崩溃安全由 ScopedCleanup 和操作顺序共同保证:

  • 阶段 1-2 是纯读操作,崩溃无影响。
  • 阶段 3 创建的新目录和实例文件被 created_dirs/created_files 跟踪——如果后续步骤失败,ScopedCleanup 将它们全部删除。文件先于目录删除,目录逆序删除(子目录先于父目录),保证清理的正确性。
  • 阶段 4 的密钥解密是内存操作,崩溃无持久化影响。
  • 阶段 5-8 的打开操作本身是幂等的——重启后可以安全地重新执行。
  • 如果进程在 deleter.cancel() 之前崩溃、但阶段 3 已创建了新目录,这些目录会遗留在磁盘上。不过由于它们包含正确的 instance 文件和正确的 UUID,下次启动时会被正常识别——不会造成数据不一致。

FileCache 文件描述符管理

操作系统的文件描述符(fd)是稀缺资源——ulimit -n 通常限制在几千到几万。一个生产环境的 Kudu TabletServer 可能管理数百个 tablet,每个 tablet 有几十个 CFile(每列一个),再加上 LogBlockManager 的 container 文件、WAL 段文件……轻易就能耗尽 fd 上限。

FileCache 解决这个问题的方法是:对上层透明地管理 fd 的打开/关闭,用一个 LRU 缓存限制同时打开的 fd 数上界。被淘汰的文件在下次使用时自动重新打开——上层代码完全感知不到文件曾经被关闭过。

两层架构:Descriptor Map + LRU Cache

FileCache 内部由两层构成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
flowchart TB
  subgraph 上层API
    A["OpenFile(filename)"] --> B["shared_ptr&lt;Descriptor&gt;"]
  end
  subgraph Descriptor Map
    C["rwf_descs_: filename → weak_ptr&lt;Descriptor&lt;RWFile&gt;&gt;"]
    D["raf_descs_: filename → weak_ptr&lt;Descriptor&lt;RandomAccessFile&gt;&gt;"]
  end
  subgraph LRU Cache
    E["filename → File* (capacity = max_open_files)"]
  end
  B --> C
  B --> D
  C -.->|"LookupFromCache()"| E
  D -.->|"LookupFromCache()"| E

Descriptor Map(上层)是一个 unordered_map<string, weak_ptr<Descriptor>>,按文件名索引 Descriptor 对象。由于使用 weak_ptr,map 中的条目不影响 Descriptor 的生命周期。Descriptor 是一个实现了 RWFileRandomAccessFile 接口的包装器——上层代码拿到的就是这个对象,它看起来就像一个普通的文件句柄。

LRU Cache(下层)是一个容量受限的标准 LRU 缓存(Cache),key 是文件名,value 是真实的文件对象指针(RWFile*RandomAccessFile*)。容量就是 max_open_files——当缓存满了再插入新文件时,最久未使用的文件会被驱逐(evict),其 fd 在驱逐回调 EvictionCallback 中被关闭和 delete。

透明重新打开

Descriptor 的每一个 I/O 方法(Read、Write、Sync 等)都遵循同一模式——先从 LRU 缓存查找,命中则直接操作,未命中则重新打开文件再操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
template <Env::OpenMode Mode>
Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const {
  ScopedOpenedDescriptor<RWFile> found(base_.LookupFromCache());
  if (found.opened()) {
    // Cache 命中:文件仍然打开,直接使用
    if (out) { *out = std::move(found); }
    return Status::OK();
  }

  // Cache 未命中:文件已被驱逐关闭,重新打开
  RWFileOptions opts;
  opts.mode = Mode;
  unique_ptr<RWFile> f;
  RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &f));

  // 插入 cache(可能驱逐另一个文件)
  ScopedOpenedDescriptor<RWFile> opened(base_.InsertIntoCache(f.release()));
  if (out) { *out = std::move(opened); }
  return Status::OK();
}

ScopedOpenedDescriptor 是一个 RAII 包装,持有 LRU cache handle——只要这个对象存在,对应的 cache entry 就不会被驱逐。一旦 I/O 操作完成、ScopedOpenedDescriptor 析构,handle 被释放,文件重新进入 LRU 淘汰候选。

这意味着:一次 preadv 调用期间文件不会被关闭(因为 handle 被持有),但两次 I/O 之间文件可能被驱逐。对于 Block 式的访问模式(打开、读几次、关闭),这种行为是理想的——热文件留在缓存中,冷文件让出 fd 给其他人。

Descriptor 的生命周期

Descriptor 的生命周期由 shared_ptr 管理,不根据 LRU Cache 的状态:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
OpenFile("a.data")
  ├── 在 Descriptor Map 中查找
  │   ├── 找到且 weak_ptr 可 lock → 返回已有 Descriptor
  │   └── 未找到或已过期 → 创建新 Descriptor,插入 map
  └── 返回 shared_ptr<Descriptor> 给调用者
      └── 调用者持有 shared_ptr 期间:Descriptor 存活
          └── 最后一个 shared_ptr 析构时:
              ├── ~BaseDescriptor:从 LRU cache 中 Erase
              ├── 如果标记了删除 → 真正 DeleteFile
              └── Descriptor Map 中 weak_ptr 过期,留待清理

两个关键设计选择:

  1. Descriptor 析构时主动驱逐。~BaseDescriptor 中调用 cache()->Erase(filename())——即使文件在 LRU 中还有余温,只要所有上层引用都释放了,fd 就立刻关闭。注释解释了这个选择:用户通常期望"用完文件后资源被释放”,而非"fd 留在缓存等着被别人驱逐"。

  2. 延迟删除语义DeleteFile 不立刻删除物理文件,而是在 Descriptor 上标记 FILE_DELETED。只有当最后一个 shared_ptr 释放、Descriptor 析构时,文件才真正被删除。这与 Unix 中 unlink() 后文件直到最后一个 fd 关闭才释放空间的语义一致。

后台清理线程

FileCache 启动一个后台线程 RunDescriptorExpiry,每隔 FLAGS_file_cache_expiry_period_ms(默认 60 秒)扫描一次 Descriptor Map,移除所有 weak_ptr 已过期(expired())的条目。这是必要的——Descriptor 析构时不直接从 map 中移除自己(避免递归持锁导致死锁),而是留下过期的 weak_ptr,由后台线程统一清理。

1
2
3
4
5
6
7
8
void FileCache::RunDescriptorExpiry() {
  while (!running_.WaitFor(MonoDelta::FromMilliseconds(
      FLAGS_file_cache_expiry_period_ms))) {
    std::lock_guard l(lock_);
    ExpireDescriptorsFromMap(&rwf_descs_);
    ExpireDescriptorsFromMap(&raf_descs_);
  }
}

Invalidate:处理文件替换

Invalidate(filename) 用于"rename-to-replace"模式——先写临时文件,再原子 rename 覆盖旧文件,最后调用 Invalidate 让缓存感知变化:

1
2
3
WriteNewDataTo(tmp_path);
env->RenameFile(tmp_path, path);
file_cache->Invalidate(path);

Invalidate 做三件事:(1)在两个 Descriptor Map 中插入标记了 INVALIDATED 的占位 Descriptor,阻止并发 open 返回旧内容;(2)从 LRU Cache 中 Erase 同名条目;(3)移除这些占位 Descriptor。此后再打开同一路径,FileCache 会从磁盘重新打开(此时已是 rename 后的新文件)。

LogBlockManager 中 metadata compact 的 RewriteMetadataFile 正是使用了这个机制——compact 写入临时文件后 rename 覆盖旧 .metadata,再 Invalidate 让 FileCache 刷新。

与 BlockManager 的集成

FileCache 通过 FsManagerOpts::file_cache 传递给 BlockManager。在 LogBlockManager 中,container 的 .data 文件通过 FileCache 打开,得到的是 shared_ptr<RWFile> 而非裸 unique_ptr<RWFile>——这个 shared_ptr 指向的其实是 FileCache 内部的 Descriptor 对象。

1
2
3
// LogBlockContainer 打开 .data 文件(简化)
shared_ptr<RWFile> data_file;
RETURN_NOT_OK(file_cache->OpenFile<Env::MUST_EXIST>(data_path, &data_file));

这样做的效果是:即使集群上有数万个 container(意味着数万个 .data 文件),同时打开的 fd 数量也被 FileCache 限制在可控范围内。不活跃的 container 的 fd 会被 LRU 驱逐,下次读写时透明重新打开。对于 LogBlockManager 来说,主要的文件操作(preadvpwritevfallocatefdatasync)都是无状态的(不依赖文件位置指针),所以关闭再重新打开不会影响正确性。

LRU 容量的权衡

LRU cache 的容量(max_open_files)就是同时打开的 fd 上限。它的每个 entry 的 charge 被硬编码为 1 字节——这不是真实的内存开销追踪,而是让 cache 的容量(以"字节"计)等价于 fd 数量上限。

容量过小的代价是频繁的 reopen——open() 系统调用本身有显著开销(内核需要路径解析、权限检查、分配 fd 等),高频率的 open/close 循环会拖慢 I/O 吞吐。容量过大则可能逼近 ulimit 上限,留给其他组件(日志、网络连接、RPC)的 fd 不足。

实际部署中,Kudu 的 tserver 默认将 max_open_files 设为系统 ulimit -n 上限的一定比例(由上层 TabletServer 配置传入),为非文件 I/O 的 fd 需求留出余量。

线程安全

FileCache 的所有公开方法都是线程安全的。Descriptor Map 由 simple_spinlock 保护(轻量级自旋锁,适合短临界区)。LRU Cache 自身也是线程安全的。一个重要的约束是同一文件名在任意时刻最多只能有一个活跃 Descriptor——在 DEBUG 模式下,OpenFile 会检查同一文件是否已在另一种类型的 map 中存在,违反则 CHECK 失败。这防止了同一文件同时被当作 RWFile 和 RandomAccessFile 打开的混乱。

附录:相关源码文件索引

文件
src/kudu/fs/block_manager.hBlock、WritableBlock、ReadableBlock、BlockManager 接口定义
src/kudu/fs/block_manager.ccPre-flush 控制 gflag、BlockManager 工厂方法
src/kudu/fs/log_block_manager.hLogBlockManager、LogBlockManagerNativeMeta 类声明
src/kudu/fs/log_block_manager.ccLogBlockContainer、LogBlock、LogWritableBlock、LogReadableBlock 实现
src/kudu/fs/fs.protoBlockRecordPB、BlockIdPB 定义
src/kudu/fs/block_manager_metrics.hBlock Manager 指标定义
src/kudu/fs/block_manager-test.ccBlock Manager 测试
src/kudu/fs/log_block_manager-test.ccLogBlockManager 测试