Trading More Storage for Less Computation — A KVCache-centric Architecture for Serving LLM Chatbot

本文基于 FAST'25 论文及开源代码库,深入解析 Mooncake 的设计原理、核心实现与调试方法。


目录#


第一部分:概述与背景#

第 1 章:引言与项目背景#

1.1 LLM 推理服务的挑战#

随着 ChatGPT、Kimi、Claude 等大语言模型(Large Language Model, LLM)服务的爆发式增长,LLM 推理服务(LLM Serving)已成为 AI 基础设施领域最具挑战性的工程问题之一。与传统的 Web 服务不同,LLM 推理服务面临着独特的技术挑战:

1.1.1 计算与存储的双重压力#

LLM 推理的核心瓶颈在于 KVCache(Key-Value Cache)。在 Transformer 架构中,每次生成新 token 时,模型需要访问之前所有 token 的 Key 和 Value 向量。以 LLaMA3-70B 模型为例:

单个 token 的 KVCache 大小 = 层数 × 2 × (维度 / GQA比例) × 数据类型大小
                        = 80 × 2 × (8192 / 8) × 2 bytes
                        = 320 KB

对于一个 128K 上下文长度的请求,KVCache 大小将达到:

128K tokens × 320 KB/token = 40 GB

这意味着单个长上下文请求的 KVCache 就可能占满一张 80GB 显存的 A800 GPU 的一半空间。

1.1.2 SLO 约束:TTFT 与 TBT#

LLM 服务商需要满足严格的服务等级目标(Service Level Objectives, SLOs),主要包括两个关键指标:

指标 全称 含义 典型阈值
TTFT Time To First Token 从请求到达到生成第一个 token 的时间 < 30 秒
TBT Time Between Tokens 连续 token 生成之间的间隔时间 < 100-300 毫秒
sequenceDiagram
    participant User as 用户
    participant Server as LLM 服务器

    User->>Server: 发送请求 (包含 prompt)
    Note over Server: Prefill 阶段<br/>处理所有输入 token
    Server-->>User: 第一个 token
    Note right of User: TTFT = 从请求到第一个 token 的时间

    loop 自回归生成
        Note over Server: Decode 阶段<br/>生成单个 token
        Server-->>User: 下一个 token
        Note right of User: TBT = token 间隔时间
    end

TTFT 主要受 Prefill 阶段影响,该阶段需要处理所有输入 token,是计算密集型任务。对于长上下文(如 128K tokens),Prefill 可能需要数十秒。

TBT 主要受 Decode 阶段影响,该阶段每次只生成一个 token,但需要访问完整的 KVCache,是内存密集型任务。

1.1.3 资源利用率的困境#

传统的 LLM 推理架构将 Prefill 和 Decode 阶段耦合在同一节点上运行,这带来了严重的资源利用率问题:

graph LR
    subgraph "传统耦合架构的问题"
        A[长 Prefill 请求] --> B[GPU]
        C[多个 Decode 请求] --> B
        B --> D[Prefill 阻塞 Decode]
        D --> E[TBT 超时]
    end

问题一:Prefill 干扰 Decode

当一个长上下文请求进入 Prefill 阶段时,GPU 被完全占用,导致正在进行 Decode 的请求无法获得计算资源,TBT 急剧上升。

问题二:KVCache 本地化限制

传统系统将 KVCache 存储在本地 GPU 显存或 CPU 内存中:

  • GPU 显存容量有限(80GB),只能缓存少量请求的 KVCache
  • CPU 内存虽然更大,但缓存命中率受限于单节点容量
  • 相同前缀的请求可能被调度到不同节点,无法共享 KVCache

问题三:资源浪费

GPU 服务器配备了大量的 CPU、DRAM、SSD 和高速网卡资源,但在传统架构中这些资源严重未被利用:

资源 典型配置 (8×A800 节点) 传统利用率
GPU 显存 8 × 80GB = 640GB
CPU 内存 1-2 TB
NVMe SSD 数 TB 极低
RDMA 网卡 8 × 200Gbps

1.2 Mooncake 的核心理念#

1.2.1 “Trading More Storage for Less Computation”#

Mooncake 的核心设计理念是:用更多的存储换取更少的计算

这一理念基于一个关键洞察:KVCache 的复用可以显著减少 Prefill 阶段的计算量

考虑以下场景:

  • 用户 A 发送请求:“请分析这份 50 页的 PDF 文档…”
  • 用户 B 发送相同请求:“请分析这份 50 页的 PDF 文档…"(相同前缀)

在传统系统中,两个请求都需要完整执行 Prefill,计算量翻倍。而如果能够缓存并复用用户 A 生成的 KVCache,用户 B 的请求可以直接跳过已缓存前缀的计算。

数学分析

给定输入长度 n,Prefill 阶段的计算量(FLOPs)为:

$$flops(n) = l \times (an^2d + bnd^2)$$

其中:

  • $l$ = 层数 (LLaMA3-70B: 80)
  • $d$ = 模型维度 (LLaMA3-70B: 8192)
  • $a, b$ = 常数系数 (约 4, 22)

如果请求的 prompt 长度为 n,其中有 p 个 token 的前缀可以从缓存中复用,则:

计算节省量 ≈ $l \times (ap^2d + bpd^2)$

需要传输的 KVCache 大小 = $p \times l \times (2 \times d / gqa) \times s$

其中 $gqa$ 是 GQA(Grouped Query Attention)的比例,$s$ 是数据类型大小。

复用 KVCache 有利于改善 TTFT 的条件是:

$$\frac{B}{G} > \frac{2ds}{gqa \times (apd + bd^2)}$$

其中 $B$ 是 KVCache 加载带宽,$G$ 是 GPU 计算吞吐量。

关键结论:对于 LLaMA3-70B 在 8×A800 上,当前缀长度为 8192 时,只需要约 6 GB/s 的带宽就能使 KVCache 复用有利可图。对于 8×H800,这一阈值约为 19 GB/s。而现代 RDMA 网络可以轻松提供 100+ GB/s 的聚合带宽。

1.2.2 KVCache-centric 架构#

基于上述洞察,Mooncake 提出了 KVCache 中心化(KVCache-centric) 的架构设计:

graph TB
    subgraph "传统架构"
        direction TB
        R1[请求] --> N1[节点 1: Prefill + Decode]
        R2[请求] --> N2[节点 2: Prefill + Decode]
        N1 --> LC1[本地 Cache]
        N2 --> LC2[本地 Cache]
    end

    subgraph "Mooncake 架构"
        direction TB
        R3[请求] --> C[Conductor<br/>全局调度器]
        C --> P1[Prefill 节点 1]
        C --> P2[Prefill 节点 2]
        C --> D1[Decode 节点 1]
        C --> D2[Decode 节点 2]
        P1 <--> GC[Mooncake Store<br/>分布式全局 Cache]
        P2 <--> GC
        D1 <--> GC
        D2 <--> GC
    end

核心设计原则

  1. Prefill/Decode 分离(P/D Disaggregation)

    • Prefill 节点专注于处理输入,优化 TTFT
    • Decode 节点专注于生成输出,优化 TBT
    • 两类节点可以独立扩缩容
  2. 分布式 KVCache 池(Mooncake Store)

    • 利用集群中所有节点的 CPU 内存和 SSD 构建全局缓存池
    • 通过高速 RDMA 网络实现跨节点 KVCache 共享
    • 支持 PB 级别的缓存容量
  3. KVCache 中心化调度(KVCache-centric Scheduling)

    • 调度决策以 KVCache 的分布和复用为核心
    • 将请求调度到能复用最多 KVCache 的节点
    • 自动进行热点 KVCache 的迁移和复制

1.3 论文核心贡献与性能数据#

1.3.1 主要贡献#

Mooncake 论文(FAST'25)的核心贡献包括:

  1. 首个大规模部署的分布式 KVCache 系统

    • 在 Kimi 服务中部署于数千节点
    • 每日处理超过 1000 亿 tokens
  2. 高性能 Transfer Engine

    • 支持 8×400 Gbps RDMA 网络
    • 传输速度比 TCP 快 2.4×-4.6×
    • 拓扑感知的路径选择
  3. KVCache 中心化调度算法

    • Cache-aware 的全局调度
    • 启发式热点迁移策略
    • 无需精确预测的自动副本管理
  4. Chunked Pipeline Parallelism (CPP)

    • 针对长上下文的跨节点并行方案
    • 比 Sequence Parallelism 更低的通信开销

1.3.2 性能对比数据#

与 vLLM 基准系统相比,Mooncake 在不同 TBT SLO 阈值下的有效请求容量提升:

TBT 阈值 提升幅度 场景
100 ms +498% 严格 SLO(实时对话)
200 ms +157% 中等 SLO
300 ms +59% 宽松 SLO
有效请求容量对比 (16×8×A800 节点,真实对话工作负载)

TBT SLO: 100ms
Mooncake:     ████████████████████████████████████████ 100%
vLLM:         ██████                                    17%

TBT SLO: 200ms
Mooncake:     ████████████████████████████████████████ 100%
vLLM:         ███████████████                           39%

TBT SLO: 300ms
Mooncake:     ████████████████████████████████████████ 100%
vLLM:         █████████████████████████                 63%

1.3.3 生产环境效果#

在 Kimi 的实际生产部署中:

集群类型 相比之前系统的提升
A800 集群 +115% 请求处理能力
H800 集群 +107% 请求处理能力

Cache 命中率对比

缓存策略 命中率 Prefill 计算时间节省
本地 Cache ~50% 理论最大值 -
Mooncake Store 2.36× 本地 Cache 最高 48%

1.4 开源项目概览#

Mooncake 已在 GitHub 开源:https://github.com/kvcache-ai/Mooncake

1.4.1 代码库结构#

Mooncake/
├── mooncake-transfer-engine/    # 核心传输引擎
│   ├── include/                 # 头文件
│   │   ├── transfer_engine.h    # 主 API
│   │   ├── transport/           # 传输协议
│   │   │   ├── rdma_transport/  # RDMA 实现
│   │   │   ├── tcp_transport/   # TCP 实现
│   │   │   └── ...
│   │   └── topology.h           # 拓扑发现
│   └── src/                     # 实现代码
│
├── mooncake-store/              # 分布式 KVCache 存储
│   ├── include/
│   │   ├── storage_backend.h    # 存储后端
│   │   ├── eviction_strategy.h  # 驱逐策略
│   │   └── master_service.h     # Master 服务
│   └── src/
│
├── mooncake-integration/        # 与 LLM 框架集成
│   ├── vllm/                    # vLLM 集成
│   └── sglang/                  # SGLang 集成
│
├── mooncake-p2p-store/          # P2P 对象存储
├── mooncake-ep/                 # Expert Parallelism 支持
├── mooncake-wheel/              # Python 包装和 CLI
└── docs/                        # 文档

1.4.2 支持的硬件和协议#

类别 支持列表
GPU NVIDIA (CUDA)、AMD (HIP)、华为昇腾 (Ascend)
网络 InfiniBand、RoCE、eRDMA
传输协议 RDMA、TCP、NVMe-oF、NVLink、CXL
存储 GPU VRAM、CPU DRAM、NVMe SSD

1.4.3 集成支持#

Mooncake 已与主流 LLM 推理框架深度集成:

  • vLLM: v0.2 / v1 版本支持
  • SGLang: HiCache 集成
  • LMDeploy: 适配中
  • LMCache: 适配中

1.5 本文结构#

本文将从以下七个部分深入解析 Mooncake:

部分 内容 重点
第一部分 概述与背景 问题定义、设计理念
第二部分 系统架构 整体设计、请求流程
第三部分 Mooncake Store 存储设计、Transfer Engine
第四部分 调度系统 调度算法、负载均衡
第五部分 性能优化 优化技术、部署调试
第六部分 代码解析 核心代码逐行分析
第七部分 总结展望 贡献回顾、未来方向

第 2 章:技术预备知识#

在深入 Mooncake 的设计细节之前,本章将介绍必要的技术背景知识,包括 Transformer 架构、KVCache 原理、LLM 推理的两阶段特性,以及 RDMA 网络基础。

2.1 Transformer 架构与 KVCache 原理#

2.1.1 Transformer 的 Attention 机制回顾#

现代大语言模型(如 GPT、LLaMA、Kimi)都基于 Transformer 架构,其核心是 自注意力机制(Self-Attention)

对于输入序列 $X \in \mathbb{R}^{n \times d}$(n 个 token,每个 token 的维度为 d),自注意力的计算过程如下:

graph LR
    X[输入 X] --> Q[Q = X·W_Q]
    X --> K[K = X·W_K]
    X --> V[V = X·W_V]
    Q --> Attn[Attention]
    K --> Attn
    V --> Attn
    Attn --> O[输出 O]

计算公式

$$Q = XW_Q, \quad K = XW_K, \quad V = XW_V$$

$$\text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V$$

其中:

  • $W_Q, W_K, W_V$ 是可学习的投影矩阵
  • $d_k$ 是 Key 向量的维度
  • softmax 操作确保注意力权重归一化

2.1.2 自回归生成与 KVCache#

LLM 采用**自回归(Autoregressive)**方式生成文本:每次只生成一个 token,新 token 依赖于之前所有 token 的信息。

朴素实现的问题

生成第 n 个 token 时:
  需要计算 Q_n = x_n · W_Q  (只需当前 token)
  需要计算 K_{1:n} = X_{1:n} · W_K  (需要所有历史 token)
  需要计算 V_{1:n} = X_{1:n} · W_V  (需要所有历史 token)

每生成一个新 token,都需要重新计算所有历史 token 的 K 和 V,造成大量重复计算。

KVCache 优化

将历史 token 的 K 和 V 缓存起来,生成新 token 时只需:

  1. 计算新 token 的 $q_n, k_n, v_n$
  2. 将 $k_n, v_n$ 追加到 KVCache
  3. 使用 $q_n$ 与完整的 $K_{1:n}, V_{1:n}$ 计算注意力
graph TB
    subgraph "Prefill 阶段"
        P1[输入 tokens X_{1:m}] --> P2[计算 K_{1:m}, V_{1:m}]
        P2 --> P3[存入 KVCache]
        P2 --> P4[计算 Attention]
        P4 --> P5[生成第一个 output token]
    end

    subgraph "Decode 阶段 (循环)"
        D1[新 token x_n] --> D2[计算 k_n, v_n]
        D2 --> D3[追加到 KVCache]
        D3 --> D4[K_{1:n}, V_{1:n}]
        D1 --> D5[计算 q_n]
        D5 --> D6[Attention with KVCache]
        D4 --> D6
        D6 --> D7[生成下一个 token]
    end

    P5 --> D1
    D7 -.-> D1

2.1.3 KVCache 的内存占用分析#

对于一个典型的 LLM(以 LLaMA3-70B 为例):

参数 符号 LLaMA3-70B
层数 $l$ 80
模型维度 $d$ 8192
注意力头数 $n_{head}$ 64
KV 头数 (GQA) $n_{kv}$ 8
数据类型 - BFloat16 (2 bytes)

单个 token 的 KVCache 大小

$$\text{KVCache per token} = l \times 2 \times \frac{d}{n_{head}/n_{kv}} \times \text{sizeof(dtype)}$$

$$= 80 \times 2 \times \frac{8192}{64/8} \times 2 = 80 \times 2 \times 1024 \times 2 = 327,680 \text{ bytes} \approx 320 \text{ KB}$$

不同上下文长度的 KVCache 总大小

上下文长度 KVCache 大小
4K tokens 1.28 GB
32K tokens 10.24 GB
128K tokens 40.96 GB
1M tokens 320 GB

这解释了为什么长上下文推理对内存需求如此之高。

2.1.4 Prefix Caching 的原理#

关键洞察:自回归语言模型具有前缀无关性——相同前缀的 KVCache 完全相同,与后续内容无关。

请求 A: "请分析这份报告:[报告内容]... 给出总结"
请求 B: "请分析这份报告:[报告内容]... 给出要点"

共同前缀: "请分析这份报告:[报告内容]..."

如果请求 A 先执行,其前缀部分的 KVCache 可以被缓存。当请求 B 到达时,可以直接复用这部分 KVCache,只需计算不同后缀部分。

Prefix Caching 的工作流程

sequenceDiagram
    participant R1 as 请求 A
    participant Cache as KVCache 缓存
    participant GPU as GPU
    participant R2 as 请求 B

    R1->>GPU: 发送完整 prompt
    GPU->>GPU: Prefill 计算
    GPU->>Cache: 存储 KVCache (hash: prefix_A)
    GPU-->>R1: 返回输出

    Note over R2,Cache: 请求 B 到达,前缀与 A 相同

    R2->>Cache: 查询 prefix hash
    Cache-->>R2: 命中!返回 KVCache 位置
    R2->>GPU: 只发送不同的后缀
    Cache->>GPU: 加载缓存的 KVCache
    GPU->>GPU: 只计算后缀部分
    GPU-->>R2: 返回输出

2.2 LLM 推理的两阶段特性#

LLM 推理可以明确分为两个阶段:PrefillDecode。这两个阶段具有截然不同的计算特性。

2.2.1 Prefill 阶段#

特点

  • 处理所有输入 tokens(可能数万甚至数十万)
  • 计算密集型(Compute-bound)
  • 高度并行化(所有 token 可同时计算)
  • 输出:第一个生成的 token + 完整的 KVCache

计算复杂度

$$T_{prefill} \propto l \times (an^2d + bnd^2)$$

其中 $n$ 是输入长度。注意 $n^2$ 项来自注意力计算,这使得长上下文的 Prefill 时间急剧增长。

GPU 利用率:由于高并行度,Prefill 阶段可以充分利用 GPU 的计算能力,MFU(Model FLOPs Utilization)较高。

2.2.2 Decode 阶段#

特点

  • 每次只生成一个 token
  • 内存密集型(Memory-bound)
  • 无法并行化(自回归依赖)
  • 需要访问完整的 KVCache

计算复杂度

$$T_{decode} \propto l \times d^2 + l \times nd$$

第一项是 MLP 计算,第二项是与 KVCache 的注意力计算。由于每次只处理一个 token,GPU 计算单元严重闲置。

GPU 利用率:单个请求的 Decode 阶段 MFU 很低。优化方法是使用 Continuous Batching,将多个请求的 Decode 合并执行。

2.2.3 两阶段对比#

特性 Prefill Decode
处理 token 数 多(输入长度) 单个
计算类型 计算密集型 内存密集型
并行度
GPU 利用率 低(需 batching)
主要瓶颈 计算吞吐量 内存带宽
优化目标 TTFT TBT
graph TB
    subgraph "Prefill 阶段特性"
        P1[大量输入 tokens]
        P2[并行计算]
        P3[计算密集]
        P4[高 GPU 利用率]
        P1 --> P2 --> P3 --> P4
    end

    subgraph "Decode 阶段特性"
        D1[单个 token]
        D2[顺序生成]
        D3[内存密集]
        D4[低 GPU 利用率]
        D1 --> D2 --> D3 --> D4
    end

    subgraph "优化策略"
        O1[Prefill: 缓存复用<br/>减少计算量]
        O2[Decode: Continuous Batching<br/>提高并行度]
    end

    P4 --> O1
    D4 --> O2

2.2.4 为什么需要 P/D 分离#

传统架构将 Prefill 和 Decode 耦合在同一节点,导致:

  1. 相互干扰:长 Prefill 请求阻塞 Decode 请求,造成 TBT 超时
  2. 资源配置困难:无法针对不同阶段优化资源配置
  3. 弹性伸缩受限:无法独立扩缩容 Prefill 或 Decode 能力

P/D 分离的优势

graph LR
    subgraph "耦合架构"
        C1[请求] --> C2[节点]
        C2 --> C3[Prefill]
        C2 --> C4[Decode]
        C3 -.相互干扰.-> C4
    end

    subgraph "分离架构"
        S1[请求] --> S2[调度器]
        S2 --> S3[Prefill 池]
        S2 --> S4[Decode 池]
        S3 --> S5[KVCache 传输]
        S5 --> S4
    end
  • Prefill 节点可以专注于最大化计算吞吐
  • Decode 节点可以专注于保证 TBT SLO
  • 可以根据负载特征独立调整 P/D 比例

2.3 RDMA 网络基础#

Mooncake 的高性能 KVCache 传输依赖于 RDMA(Remote Direct Memory Access)网络。本节介绍 RDMA 的基本概念。

2.3.1 什么是 RDMA#

RDMA 允许一台计算机直接访问另一台计算机的内存,绑过操作系统内核和 CPU,实现极低延迟和高带宽的数据传输。

graph TB
    subgraph "传统 TCP/IP"
        T1[应用程序] --> T2[Socket API]
        T2 --> T3[TCP/IP 协议栈]
        T3 --> T4[内核缓冲区]
        T4 --> T5[网卡驱动]
        T5 --> T6[网卡]
    end

    subgraph "RDMA"
        R1[应用程序] --> R2[RDMA Verbs]
        R2 --> R3[网卡]
        R3 -.直接访问.-> R4[远程内存]
    end

RDMA 的优势

特性 TCP/IP RDMA
CPU 参与 每个数据包 仅初始化
内存拷贝 多次 零拷贝
延迟 ~100μs ~1-2μs
带宽利用率 ~50-70% ~95%+

2.3.2 RDMA 的主要实现#

技术 网络类型 特点
InfiniBand 专用网络 最高性能,需专用硬件
RoCE v2 以太网 基于以太网,需支持 ECN
iWARP 以太网 兼容性好,性能较低
eRDMA 云网络 阿里云等云厂商提供

Mooncake 主要使用 InfiniBandRoCE,因为它们在 GPU 集群中最为常见。

2.3.3 RDMA 核心概念#

1. 保护域(Protection Domain, PD)

PD 是 RDMA 资源的安全边界,同一 PD 内的资源可以相互访问。

2. 内存区域(Memory Region, MR)

应用程序必须先向 RDMA 子系统注册内存区域,获得 lkey(本地访问)和 rkey(远程访问)。

1
2
3
4
5
6
7
// 内存注册示例
struct ibv_mr *mr = ibv_reg_mr(pd, addr, length,
    IBV_ACCESS_LOCAL_WRITE |
    IBV_ACCESS_REMOTE_WRITE |
    IBV_ACCESS_REMOTE_READ);
uint32_t lkey = mr->lkey;  // 本地访问密钥
uint32_t rkey = mr->rkey;  // 远程访问密钥

3. 队列对(Queue Pair, QP)

QP 是 RDMA 通信的端点,包含:

  • Send Queue (SQ):发送工作请求
  • Receive Queue (RQ):接收工作请求
graph LR
    subgraph "本地节点"
        SQ1[Send Queue]
        RQ1[Receive Queue]
        CQ1[Completion Queue]
    end

    subgraph "远程节点"
        SQ2[Send Queue]
        RQ2[Receive Queue]
        CQ2[Completion Queue]
    end

    SQ1 -->|RDMA Write| RQ2
    SQ2 -->|RDMA Read| RQ1

4. 完成队列(Completion Queue, CQ)

当 RDMA 操作完成时,结果被放入 CQ,应用程序轮询 CQ 获取完成通知。

5. RDMA 操作类型

操作 描述 是否需要远程 CPU
RDMA Write 写入远程内存
RDMA Read 读取远程内存
Send/Recv 消息传递 是(需预先 post recv)

Mooncake 主要使用 RDMA WriteRDMA Read 进行 KVCache 传输,因为它们完全绑过远程 CPU。

2.3.4 GPU Direct RDMA#

GPU Direct RDMA (GDR) 允许网卡直接访问 GPU 显存,无需经过 CPU 内存中转:

graph LR
    subgraph "无 GDR"
        G1[GPU A] -->|PCIe| C1[CPU 内存]
        C1 -->|RDMA| C2[CPU 内存]
        C2 -->|PCIe| G2[GPU B]
    end

    subgraph "有 GDR"
        G3[GPU A] -->|直接 RDMA| G4[GPU B]
    end

GDR 的优势

  • 减少数据拷贝
  • 降低延迟
  • 释放 CPU 内存带宽

限制

  • 需要 GPU 和网卡在同一 PCIe 域
  • 需要特定驱动支持

Mooncake 的 Transfer Engine 会自动检测 GDR 支持情况,并选择最优传输路径。

2.3.5 多网卡聚合#

现代 GPU 服务器通常配备多块 RDMA 网卡。例如,NVIDIA HGX 系统中:

  • 8×A800 配备 8×100Gbps 或 4×200Gbps 网卡
  • 8×H800 配备 8×400Gbps 网卡

聚合带宽可达 800Gbps - 3200Gbps(100-400 GB/s),与 DDR5 内存带宽相当。

Mooncake 的 Transfer Engine 实现了拓扑感知的多网卡聚合,能够:

  1. 自动发现所有可用网卡
  2. 根据数据位置选择最优网卡
  3. 将大传输分割到多网卡并行执行

2.4 相关系统与技术#

2.4.1 vLLM 与 PagedAttention#

vLLM 是当前最流行的开源 LLM 推理系统,其核心创新是 PagedAttention

  • 将 KVCache 分成固定大小的页(block)
  • 使用类似操作系统的页表管理
  • 支持 Prefix Caching(本地)

Mooncake 借鉴了 PagedAttention 的分页思想,但将其扩展到分布式场景。

2.4.2 Continuous Batching#

Continuous Batching(持续批处理)是 Decode 阶段的关键优化:

  • 不等待所有请求完成再开始新批次
  • 每个 iteration 动态添加新请求、移除已完成请求
  • 显著提高 GPU 利用率
graph TB
    subgraph "静态 Batching"
        S1[Batch 1: A, B, C] --> S2[等待全部完成]
        S2 --> S3[Batch 2: D, E, F]
    end

    subgraph "Continuous Batching"
        C1[Iter 1: A, B, C]
        C2[Iter 2: A, B, D]
        C3[Iter 3: A, D, E]
        C4[Iter 4: D, E, F]
        C1 --> C2
        C2 --> C3
        C3 --> C4
        Note1[C 完成, D 加入]
        Note2[B 完成, E 加入]
        Note3[A 完成, F 加入]
    end

2.4.3 Chunked Prefill#

Chunked Prefill 将长 Prefill 请求分成多个小块执行:

  • 减少单次 Prefill 对 Decode 的干扰
  • 缺点:无法完全消除干扰,且降低 Prefill MFU

Mooncake 选择 P/D 分离而非 Chunked Prefill,因为前者能更彻底地解决干扰问题。

2.4.4 Sequence Parallelism#

Sequence Parallelism (SP) 将长序列分布到多个节点并行处理:

  • 支持超长上下文
  • 缺点:需要频繁的跨节点通信(每层 2 次 AllReduce)

Mooncake 提出的 Chunked Pipeline Parallelism (CPP) 通过流水线方式减少通信开销。

2.5 本章小结#

本章介绍了理解 Mooncake 所需的技术背景:

  1. KVCache 是 LLM 推理的核心数据结构,大小与上下文长度成正比
  2. Prefix Caching 可以显著减少重复计算,但需要大容量全局缓存
  3. Prefill/Decode 分离 可以消除两阶段的相互干扰
  4. RDMA 提供了高带宽、低延迟的跨节点数据传输能力
  5. GPU Direct RDMA 进一步减少了数据拷贝开销

有了这些背景知识,我们将在下一章深入 Mooncake 的系统架构设计。


第二部分:系统架构深度解析#

第 3 章:整体架构设计#

3.1 架构总览#

Mooncake 采用了一个精心设计的分层架构,将 LLM 推理系统分解为多个独立但协作的组件。这种设计使得系统能够在保持高性能的同时,实现灵活的资源管理和调度。

3.1.1 核心组件#

graph TB
    subgraph "控制平面 (Control Plane)"
        Conductor[Conductor<br/>全局调度器]
        MetaStore[Metadata Store<br/>元数据存储]
    end

    subgraph "数据平面 (Data Plane)"
        subgraph "Prefill 节点池"
            P1[Prefill Node 1]
            P2[Prefill Node 2]
            Pn[Prefill Node N]
        end

        subgraph "Decode 节点池"
            D1[Decode Node 1]
            D2[Decode Node 2]
            Dn[Decode Node N]
        end

        subgraph "Mooncake Store (分布式 KVCache)"
            MS1[Store Node 1<br/>CPU/GPU/SSD]
            MS2[Store Node 2<br/>CPU/GPU/SSD]
            MSn[Store Node N<br/>CPU/GPU/SSD]
        end
    end

    subgraph "传输层 (Transfer Layer)"
        TE[Transfer Engine<br/>RDMA/TCP/NVMe-oF]
    end

    Client[客户端请求] --> Conductor
    Conductor --> P1 & P2 & Pn
    Conductor --> D1 & D2 & Dn
    P1 & P2 & Pn <--> TE
    D1 & D2 & Dn <--> TE
    TE <--> MS1 & MS2 & MSn
    Conductor <--> MetaStore

Mooncake 系统由以下核心组件构成:

组件 职责 关键特性
Conductor 全局调度器 Cache-aware 调度、负载均衡、SLO 保证
Prefill Nodes 处理输入 tokens 计算密集型、生成 KVCache
Decode Nodes 生成输出 tokens 内存密集型、消费 KVCache
Mooncake Store 分布式 KVCache 存储 多级存储、跨节点共享
Transfer Engine 高性能数据传输 RDMA、拓扑感知、多协议支持
Metadata Store 元数据管理 基于 etcd、高可用

3.1.2 代码模块对应关系#

Mooncake 的代码结构与上述架构紧密对应:

Mooncake/
├── mooncake-transfer-engine/    # Transfer Engine 核心实现
│   ├── include/
│   │   ├── transfer_engine.h    # 主入口 API
│   │   ├── transport/           # 各种传输协议实现
│   │   │   ├── rdma_transport/  # RDMA 传输
│   │   │   ├── tcp_transport/   # TCP 传输
│   │   │   └── nvmeof_transport/# NVMe-oF 传输
│   │   ├── topology.h           # 拓扑发现
│   │   └── transfer_metadata.h  # 元数据管理
│   └── src/
│       ├── transfer_engine.cpp  # Transfer Engine 实现
│       └── transport/           # 传输层实现
│
├── mooncake-store/              # Mooncake Store 实现
│   ├── include/
│   │   ├── storage_backend.h    # 存储后端抽象
│   │   ├── eviction_strategy.h  # 缓存驱逐策略
│   │   ├── master_service.h     # Master 服务
│   │   └── ha_helper.h          # 高可用支持
│   └── src/
│       ├── ha_helper.cpp        # Leader 选举实现
│       └── master_service.cpp   # Master 服务实现
│
├── mooncake-integration/        # 与推理框架的集成
│   ├── vllm/                    # vLLM 集成
│   └── sglang/                  # SGLang 集成
│
└── mooncake-wheel/              # Python 包装器和 CLI 工具

3.2 P/D 分离架构详解#

3.2.1 分离架构的核心思想#

P/D(Prefill/Decode)分离是 Mooncake 架构的核心设计决策。与传统的将两个阶段耦合在同一节点的方案不同,Mooncake 将它们部署在独立的节点池中:

graph TB
    subgraph "传统耦合架构"
        direction LR
        TR[请求] --> TN1[节点 1]
        TR --> TN2[节点 2]
        TN1 --> TP1[Prefill] --> TD1[Decode]
        TN2 --> TP2[Prefill] --> TD2[Decode]
        style TP1 fill:#ff9999
        style TD1 fill:#99ccff
        style TP2 fill:#ff9999
        style TD2 fill:#99ccff
    end

    subgraph "Mooncake P/D 分离架构"
        direction LR
        MR[请求] --> MC[Conductor]
        MC --> MP1[Prefill 1]
        MC --> MP2[Prefill 2]
        MP1 --> MKV[KVCache<br/>传输]
        MP2 --> MKV
        MKV --> MD1[Decode 1]
        MKV --> MD2[Decode 2]
        style MP1 fill:#ff9999
        style MP2 fill:#ff9999
        style MD1 fill:#99ccff
        style MD2 fill:#99ccff
    end

3.2.2 分离带来的优势#

1. 消除阶段干扰

在耦合架构中,长 Prefill 请求会阻塞同节点上的 Decode 请求,导致 TBT 超时。分离后:

  • Prefill 节点可以专注于处理计算密集型任务
  • Decode 节点不受 Prefill 干扰,TBT 稳定可控

2. 独立资源配置

阶段 资源需求 优化方向
Prefill 高计算能力、中等内存 最大化 MFU
Decode 高内存带宽、大 batch size 最大化并发请求数

分离后可以针对不同阶段优化配置:

  • Prefill 节点可以使用计算更强的 GPU
  • Decode 节点可以配置更大的显存和 batch size

3. 弹性伸缩

可以根据工作负载特征独立调整 P/D 节点比例:

  • 长上下文场景:增加 Prefill 节点
  • 高并发场景:增加 Decode 节点

3.2.3 分离带来的挑战:KVCache 传输#

P/D 分离的代价是需要在 Prefill 完成后将 KVCache 传输到 Decode 节点。这引入了额外的网络开销:

传输量估算(以 LLaMA3-70B 为例):

上下文长度 KVCache 大小 100GB/s 网络传输时间
4K tokens 1.28 GB ~13 ms
32K tokens 10.24 GB ~100 ms
128K tokens 40.96 GB ~400 ms

关键洞察:Mooncake 通过以下机制将传输开销最小化:

  1. Prefix Caching:如果目标 Decode 节点已有部分 KVCache 缓存,只需传输差异部分
  2. 并行传输:利用多网卡聚合带宽
  3. 流水线:KVCache 可以边生成边传输

3.3 Mooncake Store 设计概览#

Mooncake Store 是系统的核心组件,实现了分布式 KVCache 存储池。

3.3.1 存储层次结构#

graph TB
    subgraph "存储层次 (从快到慢)"
        L1[GPU VRAM<br/>~2 TB/s 带宽]
        L2[CPU DRAM<br/>~400 GB/s 带宽]
        L3[NVMe SSD<br/>~10-50 GB/s 带宽]
    end

    L1 --> L2 --> L3

    subgraph "容量 vs 速度"
        C1[少量高热点数据]
        C2[中等热度数据]
        C3[大量冷数据]
    end

    L1 -.-> C1
    L2 -.-> C2
    L3 -.-> C3

Mooncake Store 利用集群中所有节点的多级存储资源:

存储层 介质 典型带宽 典型容量 (8 节点)
L1 GPU VRAM 2-3 TB/s 640 GB - 1.28 TB
L2 CPU DRAM 400 GB/s 8-16 TB
L3 NVMe SSD 10-50 GB/s 数十 TB

3.3.2 KVCache 对象模型#

Mooncake Store 将 KVCache 组织为树形结构:

graph TB
    Root[根节点<br/>空前缀] --> A[Token 序列 A]
    Root --> B[Token 序列 B]
    A --> A1[A 的扩展 1]
    A --> A2[A 的扩展 2]
    A1 --> A1a[A1 的扩展]
    B --> B1[B 的扩展 1]

    style Root fill:#f0f0f0
    style A fill:#ffe0e0
    style B fill:#e0ffe0
    style A1 fill:#ffe0e0
    style A2 fill:#ffe0e0
    style A1a fill:#ffe0e0
    style B1 fill:#e0ffe0

Radix Tree 结构特点

  • 公共前缀共享存储空间
  • 支持快速前缀匹配
  • 天然支持 Prefix Caching

3.3.3 分片与复制策略#

KVCache 被分成固定大小的 Chunk(通常 4KB-64KB),每个 Chunk 可以:

  • 存储在不同节点上
  • 拥有多个副本(用于热点数据)
  • 在不同存储层之间迁移
graph LR
    subgraph "KVCache 对象"
        K[KVCache<br/>128K tokens]
        K --> C1[Chunk 1]
        K --> C2[Chunk 2]
        K --> C3[Chunk N]
    end

    subgraph "分布式存储"
        N1[节点 1]
        N2[节点 2]
        N3[节点 3]
    end

    C1 --> N1
    C1 -.副本.-> N2
    C2 --> N2
    C3 --> N3
    C3 -.副本.-> N1

3.4 Transfer Engine 设计概览#

Transfer Engine 是 Mooncake 的高性能数据传输层,负责在节点间高效传输 KVCache 数据。

3.4.1 核心设计目标#

  1. 高带宽:充分利用多 RDMA 网卡的聚合带宽
  2. 低延迟:最小化传输开销
  3. 拓扑感知:自动选择最优传输路径
  4. 协议抽象:统一 API 支持多种传输协议

3.4.2 架构分层#

graph TB
    subgraph "应用层"
        App[LLM 推理引擎<br/>vLLM / SGLang]
    end

    subgraph "Transfer Engine API"
        TE[TransferEngine]
        TE --> AM[allocateBatchID]
        TE --> ST[submitTransfer]
        TE --> GS[getTransferStatus]
    end

    subgraph "Transport Layer"
        MT[MultiTransport]
        MT --> RDMA[RdmaTransport]
        MT --> TCP[TcpTransport]
        MT --> NVME[NvmeofTransport]
    end

    subgraph "Hardware Abstraction"
        RDMA --> RC[RdmaContext]
        RDMA --> RE[RdmaEndpoint]
        RC --> NIC1[NIC 1]
        RC --> NIC2[NIC 2]
    end

    App --> TE
    TE --> MT

3.4.3 关键代码结构#

transfer_engine.h 中可以看到 Transfer Engine 的核心 API:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class TransferEngine {
public:
    // 初始化引擎
    int init(const std::string& metadata_conn_string,
             const std::string& local_server_name,
             const std::string& ip_or_host_name,
             uint64_t rpc_port);

    // 安装传输协议
    Transport* installTransport(const std::string& proto, void** args);

    // 注册本地内存区域 (用于 RDMA)
    int registerLocalMemory(void* addr, size_t length,
                           const std::string& location,
                           bool remote_accessible,
                           bool update_metadata = true);

    // 分配 Batch ID (用于批量传输)
    BatchID allocateBatchID(size_t batch_size);

    // 提交传输请求
    Status submitTransfer(BatchID batch_id,
                         const std::vector<TransferRequest>& entries);

    // 获取传输状态
    Status getTransferStatus(BatchID batch_id, size_t task_id,
                            TransferStatus& status);

    // 打开远程 Segment
    SegmentHandle openSegment(const std::string& segment_name);
};

核心数据结构(来自 transport.h):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
struct TransferRequest {
    enum OpCode { READ, WRITE };

    OpCode opcode;           // 读或写操作
    void* source;            // 本地地址
    SegmentID target_id;     // 目标 Segment
    uint64_t target_offset;  // 目标偏移
    size_t length;           // 传输长度
};

enum TransferStatusEnum {
    WAITING,     // 等待执行
    PENDING,     // 执行中
    COMPLETED,   // 完成
    FAILED,      // 失败
    TIMEOUT      // 超时
};

3.5 Conductor 调度器概览#

Conductor 是 Mooncake 的全局调度器,负责将请求分配到合适的 Prefill 和 Decode 节点。

3.5.1 调度目标#

Conductor 需要同时优化多个目标:

graph LR
    subgraph "调度目标"
        G1[最小化 TTFT]
        G2[满足 TBT SLO]
        G3[最大化 Cache 命中]
        G4[负载均衡]
    end

    subgraph "约束条件"
        C1[节点容量]
        C2[网络带宽]
        C3[SLO 阈值]
    end

    G1 & G2 & G3 & G4 --> D[调度决策]
    C1 & C2 & C3 --> D

3.5.2 Cache-aware 调度#

Conductor 的核心创新是 Cache-aware 调度——将请求调度到能够复用最多 KVCache 的节点:

sequenceDiagram
    participant R as 新请求
    participant C as Conductor
    participant MS as Mooncake Store
    participant P as Prefill Nodes

    R->>C: 请求 (prefix tokens)
    C->>MS: 查询 prefix 位置
    MS-->>C: 返回各节点缓存情况
    C->>C: 计算最优节点<br/>(最大缓存复用 + 负载均衡)
    C->>P: 调度到最优 Prefill 节点

调度算法核心思想

给定请求 $r$ 和候选节点集合 $N$,调度得分为:

$$score(r, n) = \alpha \cdot \frac{cached_prefix_length(r, n)}{total_prefix_length(r)} + \beta \cdot (1 - load(n))$$

其中:

  • $cached_prefix_length(r, n)$:节点 $n$ 上已缓存的请求 $r$ 的前缀长度
  • $total_prefix_length(r)$:请求 $r$ 的总前缀长度
  • $load(n)$:节点 $n$ 的当前负载
  • $\alpha, \beta$:权重参数

3.6 元数据管理#

Mooncake 使用 etcd 作为分布式元数据存储,管理以下信息:

3.6.1 元数据类型#

元数据类型 内容 更新频率
Segment 信息 节点地址、端口、协议 节点启动/退出时
Buffer 描述 内存地址、大小、rkey 内存注册时
拓扑信息 NIC 配置、存储层次 节点启动时
Master 视图 当前 Master 地址 Leader 选举时

3.6.2 高可用设计#

ha_helper.cpp 可以看到 Master 选举机制:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void MasterViewHelper::ElectLeader(const std::string& master_address,
                                   ViewVersionId& version,
                                   EtcdLeaseId& lease_id) {
    while (true) {
        // 1. 检查是否已有 Leader
        auto ret = EtcdHelper::Get(master_view_key_, ...);

        if (ret != ErrorCode::ETCD_KEY_NOT_EXIST) {
            // 已有 Leader,等待其失效
            EtcdHelper::WatchUntilDeleted(master_view_key_, ...);
            continue;
        }

        // 2. 尝试成为 Leader
        // 获取租约
        EtcdHelper::GrantLease(ETCD_MASTER_VIEW_LEASE_TTL, lease_id);

        // 原子创建 key (带租约)
        ret = EtcdHelper::CreateWithLease(master_view_key_, master_address,
                                          lease_id, version);

        if (ret == ErrorCode::OK) {
            // 成功成为 Leader
            return;
        }
        // 失败则重试
    }
}

选举流程

sequenceDiagram
    participant N1 as 节点 1
    participant N2 as 节点 2
    participant E as etcd

    N1->>E: 检查 master_view key
    E-->>N1: key 不存在
    N2->>E: 检查 master_view key
    E-->>N2: key 不存在

    N1->>E: 获取 lease
    E-->>N1: lease_id = 123
    N2->>E: 获取 lease
    E-->>N2: lease_id = 456

    N1->>E: CreateWithLease(master_view, "N1", 123)
    E-->>N1: 成功!
    N2->>E: CreateWithLease(master_view, "N2", 456)
    E-->>N2: 失败 (key 已存在)

    Note over N1: 成为 Leader
    Note over N2: 等待 Leader 失效

    loop 保持 Leader
        N1->>E: KeepAlive(lease_id=123)
    end

3.7 本章小结#

本章介绍了 Mooncake 的整体架构设计:

  1. 分层架构:控制平面(Conductor)、数据平面(P/D 节点)、存储层(Mooncake Store)、传输层(Transfer Engine)
  2. P/D 分离:消除阶段干扰、支持独立资源配置和弹性伸缩
  3. Mooncake Store:多级存储层次、树形 KVCache 组织、分片与复制
  4. Transfer Engine:协议抽象、拓扑感知、多网卡聚合
  5. Conductor:Cache-aware 调度、负载均衡、SLO 保证
  6. 元数据管理:基于 etcd、Leader 选举、高可用

第 4 章:请求处理流程详解#

本章将深入分析一个请求从进入系统到返回结果的完整流程,涵盖各个组件之间的交互细节。

4.1 请求生命周期总览#

一个 LLM 推理请求在 Mooncake 系统中经历以下主要阶段:

sequenceDiagram
    participant Client as 客户端
    participant Conductor as Conductor
    participant Prefill as Prefill Node
    participant Store as Mooncake Store
    participant Decode as Decode Node

    rect rgb(255, 240, 240)
        Note over Client,Conductor: 阶段 1: 请求调度
        Client->>Conductor: 发送请求 (prompt)
        Conductor->>Conductor: Cache-aware 调度决策
        Conductor->>Prefill: 分配 Prefill 任务
    end

    rect rgb(240, 255, 240)
        Note over Prefill,Store: 阶段 2: Prefill 处理
        Prefill->>Store: 查询 prefix cache
        Store-->>Prefill: 返回缓存的 KVCache (如有)
        Prefill->>Prefill: 计算 Prefill
        Prefill->>Store: 存储新生成的 KVCache
        Prefill->>Conductor: Prefill 完成
    end

    rect rgb(240, 240, 255)
        Note over Conductor,Decode: 阶段 3: Decode 调度与执行
        Conductor->>Decode: 分配 Decode 任务
        Decode->>Store: 加载 KVCache
        loop 自回归生成
            Decode->>Decode: 生成一个 token
            Decode-->>Client: 流式返回 token
            Decode->>Store: 更新 KVCache (可选)
        end
    end

    rect rgb(255, 255, 240)
        Note over Decode,Client: 阶段 4: 完成与清理
        Decode->>Conductor: Decode 完成
        Decode-->>Client: 结束标记
    end

4.2 阶段一:请求调度#

4.2.1 请求接收与解析#

当客户端发送请求时,Conductor 首先解析请求内容:

1
2
3
4
5
6
7
# 请求结构示例
request = {
    "prompt": "Please analyze this document: [doc content]...",
    "max_tokens": 2048,
    "temperature": 0.7,
    "stream": True
}

Conductor 将 prompt 进行 tokenize,并计算其 prefix hash

graph LR
    P[Prompt 文本] --> T[Tokenizer]
    T --> Tokens[Token IDs]
    Tokens --> H[Hash 函数]
    H --> PH[Prefix Hash]

4.2.2 Cache 查询#

Conductor 查询 Mooncake Store,获取各节点上已缓存的 prefix 情况:

graph TB
    subgraph MooncakeStoreMeta[Mooncake Store 元数据]
        M[Prefix Hash 索引]
        M --> E1["Hash: abc123<br/>节点: [N1, N3]<br/>长度: 4096"]
        M --> E2["Hash: def456<br/>节点: [N2]<br/>长度: 8192"]
        M --> E3["Hash: ghi789<br/>节点: [N1, N2, N4]<br/>长度: 2048"]
    end

查询结果告诉 Conductor:

  • 哪些节点缓存了请求的部分或全部前缀
  • 各节点的缓存命中长度

4.2.3 调度决策#

Conductor 综合考虑以下因素做出调度决策:

1. Cache 命中优化

选择缓存命中长度最大的节点,可以最大程度减少 Prefill 计算量:

节省的计算量 ∝ cached_prefix_length²

2. 负载均衡

避免将请求都调度到同一节点:

1
2
3
4
5
6
7
8
9
# 伪代码:调度得分计算
def compute_score(node, request):
    cache_hit_length = get_cache_hit_length(node, request.prefix_hash)
    cache_score = cache_hit_length / request.total_length

    load = get_node_load(node)
    load_score = 1.0 - load

    return alpha * cache_score + beta * load_score

3. 网络拓扑

优先选择与数据源网络距离近的节点:

graph TB
    subgraph "机架 1"
        N1[节点 1]
        N2[节点 2]
    end
    subgraph "机架 2"
        N3[节点 3]
        N4[节点 4]
    end

    N1 <-->|低延迟| N2
    N3 <-->|低延迟| N4
    N1 <-->|高延迟| N3

如果 KVCache 主要在机架 1 的节点上,优先选择同机架的 Prefill 节点。

4.3 阶段二:Prefill 处理#

4.3.1 KVCache 加载#

Prefill 节点收到任务后,首先从 Mooncake Store 加载已缓存的 KVCache:

sequenceDiagram
    participant P as Prefill Node
    participant TE as Transfer Engine
    participant S1 as Store Node 1
    participant S2 as Store Node 2

    P->>TE: 请求加载 KVCache chunks [1, 2, 3, ...]

    par 并行传输
        TE->>S1: RDMA Read chunk 1, 3
        TE->>S2: RDMA Read chunk 2, 4
    end

    S1-->>TE: chunk 1, 3 数据
    S2-->>TE: chunk 2, 4 数据

    TE-->>P: KVCache 加载完成

Transfer Engine 的并行传输机制(来自 rdma_transport.cpp):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Status RdmaTransport::submitTransfer(
    BatchID batch_id, const std::vector<TransferRequest>& entries) {

    // 将大传输分割成 slices
    const size_t kBlockSize = globalConfig().slice_size;  // 通常 64KB

    for (auto& request : entries) {
        for (uint64_t offset = 0; offset < request.length;
             offset += kBlockSize) {

            Slice* slice = getSliceCache().allocate();
            slice->source_addr = (char*)request.source + offset;
            slice->length = std::min(kBlockSize, request.length - offset);
            slice->opcode = request.opcode;
            slice->rdma.dest_addr = request.target_offset + offset;

            // 选择最优网卡
            int device_id = selectDevice(...);
            slices_to_post[context_list_[device_id]].push_back(slice);
        }
    }

    // 提交到各网卡并行执行
    for (auto& entry : slices_to_post) {
        entry.first->submitPostSend(entry.second);
    }
}

4.3.2 增量 Prefill#

只对未缓存的 tokens 执行 Prefill 计算:

graph LR
    subgraph "输入 Tokens"
        T1[Token 1]
        T2[Token 2]
        T3[...]
        T4[Token 4096]
        T5[Token 4097]
        T6[...]
        T7[Token 8000]
    end

    subgraph "处理方式"
        C[从 Cache 加载<br/>Token 1-4096]
        P[Prefill 计算<br/>Token 4097-8000]
    end

    T1 & T2 & T3 & T4 --> C
    T5 & T6 & T7 --> P

    style T1 fill:#90EE90
    style T2 fill:#90EE90
    style T3 fill:#90EE90
    style T4 fill:#90EE90
    style T5 fill:#FFB6C1
    style T6 fill:#FFB6C1
    style T7 fill:#FFB6C1

增量 Prefill 的计算节省

假设总长度 8000 tokens,缓存命中 4096 tokens:

  • 传统方式:需要计算 8000² ≈ 6400 万次注意力操作
  • 增量方式:只需计算 (8000² - 4096²) ≈ 4720 万次注意力操作
  • 节省:26% 计算量

4.3.3 KVCache 存储#

Prefill 完成后,将新生成的 KVCache 存储到 Mooncake Store:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 伪代码:KVCache 存储流程
void store_kvcache(KVCache& cache, PrefixHash& hash) {
    // 1. 分割成 chunks
    auto chunks = cache.split_into_chunks(CHUNK_SIZE);

    // 2. 计算存储位置
    std::vector<NodeId> target_nodes = select_storage_nodes(chunks);

    // 3. 并行写入
    BatchID batch = engine.allocateBatchID(chunks.size());
    for (size_t i = 0; i < chunks.size(); i++) {
        TransferRequest req;
        req.opcode = WRITE;
        req.source = chunks[i].data();
        req.target_id = target_nodes[i];
        req.target_offset = get_chunk_offset(hash, i);
        req.length = chunks[i].size();
        requests.push_back(req);
    }
    engine.submitTransfer(batch, requests);

    // 4. 等待完成
    wait_for_completion(batch);

    // 5. 更新元数据
    update_prefix_metadata(hash, target_nodes);
}

4.4 阶段三:Decode 处理#

4.4.1 Decode 节点分配#

Prefill 完成后,Conductor 选择 Decode 节点。选择策略考虑:

  1. KVCache 位置:优先选择已有 KVCache 的节点(避免传输)
  2. 当前负载:选择有足够空闲 batch 槽位的节点
  3. TBT SLO:确保选择的节点能满足 TBT 要求
graph TB
    subgraph "Decode 节点选择"
        C[Conductor] --> Q{KVCache 在哪?}
        Q -->|已在 Decode 节点| D1[直接使用该节点]
        Q -->|在 Store 中| D2[选择空闲节点 + 传输]
        Q -->|在 Prefill 节点| D3[考虑是否迁移]
    end

4.4.2 KVCache 迁移(如需要)#

如果 KVCache 不在目标 Decode 节点上,需要进行迁移:

sequenceDiagram
    participant C as Conductor
    participant P as Prefill Node
    participant D as Decode Node

    C->>P: 通知 Prefill 完成
    C->>D: 分配 Decode 任务

    alt KVCache 在 Prefill 节点
        P->>D: 直接传输 KVCache
    else KVCache 在 Store
        D->>Store: 加载 KVCache
    end

    D->>D: 开始 Decode

传输与计算重叠

Mooncake 支持边传输边计算,进一步减少延迟:

gantt
    title KVCache 传输与 Decode 重叠
    dateFormat X
    axisFormat %s

    section 传输
    Chunk 1-4    :t1, 0, 10
    Chunk 5-8    :t2, 10, 20

    section Decode
    使用 Chunk 1-4 :d1, 10, 25
    使用 Chunk 5-8 :d2, 20, 35

4.4.3 自回归生成#

Decode 节点进入自回归生成循环:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 伪代码:Decode 循环
def decode_loop(kvcache, prompt_tokens, max_new_tokens):
    generated_tokens = []
    current_kvcache = kvcache

    for i in range(max_new_tokens):
        # 1. 获取最后一个 token
        last_token = generated_tokens[-1] if generated_tokens else prompt_tokens[-1]

        # 2. 前向计算 (使用 KVCache)
        logits = model.forward(last_token, current_kvcache)

        # 3. 采样下一个 token
        next_token = sample(logits, temperature, top_p)

        # 4. 更新 KVCache
        current_kvcache.append(next_token_kv)

        # 5. 流式返回
        yield next_token

        # 6. 检查停止条件
        if next_token == EOS_TOKEN:
            break

        generated_tokens.append(next_token)

Continuous Batching

Decode 节点通常同时处理多个请求,使用 Continuous Batching 最大化 GPU 利用率:

graph TB
    subgraph "Iteration 1"
        B1[Batch: A, B, C, D]
    end

    subgraph "Iteration 2"
        B2[Batch: A, B, D, E]
        Note1[C 完成, E 加入]
    end

    subgraph "Iteration 3"
        B3[Batch: A, D, E, F]
        Note2[B 完成, F 加入]
    end

    B1 --> B2 --> B3

4.4.4 KVCache 增量存储#

Decode 过程中,新生成的 KVCache 可以选择性地存储到 Mooncake Store:

  • 热数据:高频访问的前缀,主动复制到多节点
  • 冷数据:低频访问的数据,可以延迟写入或丢弃

4.5 阶段四:完成与资源回收#

4.5.1 请求完成处理#

当 Decode 完成(生成 EOS 或达到 max_tokens):

sequenceDiagram
    participant D as Decode Node
    participant C as Conductor
    participant S as Mooncake Store
    participant Client as 客户端

    D->>Client: 发送结束标记
    D->>C: 通知请求完成
    C->>C: 更新统计信息

    alt 保留 KVCache
        D->>S: 提交 KVCache 存储
        S->>S: 更新驱逐策略 (LRU/SIEVE)
    else 丢弃 KVCache
        D->>D: 释放本地 KVCache 内存
    end

4.5.2 缓存驱逐策略#

eviction_strategy.h 可以看到 Mooncake 支持的驱逐策略:

LRU (Least Recently Used)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class LRUEvictionStrategy : public EvictionStrategy {
public:
    ErrorCode AddKey(const std::string& key) override {
        // 新 key 添加到队列头部
        all_key_list_.push_front(key);
        all_key_idx_map_[key] = all_key_list_.begin();
        return ErrorCode::OK;
    }

    ErrorCode UpdateKey(const std::string& key) override {
        // 访问时移动到头部
        auto it = all_key_idx_map_.find(key);
        if (it != all_key_idx_map_.end()) {
            all_key_list_.erase(it->second);
            all_key_list_.push_front(key);
            all_key_idx_map_[key] = all_key_list_.begin();
        }
        return ErrorCode::OK;
    }

    std::string EvictKey() override {
        // 驱逐尾部 (最久未使用)
        if (all_key_list_.empty()) return "";
        std::string evicted_key = all_key_list_.back();
        all_key_list_.pop_back();
        all_key_idx_map_.erase(evicted_key);
        return evicted_key;
    }
};

FIFO (First In First Out)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
class FIFOEvictionStrategy : public EvictionStrategy {
public:
    ErrorCode AddKey(const std::string& key) override {
        // 新 key 添加到头部
        all_key_list_.push_front(key);
        return ErrorCode::OK;
    }

    ErrorCode UpdateKey(const std::string& key) override {
        // FIFO 不关心访问顺序
        return ErrorCode::OK;
    }

    std::string EvictKey() override {
        // 驱逐尾部 (最早进入)
        if (all_key_list_.empty()) return "";
        std::string evicted_key = all_key_list_.back();
        all_key_list_.pop_back();
        return evicted_key;
    }
};

4.6 异常处理与容错#

4.6.1 节点故障处理#

graph TB
    subgraph "故障检测"
        H[心跳超时] --> D[节点被标记为 Down]
    end

    subgraph "Prefill 故障恢复"
        D --> P1{Prefill 是否完成?}
        P1 -->|否| P2[重新调度到其他节点]
        P1 -->|是| P3[继续 Decode 流程]
    end

    subgraph "Decode 故障恢复"
        D --> D1{KVCache 是否已存储?}
        D1 -->|是| D2[从 Store 恢复并重新调度]
        D1 -->|否| D3[从 Prefill 重新开始]
    end

4.6.2 传输失败重试#

Transfer Engine 内置了重试机制(来自 transport.h):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
struct Slice {
    // ...
    struct {
        uint32_t retry_cnt;      // 当前重试次数
        uint32_t max_retry_cnt;  // 最大重试次数
    } rdma;

    void markFailed() {
        if (rdma.retry_cnt < rdma.max_retry_cnt) {
            // 重新提交
            status = PENDING;
            rdma.retry_cnt++;
            resubmit();
        } else {
            // 永久失败
            status = FAILED;
            task->failed_slice_count++;
        }
    }
};

4.7 性能关键路径分析#

4.7.1 延迟分解#

一个典型请求的延迟分解:

pie title 请求延迟分解 (128K context)
    "调度决策" : 5
    "KVCache 加载" : 15
    "Prefill 计算" : 40
    "KVCache 存储" : 10
    "Decode 循环" : 30

4.7.2 关键优化点#

阶段 优化技术 效果
调度 预计算 prefix hash 减少决策延迟
加载 多网卡并行 2-8x 带宽提升
Prefill Prefix Caching 减少 20-80% 计算
传输 流水线 隐藏传输延迟
Decode Continuous Batching 提高 GPU 利用率

4.8 本章小结#

本章详细分析了 Mooncake 中请求的完整处理流程:

  1. 调度阶段:Cache-aware 调度、负载均衡、拓扑感知
  2. Prefill 阶段:KVCache 加载、增量计算、存储
  3. Decode 阶段:KVCache 迁移、自回归生成、Continuous Batching
  4. 完成阶段:资源回收、缓存驱逐
  5. 容错机制:故障检测、重试、恢复

这种精心设计的流程确保了 Mooncake 能够在满足 SLO 要求的同时,最大化资源利用率和缓存命中率。


第三部分:Mooncake Store 深度剖析#

本部分深入分析 Mooncake Store 的内部实现,包括存储架构设计、元数据管理、内存分配器、副本管理、驱逐策略以及高可用机制。

第五章:存储架构设计#

5.1 整体存储架构#

Mooncake Store 是一个专门为 KVCache 设计的分布式存储系统。与传统的分布式存储不同,它针对 LLM 推理场景进行了深度优化:

graph TB
    subgraph "Mooncake Store 架构"
        subgraph "Client Layer"
            C1[Prefill Client]
            C2[Decode Client]
            C3[Admin Client]
        end

        subgraph "Master Service"
            MS[MasterService]
            MM[MetadataManager]
            SM[SegmentManager]
            AM[AllocatorManager]
            EM[EvictionManager]
        end

        subgraph "Worker Layer"
            W1[Worker Node 1]
            W2[Worker Node 2]
            W3[Worker Node 3]
        end

        subgraph "Storage Backends"
            MEM[Memory Backend<br/>GPU/CPU Memory]
            DISK[Disk Backend<br/>NVMe SSD]
            LDISK[Local Disk<br/>客户端本地]
        end

        C1 & C2 & C3 --> MS
        MS --> MM
        MS --> SM
        MS --> AM
        MS --> EM
        SM --> W1 & W2 & W3
        W1 & W2 & W3 --> MEM & DISK
        C1 & C2 --> LDISK
    end

5.2 核心数据结构#

5.2.1 类型定义(types.h)#

mooncake-store/include/types.h 中可以看到系统的核心类型定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 关键类型别名
using ObjectKey = std::string;          // 对象键
using Version = uint64_t;               // 版本号
using SegmentId = int64_t;              // 段 ID
using UUID = std::pair<uint64_t, uint64_t>; // 唯一标识符

// 错误码枚举(部分)
enum class ErrorCode : int32_t {
    OK = 0,
    INTERNAL_ERROR = -1,
    NO_AVAILABLE_HANDLE = -200,   // 内存分配失败
    OBJECT_NOT_FOUND = -704,      // 对象未找到
    OBJECT_ALREADY_EXISTS = -705, // 对象已存在
    OBJECT_HAS_LEASE = -706,      // 对象持有租约
    // ...
};

// Segment 结构:表示一块连续的内存区域
struct Segment {
    UUID id{0, 0};
    std::string name{};       // 逻辑段名称
    uintptr_t base{0};        // 基地址
    size_t size{0};           // 大小
    std::string te_endpoint{}; // Transfer Engine 端点
};
5.2.2 副本管理(replica.h)#

副本(Replica)是 Mooncake Store 中数据存储的基本单元:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 副本类型
enum class ReplicaType {
    MEMORY,     // 内存副本(GPU/CPU)
    DISK,       // 磁盘副本(共享存储)
    LOCAL_DISK  // 本地磁盘副本
};

// 副本状态机
enum class ReplicaStatus {
    UNDEFINED = 0,   // 未初始化
    INITIALIZED,     // 空间已分配,等待写入
    PROCESSING,      // 写入进行中
    COMPLETE,        // 写入完成,可用
    REMOVED,         // 已移除
    FAILED,          // 失败状态
};

// 副本配置
struct ReplicateConfig {
    size_t replica_num{1};              // 副本数量
    bool with_soft_pin{false};          // 软钉住(soft pin)
    std::string preferred_segment{};     // 首选段
    bool prefer_alloc_in_same_node{false}; // 优先在同节点分配
};

副本状态转换图:

stateDiagram-v2
    [*] --> UNDEFINED
    UNDEFINED --> INITIALIZED: allocate()
    INITIALIZED --> PROCESSING: start_write()
    PROCESSING --> COMPLETE: mark_complete()
    PROCESSING --> FAILED: write_error()
    COMPLETE --> REMOVED: evict()/remove()
    FAILED --> REMOVED: cleanup()
    REMOVED --> [*]
5.2.3 Replica 类实现#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class Replica {
public:
    // 内存副本构造函数
    Replica(std::unique_ptr<AllocatedBuffer> buffer, ReplicaStatus status)
        : data_(MemoryReplicaData{std::move(buffer)}), status_(status) {}

    // 磁盘副本构造函数
    Replica(std::string file_path, uint64_t object_size, ReplicaStatus status)
        : data_(DiskReplicaData{std::move(file_path), object_size}),
          status_(status) {
        // RAII 方式更新文件大小指标
        MasterMetricManager::instance().inc_allocated_file_size(object_size);
    }

    // 本地磁盘副本构造函数
    Replica(UUID client_id, uint64_t object_size,
            std::string transport_endpoint, ReplicaStatus status)
        : data_(LocalDiskReplicaData{client_id, object_size,
                                     std::move(transport_endpoint)}),
          status_(status) {}

    // 获取副本描述符(用于网络传输)
    [[nodiscard]] Descriptor get_descriptor() const;

    // 类型检查
    [[nodiscard]] bool is_memory_replica() const;
    [[nodiscard]] bool is_disk_replica() const;
    [[nodiscard]] bool is_local_disk_replica() const;

    // 标记完成
    void mark_complete() {
        if (status_ == ReplicaStatus::PROCESSING) {
            status_ = ReplicaStatus::COMPLETE;
        }
    }

private:
    // 使用 std::variant 实现类型安全的联合体
    std::variant<MemoryReplicaData, DiskReplicaData, LocalDiskReplicaData> data_;
    ReplicaStatus status_{ReplicaStatus::UNDEFINED};
};

5.3 Segment 管理#

5.3.1 Segment 层次结构#
classDiagram
    class SegmentManager {
        -segment_mutex_: shared_mutex
        -allocator_manager_: AllocatorManager
        -mounted_segments_: Map~UUID, MountedSegment~
        -client_segments_: Map~UUID, vector~UUID~~
        +getSegmentAccess(): ScopedSegmentAccess
        +getAllocatorAccess(): ScopedAllocatorAccess
    }

    class MountedSegment {
        +segment: Segment
        +status: SegmentStatus
        +buf_allocator: shared_ptr~BufferAllocatorBase~
    }

    class ScopedSegmentAccess {
        -segment_manager_: SegmentManager*
        -lock_: unique_lock
        +MountSegment()
        +UnmountSegment()
        +GetClientSegments()
    }

    class ScopedAllocatorAccess {
        -allocator_manager_: AllocatorManager&
        -lock_: shared_lock
        +getAllocatorManager()
    }

    SegmentManager --> MountedSegment
    SegmentManager --> ScopedSegmentAccess
    SegmentManager --> ScopedAllocatorAccess
5.3.2 Segment 状态#
1
2
3
4
5
enum class SegmentStatus {
    UNDEFINED = 0,  // 未初始化
    OK,             // 已挂载,可用于分配
    UNMOUNTING,     // 正在卸载
};
5.3.3 RAII 风格的访问控制#

Mooncake Store 使用 RAII(Resource Acquisition Is Initialization)模式来保证线程安全:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Segment 访问的 RAII 封装
class ScopedSegmentAccess {
public:
    explicit ScopedSegmentAccess(SegmentManager* segment_manager,
                                 std::shared_mutex& mutex)
        : segment_manager_(segment_manager), lock_(mutex) {}

    // 挂载 segment
    ErrorCode MountSegment(const Segment& segment, const UUID& client_id);

    // 准备卸载(删除分配器)
    ErrorCode PrepareUnmountSegment(const UUID& segment_id,
                                    size_t& metrics_dec_capacity);

    // 提交卸载(删除 segment)
    ErrorCode CommitUnmountSegment(const UUID& segment_id,
                                   const UUID& client_id,
                                   const size_t& metrics_dec_capacity);

private:
    SegmentManager* segment_manager_;
    std::unique_lock<std::shared_mutex> lock_;  // 独占锁
};

// Allocator 访问的 RAII 封装
class ScopedAllocatorAccess {
public:
    explicit ScopedAllocatorAccess(const AllocatorManager& allocator_manager,
                                   std::shared_mutex& mutex)
        : allocator_manager_(allocator_manager), lock_(mutex) {}

    const AllocatorManager& getAllocatorManager() { return allocator_manager_; }

private:
    const AllocatorManager& allocator_manager_;
    std::shared_lock<std::shared_mutex> lock_;  // 共享锁(读锁)
};

这种设计的优势:

  1. 自动释放锁:离开作用域时自动释放,避免死锁
  2. 读写分离:Segment 操作用独占锁,Allocator 查询用共享锁
  3. 编译时检查:类型系统保证正确使用

5.4 内存分配器#

5.4.1 分配器类型#

Mooncake Store 支持两种内存分配器:

1
2
3
4
enum class BufferAllocatorType {
    CACHELIB = 0,  // Facebook CacheLib 分配器
    OFFSET = 1,    // Offset 分配器
};
5.4.2 分配器接口#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
class BufferAllocatorBase {
public:
    virtual ~BufferAllocatorBase() = default;

    // 分配内存
    virtual std::unique_ptr<AllocatedBuffer> allocate(size_t size) = 0;

    // 释放内存
    virtual void deallocate(AllocatedBuffer* handle) = 0;

    // 容量信息
    virtual size_t capacity() const = 0;
    virtual size_t size() const = 0;

    // 元数据
    virtual std::string getSegmentName() const = 0;
    virtual std::string getTransportEndpoint() const = 0;

    // 获取最大可用空闲区域(用于分配决策)
    virtual size_t getLargestFreeRegion() const = 0;
};
5.4.3 CacheLib 分配器#

CacheLib 是 Facebook 开源的高性能缓存库,采用 Slab 分配策略:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class CachelibBufferAllocator
    : public BufferAllocatorBase,
      public std::enable_shared_from_this<CachelibBufferAllocator> {
public:
    CachelibBufferAllocator(std::string segment_name, size_t base, size_t size,
                            std::string transport_endpoint);

    std::unique_ptr<AllocatedBuffer> allocate(size_t size) override;
    void deallocate(AllocatedBuffer* handle) override;

    // CacheLib 无法精确获知最大空闲区域,返回特殊值
    size_t getLargestFreeRegion() const override {
        return kAllocatorUnknownFreeSpace;
    }

private:
    const std::string segment_name_;
    const size_t base_;
    const size_t total_size_;
    std::atomic_size_t cur_size_;

    // CacheLib 内部结构
    std::unique_ptr<char[]> header_region_start_;
    std::unique_ptr<facebook::cachelib::MemoryAllocator> memory_allocator_;
    facebook::cachelib::PoolId pool_id_;
};

CacheLib Slab 分配原理:

graph TB
    subgraph "CacheLib Slab Allocator"
        subgraph "Pool"
            S1[Slab 1<br/>4MB]
            S2[Slab 2<br/>4MB]
            S3[Slab 3<br/>4MB]
        end

        subgraph "Slab 1 内部"
            C1[Class 64B]
            C2[Class 128B]
            C3[Class 256B]
            C4[Class 512B]
        end

        subgraph "64B Class"
            B1[Block 1]
            B2[Block 2]
            B3[Block 3]
            B4[...]
        end

        S1 --> C1 & C2 & C3 & C4
        C1 --> B1 & B2 & B3 & B4
    end
5.4.4 Offset 分配器#

Offset 分配器提供更精细的内存管理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class OffsetBufferAllocator
    : public BufferAllocatorBase,
      public std::enable_shared_from_this<OffsetBufferAllocator> {
public:
    OffsetBufferAllocator(std::string segment_name, size_t base, size_t size,
                          std::string transport_endpoint);

    std::unique_ptr<AllocatedBuffer> allocate(size_t size) override;
    void deallocate(AllocatedBuffer* handle) override;

    // Offset 分配器可以返回精确的最大空闲区域
    size_t getLargestFreeRegion() const override;

private:
    const std::string segment_name_;
    const size_t base_;
    const size_t total_size_;
    std::atomic_size_t cur_size_;

    // 使用 offset_allocator 库
    std::shared_ptr<offset_allocator::OffsetAllocator> offset_allocator_;
};
5.4.5 AllocatedBuffer#

AllocatedBuffer 是分配结果的 RAII 封装:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class AllocatedBuffer {
public:
    AllocatedBuffer(std::shared_ptr<BufferAllocatorBase> allocator,
                    void* buffer_ptr, std::size_t size,
                    std::optional<offset_allocator::OffsetAllocationHandle>&&
                        offset_handle = std::nullopt)
        : allocator_(std::move(allocator)),  // 使用 weak_ptr 避免循环引用
          buffer_ptr_(buffer_ptr),
          size_(size),
          offset_handle_(std::move(offset_handle)) {}

    ~AllocatedBuffer(); // 析构时自动归还内存

    // 获取描述符(用于 RDMA 传输)
    [[nodiscard]] Descriptor get_descriptor() const;

    // 描述符结构
    struct Descriptor {
        uint64_t size_;
        uintptr_t buffer_address_;      // RDMA 远程地址
        std::string transport_endpoint_; // 传输端点
    };

private:
    std::weak_ptr<BufferAllocatorBase> allocator_;  // 弱引用
    void* buffer_ptr_{nullptr};
    std::size_t size_{0};
    std::optional<offset_allocator::OffsetAllocationHandle> offset_handle_;
};

5.5 分配策略#

5.5.1 分配策略接口#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class AllocationStrategy {
public:
    virtual ~AllocationStrategy() = default;

    // 分配副本
    virtual tl::expected<std::vector<Replica>, ErrorCode> Allocate(
        const AllocatorManager& allocator_manager,
        uint64_t size,
        size_t replica_num,
        const std::vector<std::string>& preferred_segments) = 0;
};
5.5.2 随机分配策略#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class RandomAllocationStrategy : public AllocationStrategy {
public:
    tl::expected<std::vector<Replica>, ErrorCode> Allocate(
        const AllocatorManager& allocator_manager,
        uint64_t size,
        size_t replica_num,
        const std::vector<std::string>& preferred_segments) override {

        std::vector<Replica> replicas;
        auto& allocators = allocator_manager.getAllocators();

        // 1. 首先尝试首选 segment
        for (const auto& preferred : preferred_segments) {
            if (auto it = allocators.find(preferred); it != allocators.end()) {
                auto buffer = it->second->allocate(size);
                if (buffer) {
                    replicas.emplace_back(std::move(buffer),
                                         ReplicaStatus::PROCESSING);
                    if (replicas.size() >= replica_num) {
                        return replicas;
                    }
                }
            }
        }

        // 2. 随机选择其他 segment
        std::vector<std::string> candidates;
        for (const auto& [name, alloc] : allocators) {
            if (alloc->getLargestFreeRegion() >= size) {
                candidates.push_back(name);
            }
        }

        std::shuffle(candidates.begin(), candidates.end(),
                    std::default_random_engine{});

        for (const auto& name : candidates) {
            auto buffer = allocators.at(name)->allocate(size);
            if (buffer) {
                replicas.emplace_back(std::move(buffer),
                                     ReplicaStatus::PROCESSING);
                if (replicas.size() >= replica_num) {
                    return replicas;
                }
            }
        }

        if (replicas.empty()) {
            return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE);
        }
        return replicas;
    }
};

分配决策流程:

flowchart TD
    A[分配请求] --> B{有首选 segment?}
    B -->|是| C[尝试首选 segment]
    C --> D{分配成功?}
    D -->|是| E{副本数满足?}
    E -->|是| F[返回副本列表]
    E -->|否| G[继续分配]
    D -->|否| G
    B -->|否| G
    G --> H[筛选可用 allocators]
    H --> I[随机排序]
    I --> J[依次尝试分配]
    J --> K{分配成功?}
    K -->|是| L{副本数满足?}
    L -->|是| F
    L -->|否| J
    K -->|否| M{还有候选?}
    M -->|是| J
    M -->|否| N[返回已分配副本或错误]

第六章:MasterService 核心实现#

6.1 MasterService 概述#

MasterService 是 Mooncake Store 的核心组件,负责:

  • 元数据管理
  • 空间分配
  • 副本管理
  • 客户端监控
  • 驱逐控制
classDiagram
    class MasterService {
        -metadata_shards_: array~MetadataShard, 1024~
        -segment_manager_: SegmentManager
        -allocation_strategy_: AllocationStrategy
        -eviction_thread_: thread
        -client_monitor_thread_: thread
        +MountSegment()
        +UnmountSegment()
        +PutStart()
        +PutEnd()
        +GetReplicaList()
        +Remove()
    }

    class MetadataShard {
        +mutex: Mutex
        +metadata: Map~string, ObjectMetadata~
        +processing_keys: Set~string~
    }

    class ObjectMetadata {
        +client_id: UUID
        +put_start_time: time_point
        +size: uint64_t
        +replicas: vector~Replica~
        +lease_timeout: time_point
        +soft_pin_timeout: time_point
    }

    MasterService --> MetadataShard
    MetadataShard --> ObjectMetadata

6.2 分片元数据管理#

6.2.1 分片设计#

为了支持高并发访问,元数据被分成 1024 个分片:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 分片数量
static constexpr size_t kNumShards = 1024;

// 元数据分片结构
struct MetadataShard {
    mutable Mutex mutex;  // 分片级别的锁
    std::unordered_map<std::string, ObjectMetadata> metadata;
    std::unordered_set<std::string> processing_keys;  // 正在处理的 key
};

// 分片数组
std::array<MetadataShard, kNumShards> metadata_shards_;

// 分片索引计算
size_t getShardIndex(const std::string& key) const {
    return std::hash<std::string>{}(key) % kNumShards;
}
6.2.2 元数据访问器#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class MetadataAccessor {
public:
    MetadataAccessor(MasterService* service, const std::string& key)
        : service_(service),
          key_(key),
          shard_idx_(service->getShardIndex(key)),
          lock_(&service->metadata_shards_[shard_idx_].mutex) {}

    bool Exists() const {
        return service_->metadata_shards_[shard_idx_].metadata.count(key_) > 0;
    }

    ObjectMetadata& Get() {
        return service_->metadata_shards_[shard_idx_].metadata.at(key_);
    }

    void Erase() {
        service_->metadata_shards_[shard_idx_].metadata.erase(key_);
    }

    bool InProcessing() const {
        return service_->metadata_shards_[shard_idx_]
                   .processing_keys.count(key_) > 0;
    }

private:
    MasterService* service_;
    std::string key_;
    size_t shard_idx_;
    MutexLocker lock_;  // RAII 锁
};

分片设计的优势:

graph LR
    subgraph "无分片设计"
        A1[请求 1] --> L1[全局锁]
        A2[请求 2] --> L1
        A3[请求 3] --> L1
        L1 --> M1[元数据]
    end

    subgraph "分片设计"
        B1[请求 1<br/>hash=0] --> L2[分片锁 0]
        B2[请求 2<br/>hash=512] --> L3[分片锁 512]
        B3[请求 3<br/>hash=1] --> L4[分片锁 1]
        L2 --> M2[分片 0]
        L3 --> M3[分片 512]
        L4 --> M4[分片 1]
    end

6.3 Put 操作实现#

6.3.1 PutStart#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
auto MasterService::PutStart(const UUID& client_id, const std::string& key,
                             const uint64_t slice_length,
                             const ReplicateConfig& config)
    -> tl::expected<std::vector<Replica::Descriptor>, ErrorCode> {

    // 1. 参数验证
    if (config.replica_num == 0 || key.empty() || slice_length == 0) {
        return tl::make_unexpected(ErrorCode::INVALID_PARAMS);
    }

    // 2. CacheLib 大小限制检查
    if ((memory_allocator_type_ == BufferAllocatorType::CACHELIB) &&
        (slice_length > kMaxSliceSize)) {
        return tl::make_unexpected(ErrorCode::INVALID_PARAMS);
    }

    // 3. 锁定分片并检查对象是否已存在
    size_t shard_idx = getShardIndex(key);
    MutexLocker lock(&metadata_shards_[shard_idx].mutex);

    const auto now = std::chrono::steady_clock::now();
    auto it = metadata_shards_[shard_idx].metadata.find(key);

    if (it != metadata_shards_[shard_idx].metadata.end() &&
        !CleanupStaleHandles(it->second)) {
        auto& metadata = it->second;

        // 处理过期的 PutStart(允许覆盖)
        if (!metadata.HasCompletedReplicas() &&
            metadata.put_start_time + put_start_discard_timeout_sec_ < now) {
            // 丢弃旧的处理中副本
            auto replicas = metadata.DiscardProcessingReplicas();
            if (!replicas.empty()) {
                std::lock_guard lock(discarded_replicas_mutex_);
                discarded_replicas_.emplace_back(
                    std::move(replicas),
                    metadata.put_start_time + put_start_release_timeout_sec_);
            }
            metadata_shards_[shard_idx].processing_keys.erase(key);
            metadata_shards_[shard_idx].metadata.erase(it);
        } else {
            return tl::make_unexpected(ErrorCode::OBJECT_ALREADY_EXISTS);
        }
    }

    // 4. 分配副本空间
    std::vector<Replica> replicas;
    {
        ScopedAllocatorAccess allocator_access =
            segment_manager_.getAllocatorAccess();

        auto allocation_result = allocation_strategy_->Allocate(
            allocator_access.getAllocatorManager(),
            slice_length,
            config.replica_num,
            preferred_segments);

        if (!allocation_result.has_value()) {
            need_eviction_ = true;  // 触发驱逐
            return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE);
        }
        replicas = std::move(allocation_result.value());
    }

    // 5. 如果启用磁盘副本,添加磁盘副本
    if (use_disk_replica_) {
        std::string file_path = ResolvePath(key);
        replicas.emplace_back(file_path, slice_length, ReplicaStatus::PROCESSING);
    }

    // 6. 创建描述符列表返回给客户端
    std::vector<Replica::Descriptor> replica_list;
    for (const auto& replica : replicas) {
        replica_list.emplace_back(replica.get_descriptor());
    }

    // 7. 存储元数据
    metadata_shards_[shard_idx].metadata.emplace(
        std::piecewise_construct,
        std::forward_as_tuple(key),
        std::forward_as_tuple(client_id, now, slice_length,
                              std::move(replicas), config.with_soft_pin));

    // 8. 加入处理集合用于超时监控
    metadata_shards_[shard_idx].processing_keys.insert(key);

    return replica_list;
}
6.3.2 PutEnd#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
auto MasterService::PutEnd(const UUID& client_id, const std::string& key,
                           ReplicaType replica_type)
    -> tl::expected<void, ErrorCode> {

    MetadataAccessor accessor(this, key);

    if (!accessor.Exists()) {
        return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND);
    }

    auto& metadata = accessor.Get();

    // 验证客户端权限
    if (client_id != metadata.client_id) {
        return tl::make_unexpected(ErrorCode::ILLEGAL_CLIENT);
    }

    // 标记对应类型的副本为完成
    for (auto& replica : metadata.replicas) {
        if (replica.type() == replica_type) {
            replica.mark_complete();
        }

        // 如果启用 offload,推送到 offload 队列
        if (enable_offload_) {
            PushOffloadingQueue(key, replica);
        }
    }

    // 如果所有副本完成,从处理集合中移除
    if (metadata.IsAllReplicasComplete() && accessor.InProcessing()) {
        accessor.EraseFromProcessing();
    }

    // 更新指标
    if (replica_type == ReplicaType::MEMORY) {
        MasterMetricManager::instance().inc_mem_cache_nums();
    } else if (replica_type == ReplicaType::DISK) {
        MasterMetricManager::instance().inc_file_cache_nums();
    }

    // 授予初始租约
    metadata.GrantLease(0, default_kv_soft_pin_ttl_);

    return {};
}

Put 操作时序图:

sequenceDiagram
    participant Client
    participant Master as MasterService
    participant Alloc as AllocatorManager
    participant Meta as MetadataShard

    Client->>Master: PutStart(key, size, config)
    Master->>Meta: Lock shard
    Master->>Meta: Check if key exists
    alt Key exists and not stale
        Master-->>Client: OBJECT_ALREADY_EXISTS
    else Key not exists or stale
        Master->>Alloc: Allocate(size, replica_num)
        Alloc-->>Master: Replica descriptors
        Master->>Meta: Store metadata
        Master->>Meta: Add to processing_keys
        Master-->>Client: Replica descriptors
    end

    Note over Client: RDMA write data to replicas

    Client->>Master: PutEnd(key, MEMORY)
    Master->>Meta: Lock shard
    Master->>Meta: Mark replicas complete
    Master->>Meta: Remove from processing_keys
    Master->>Meta: Grant lease
    Master-->>Client: OK

6.4 Get 操作实现#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
auto MasterService::GetReplicaList(std::string_view key)
    -> tl::expected<GetReplicaListResponse, ErrorCode> {

    MetadataAccessor accessor(this, std::string(key));

    // 更新指标
    MasterMetricManager::instance().inc_total_get_nums();

    if (!accessor.Exists()) {
        return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND);
    }

    auto& metadata = accessor.Get();

    // 收集完成状态的副本
    std::vector<Replica::Descriptor> replica_list;
    for (const auto& replica : metadata.replicas) {
        if (replica.status() == ReplicaStatus::COMPLETE) {
            replica_list.emplace_back(replica.get_descriptor());
        }
    }

    if (replica_list.empty()) {
        return tl::make_unexpected(ErrorCode::REPLICA_IS_NOT_READY);
    }

    // 更新缓存命中指标
    if (replica_list[0].is_memory_replica()) {
        MasterMetricManager::instance().inc_mem_cache_hit_nums();
    } else if (replica_list[0].is_disk_replica()) {
        MasterMetricManager::instance().inc_file_cache_hit_nums();
    }

    MasterMetricManager::instance().inc_valid_get_nums();

    // 授予租约防止被驱逐
    metadata.GrantLease(default_kv_lease_ttl_, default_kv_soft_pin_ttl_);

    return GetReplicaListResponse(std::move(replica_list), default_kv_lease_ttl_);
}

6.5 租约机制#

租约(Lease)是 Mooncake Store 保护正在使用的对象不被驱逐的机制:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class ObjectMetadata {
public:
    // 授予租约
    void GrantLease(uint64_t lease_ttl_ms, uint64_t soft_pin_ttl_ms) {
        auto now = std::chrono::steady_clock::now();
        lease_timeout = now + std::chrono::milliseconds(lease_ttl_ms);
        if (with_soft_pin) {
            soft_pin_timeout = now + std::chrono::milliseconds(soft_pin_ttl_ms);
        }
    }

    // 检查租约是否过期
    bool IsLeaseExpired(const std::chrono::steady_clock::time_point& now
                        = std::chrono::steady_clock::now()) const {
        return now > lease_timeout;
    }

    // 检查是否软钉住
    bool IsSoftPinned(const std::chrono::steady_clock::time_point& now
                      = std::chrono::steady_clock::now()) const {
        return with_soft_pin && now <= soft_pin_timeout;
    }

public:
    UUID client_id;
    std::chrono::steady_clock::time_point put_start_time;
    uint64_t size;
    std::vector<Replica> replicas;
    std::chrono::steady_clock::time_point lease_timeout;
    std::chrono::steady_clock::time_point soft_pin_timeout;
    bool with_soft_pin{false};
};

租约生命周期:

stateDiagram-v2
    [*] --> NoLease: PutEnd
    NoLease --> ActiveLease: Get (授予 lease)
    ActiveLease --> ActiveLease: Get (续约)
    ActiveLease --> SoftPinned: Lease 过期
    SoftPinned --> Evictable: SoftPin 过期
    Evictable --> [*]: 驱逐
    NoLease --> Evictable: 无访问

    note right of ActiveLease: 正在被读取<br/>不可驱逐
    note right of SoftPinned: 最近访问过<br/>低优先级驱逐
    note right of Evictable: 可以被驱逐

第七章:驱逐与高可用#

7.1 驱逐策略#

7.1.1 驱逐触发条件#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
void MasterService::EvictionThreadFunc() {
    while (eviction_running_) {
        double used_ratio =
            MasterMetricManager::instance().get_global_mem_used_ratio();

        // 触发条件:
        // 1. 使用率超过高水位
        // 2. 或者 need_eviction_ 标志被设置(分配失败时)
        if (used_ratio > eviction_high_watermark_ratio_ ||
            (need_eviction_ && eviction_ratio_ > 0.0)) {

            double evict_ratio_target = std::max(
                eviction_ratio_,
                used_ratio - eviction_high_watermark_ratio_ + eviction_ratio_);

            BatchEvict(evict_ratio_target, evict_ratio_lowerbound);
        }

        std::this_thread::sleep_for(
            std::chrono::milliseconds(kEvictionThreadSleepMs));
    }
}
7.1.2 批量驱逐算法#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
void MasterService::BatchEvict(double evict_ratio_target,
                               double evict_ratio_lowerbound) {
    auto now = std::chrono::steady_clock::now();
    long evicted_count = 0;
    uint64_t total_freed_size = 0;

    // 候选列表
    std::vector<std::chrono::steady_clock::time_point> no_pin_objects;
    std::vector<std::chrono::steady_clock::time_point> soft_pin_objects;

    // 随机起始分片,避免不均衡驱逐
    size_t start_idx = rand() % metadata_shards_.size();

    // 第一遍:驱逐无 soft pin 且租约过期的对象
    for (size_t i = 0; i < metadata_shards_.size(); i++) {
        auto& shard = metadata_shards_[(start_idx + i) % metadata_shards_.size()];
        MutexLocker lock(&shard.mutex);

        // 先清理过期的处理中 key
        DiscardExpiredProcessingKeys(shard, now);

        // 计算该分片需要驱逐的数量
        const long ideal_evict_num =
            std::ceil(object_count * evict_ratio_target) - evicted_count;

        // 收集候选对象
        std::vector<std::chrono::steady_clock::time_point> candidates;
        for (auto it = shard.metadata.begin(); it != shard.metadata.end(); it++) {
            // 跳过未过期或未完成的对象
            if (!it->second.IsLeaseExpired(now) ||
                it->second.HasDiffRepStatus(ReplicaStatus::COMPLETE,
                                            ReplicaType::MEMORY)) {
                continue;
            }

            if (!it->second.IsSoftPinned(now)) {
                if (ideal_evict_num > 0) {
                    candidates.push_back(it->second.lease_timeout);
                } else {
                    no_pin_objects.push_back(it->second.lease_timeout);
                }
            } else if (allow_evict_soft_pinned_objects_) {
                soft_pin_objects.push_back(it->second.lease_timeout);
            }
        }

        // 使用 nth_element 找到驱逐阈值
        if (ideal_evict_num > 0 && !candidates.empty()) {
            long evict_num = std::min(ideal_evict_num, (long)candidates.size());
            std::nth_element(candidates.begin(),
                             candidates.begin() + (evict_num - 1),
                             candidates.end());
            auto target_timeout = candidates[evict_num - 1];

            // 驱逐 lease_timeout <= target_timeout 的对象
            auto it = shard.metadata.begin();
            while (it != shard.metadata.end()) {
                if (ShouldEvict(it->second, target_timeout, now)) {
                    total_freed_size += it->second.size *
                                       it->second.GetMemReplicaCount();
                    it->second.EraseReplica(ReplicaType::MEMORY);
                    if (!it->second.IsValid()) {
                        it = shard.metadata.erase(it);
                    } else {
                        ++it;
                    }
                    evicted_count++;
                } else {
                    ++it;
                }
            }
        }
    }

    // 第二遍:如果第一遍不够,继续驱逐
    if (target_evict_num > 0) {
        // ... 第二遍驱逐逻辑
    }
}

驱逐优先级:

graph TD
    subgraph "驱逐优先级(从高到低)"
        P1[1. 租约过期 + 无 SoftPin<br/>最旧的 lease_timeout]
        P2[2. 租约过期 + 有 SoftPin<br/>但 SoftPin 也过期]
        P3[3. 租约过期 + SoftPin 未过期<br/>仅在允许时驱逐]
        P4[4. 租约未过期<br/>不可驱逐]
    end

    P1 --> P2 --> P3 --> P4
7.1.3 nth_element 优化#

使用 std::nth_element 而非完全排序,时间复杂度从 O(n log n) 降为 O(n):

1
2
3
4
5
6
7
// 找到第 k 小的元素作为阈值
std::nth_element(candidates.begin(),
                 candidates.begin() + (evict_num - 1),
                 candidates.end());
auto target_timeout = candidates[evict_num - 1];

// 驱逐所有 <= target_timeout 的对象

7.2 客户端监控#

7.2.1 心跳机制#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void MasterService::ClientMonitorFunc() {
    std::unordered_map<UUID, std::chrono::steady_clock::time_point,
                       boost::hash<UUID>> client_ttl;

    while (client_monitor_running_) {
        auto now = std::chrono::steady_clock::now();

        // 从队列中读取心跳消息
        PodUUID pod_client_id;
        while (client_ping_queue_.pop(pod_client_id)) {
            UUID client_id = {pod_client_id.first, pod_client_id.second};
            client_ttl[client_id] =
                now + std::chrono::seconds(client_live_ttl_sec_);
        }

        // 找出过期的客户端
        std::vector<UUID> expired_clients;
        for (auto it = client_ttl.begin(); it != client_ttl.end();) {
            if (it->second < now) {
                expired_clients.push_back(it->first);
                it = client_ttl.erase(it);
            } else {
                ++it;
            }
        }

        // 处理过期客户端
        if (!expired_clients.empty()) {
            HandleExpiredClients(expired_clients);
        }

        std::this_thread::sleep_for(
            std::chrono::milliseconds(kClientMonitorSleepMs));
    }
}
7.2.2 过期客户端处理#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void HandleExpiredClients(const std::vector<UUID>& expired_clients) {
    // 1. 更新客户端状态为 NEED_REMOUNT
    {
        std::unique_lock<std::shared_mutex> lock(client_mutex_);
        for (auto& client_id : expired_clients) {
            auto it = ok_client_.find(client_id);
            if (it != ok_client_.end()) {
                ok_client_.erase(it);
                MasterMetricManager::instance().dec_active_clients();
            }
        }

        // 2. 准备卸载 segment
        ScopedSegmentAccess segment_access = segment_manager_.getSegmentAccess();
        for (auto& client_id : expired_clients) {
            std::vector<Segment> segments;
            segment_access.GetClientSegments(client_id, segments);
            for (auto& seg : segments) {
                segment_access.PrepareUnmountSegment(seg.id, ...);
            }
        }
    }

    // 3. 清理无效句柄(释放锁后执行,避免长时间持锁)
    ClearInvalidHandles();

    // 4. 提交卸载
    ScopedSegmentAccess segment_access = segment_manager_.getSegmentAccess();
    for (const auto& segment_id : unmount_segments) {
        segment_access.CommitUnmountSegment(segment_id, ...);
    }
}

客户端生命周期:

sequenceDiagram
    participant Client
    participant Master as MasterService
    participant Monitor as ClientMonitor

    Client->>Master: MountSegment
    Master->>Monitor: Push to ping_queue
    Monitor->>Monitor: Start TTL timer

    loop 心跳循环
        Client->>Master: Ping
        Master->>Monitor: Push to ping_queue
        Monitor->>Monitor: Reset TTL
    end

    Note over Client: 客户端崩溃或网络中断

    Monitor->>Monitor: TTL 过期
    Monitor->>Master: Mark as NEED_REMOUNT
    Master->>Master: Unmount segments
    Master->>Master: Clear invalid handles

7.3 高可用与 Leader 选举#

7.3.1 基于 etcd 的 Leader 选举#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void MasterViewHelper::ElectLeader(const std::string& master_address,
                                   ViewVersionId& version,
                                   EtcdLeaseId& lease_id) {
    while (true) {
        // 1. 检查是否已有 leader
        ViewVersionId current_version = 0;
        std::string current_master;
        auto ret = EtcdHelper::Get(master_view_key_.c_str(), ...,
                                   current_master, current_version);

        if (ret != ErrorCode::ETCD_KEY_NOT_EXIST) {
            // 已有 leader,等待其退位
            LOG(INFO) << "CurrentLeader=" << current_master
                      << ", Waiting for leadership change...";
            EtcdHelper::WatchUntilDeleted(master_view_key_.c_str(), ...);
            continue;
        }

        // 2. 无 leader,尝试选举自己
        LOG(INFO) << "No leader found, trying to elect self as leader";

        // 3. 申请租约
        ret = EtcdHelper::GrantLease(ETCD_MASTER_VIEW_LEASE_TTL, lease_id);
        if (ret != ErrorCode::OK) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            continue;
        }

        // 4. 原子性创建 key(事务)
        ret = EtcdHelper::CreateWithLease(
            master_view_key_.c_str(), ...,
            master_address.c_str(), ...,
            lease_id, version);

        if (ret == ErrorCode::ETCD_TRANSACTION_FAIL) {
            // 竞争失败,其他节点已成为 leader
            std::this_thread::sleep_for(std::chrono::seconds(1));
            continue;
        } else if (ret == ErrorCode::OK) {
            LOG(INFO) << "Successfully elected self as leader";
            return;
        }
    }
}
7.3.2 租约保活#
1
2
3
4
void MasterViewHelper::KeepLeader(EtcdLeaseId lease_id) {
    // 周期性续约,保持 leader 地位
    EtcdHelper::KeepAlive(lease_id);
}
7.3.3 防止脑裂#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int MasterServiceSupervisor::Start() {
    while (true) {
        // 1. 选举成为 leader
        mv_helper.ElectLeader(config_.local_hostname, view_version, lease_id);

        // 2. 启动保活线程
        auto keep_leader_thread = std::thread([&]() {
            mv_helper.KeepLeader(lease_id);
            server.stop();  // 租约丢失时停止服务
        });

        // 3. 等待足够长时间让旧 leader 退位
        // 防止脑裂:等待时间 >= 租约 TTL
        const int waiting_time = ETCD_MASTER_VIEW_LEASE_TTL;
        std::this_thread::sleep_for(std::chrono::seconds(waiting_time));

        // 4. 启动服务
        server.async_start();

        // 5. 等待服务停止(租约丢失或其他错误)
        auto server_err = std::move(ec).get();

        // 6. 清理并重新选举
        EtcdHelper::CancelKeepAlive(lease_id);
        keep_leader_thread.join();
    }
}

Leader 选举流程:

sequenceDiagram
    participant Node1 as Node 1
    participant Node2 as Node 2
    participant etcd

    Node1->>etcd: Get master_view_key
    etcd-->>Node1: KEY_NOT_EXIST

    Node2->>etcd: Get master_view_key
    etcd-->>Node2: KEY_NOT_EXIST

    Node1->>etcd: GrantLease(TTL=5s)
    etcd-->>Node1: lease_id=123

    Node2->>etcd: GrantLease(TTL=5s)
    etcd-->>Node2: lease_id=456

    Node1->>etcd: CreateWithLease(key, "node1", lease=123)
    etcd-->>Node1: OK, version=1

    Node2->>etcd: CreateWithLease(key, "node2", lease=456)
    etcd-->>Node2: TRANSACTION_FAIL (key exists)

    Note over Node1: 成为 Leader
    Note over Node2: 等待 Leader 变更

    loop Keep Alive
        Node1->>etcd: KeepAlive(lease=123)
        etcd-->>Node1: OK
    end

    Note over Node1: Node 1 崩溃,停止续约

    etcd->>etcd: Lease 123 过期,删除 key
    etcd->>Node2: Watch: key deleted

    Node2->>etcd: CreateWithLease(key, "node2", lease=456)
    etcd-->>Node2: OK, version=2

    Note over Node2: 成为新 Leader

7.4 持久化存储后端#

7.4.1 文件路径解析#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::string MasterService::ResolvePath(const std::string& key) const {
    // 计算 key 的哈希值
    size_t hash = std::hash<std::string>{}(key);

    // 使用低 8 位创建 2 级目录结构
    // 目的:避免单目录文件过多
    char dir1 = static_cast<char>('a' + (hash & 0x0F));        // 16 个目录
    char dir2 = static_cast<char>('a' + ((hash >> 4) & 0x0F)); // 16 个子目录

    // 构建完整路径
    namespace fs = std::filesystem;
    fs::path full_path = fs::path(root_fs_dir_) / cluster_id_ /
                         std::string(1, dir1) / std::string(1, dir2) /
                         SanitizeKey(key);

    return full_path.lexically_normal().string();
}

// 示例:
// key = "model/layer1/kvcache/token_0"
// hash = 0x1234ABCD
// dir1 = 'a' + (0xD) = 'n'
// dir2 = 'a' + (0xC) = 'm'
// path = /data/mooncake_cluster/n/m/model_layer1_kvcache_token_0
7.4.2 Key 清理#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
std::string MasterService::SanitizeKey(const std::string& key) const {
    // 不允许的文件系统字符
    constexpr std::string_view kInvalidChars = "/\\:*?\"<>|";

    std::string sanitized_key;
    sanitized_key.reserve(key.size());

    for (char c : key) {
        // 将非法字符替换为下划线
        sanitized_key.push_back(
            kInvalidChars.find(c) != std::string_view::npos ? '_' : c);
    }
    return sanitized_key;
}

7.5 本章小结#

本章深入分析了 Mooncake Store 的核心实现:

  1. 分片元数据:1024 分片设计,支持高并发访问
  2. RAII 访问控制:通过 Scoped Access 类保证线程安全
  3. 租约机制:保护正在使用的对象不被驱逐
  4. 智能驱逐:两遍扫描、nth_element 优化、SoftPin 支持
  5. 客户端监控:心跳检测、过期清理
  6. 高可用:基于 etcd 的 Leader 选举、防脑裂设计

这些设计使 Mooncake Store 能够在高并发、大规模 KVCache 场景下保持高性能和高可靠性。


第四部分:Transfer Engine 深度解析#

本部分深入分析 Mooncake Transfer Engine 的内部实现,包括传输架构设计、RDMA 传输层、Slice 分片机制以及批量传输优化。

第八章:Transfer Engine 架构#

8.1 整体架构#

Transfer Engine 是 Mooncake 的高性能数据传输引擎,专为大规模 GPU 集群设计:

graph TB
    subgraph "Transfer Engine 架构"
        subgraph "API Layer"
            TE[TransferEngine]
            TEI[TransferEngineImpl]
            TENT[TENT Engine<br/>Next Gen]
        end

        subgraph "Transport Layer"
            MT[MultiTransport]
            RDMA[RdmaTransport]
            TCP[TcpTransport]
            LOCAL[LocalTransport]
            NVME[NvmeofTransport]
        end

        subgraph "RDMA Internals"
            CTX[RdmaContext]
            EP[RdmaEndPoint]
            CQ[Completion Queue]
            QP[Queue Pair]
        end

        subgraph "Metadata"
            TM[TransferMetadata]
            TOPO[Topology]
            SEG[SegmentDesc]
            BUF[BufferDesc]
        end

        TE --> TEI
        TE -.-> TENT
        TEI --> MT
        MT --> RDMA & TCP & LOCAL & NVME
        RDMA --> CTX
        CTX --> EP
        EP --> QP
        CTX --> CQ
        TEI --> TM
        TM --> TOPO & SEG & BUF
    end

8.2 核心 API#

8.2.1 TransferEngine 类#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class TransferEngine {
public:
    // 构造函数
    TransferEngine(bool auto_discover = false);
    TransferEngine(bool auto_discover, const std::vector<std::string>& filter);

    // 初始化
    int init(const std::string& metadata_conn_string,
             const std::string& local_server_name,
             const std::string& ip_or_host_name = "",
             uint64_t rpc_port = 12345);

    // Transport 管理
    Transport* installTransport(const std::string& proto, void** args);
    int uninstallTransport(const std::string& proto);

    // Segment 操作
    SegmentHandle openSegment(const std::string& segment_name);
    int closeSegment(SegmentHandle handle);

    // 内存注册
    int registerLocalMemory(void* addr, size_t length,
                            const std::string& location = kWildcardLocation,
                            bool remote_accessible = true,
                            bool update_metadata = true);
    int unregisterLocalMemory(void* addr, bool update_metadata = true);

    // 批量传输
    BatchID allocateBatchID(size_t batch_size);
    Status freeBatchID(BatchID batch_id);
    Status submitTransfer(BatchID batch_id,
                          const std::vector<TransferRequest>& entries);
    Status getTransferStatus(BatchID batch_id, size_t task_id,
                             TransferStatus& status);

private:
    std::shared_ptr<TransferEngineImpl> impl_;
    std::shared_ptr<mooncake::tent::TransferEngine> impl_tent_;
    bool use_tent_{false};  // 是否使用新一代引擎
};
8.2.2 TENT(Next Generation Transfer Engine)#

Mooncake 支持两代 Transfer Engine:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int TransferEngine::init(const std::string& metadata_conn_string,
                         const std::string& local_server_name,
                         const std::string& ip_or_host_name,
                         uint64_t rpc_port) {
    if (!use_tent_) {
        // 使用传统实现
        return impl_->init(metadata_conn_string, local_server_name,
                           ip_or_host_name, rpc_port);
    } else {
        // 使用 TENT(新一代)
        auto config = std::make_shared<mooncake::tent::Config>();
        config->set("local_segment_name", local_server_name);

        if (metadata_conn_string == P2PHANDSHAKE) {
            config->set("metadata_type", "p2p");
        } else {
            auto [type, servers] = parseConnectionString(metadata_conn_string);
            config->set("metadata_type", type);
            config->set("metadata_servers", servers);
        }

        impl_tent_ = std::make_shared<mooncake::tent::TransferEngine>(config);
        return impl_tent_->available();
    }
}

选择引擎的方式:

flowchart TD
    A[TransferEngine 初始化] --> B{MC_USE_TENT 环境变量?}
    B -->|是| C[使用 TENT]
    B -->|否| D{MC_USE_TEV1 环境变量?}
    D -->|是| C
    D -->|否| E[使用传统 TransferEngineImpl]

    C --> F[TENT::TransferEngine]
    E --> G[TransferEngineImpl]

8.3 传输请求模型#

8.3.1 TransferRequest 结构#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
struct TransferRequest {
    enum OpCode {
        READ,   // 从远程读取到本地
        WRITE   // 从本地写入到远程
    };

    OpCode opcode;         // 操作类型
    void* source;          // 本地地址
    SegmentID target_id;   // 目标 Segment ID
    uint64_t target_offset; // 目标偏移量
    size_t length;         // 传输长度
    int advise_retry_cnt = 0; // 建议重试次数
};
8.3.2 传输状态#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
enum TransferStatusEnum {
    WAITING,    // 等待调度
    PENDING,    // 已提交,等待完成
    INVALID,    // 无效
    CANCELED,   // 已取消
    COMPLETED,  // 完成
    TIMEOUT,    // 超时
    FAILED      // 失败
};

struct TransferStatus {
    TransferStatusEnum s;
    size_t transferred_bytes;  // 已传输字节数
};

传输状态转换:

stateDiagram-v2
    [*] --> WAITING: submit
    WAITING --> PENDING: schedule
    PENDING --> COMPLETED: success
    PENDING --> FAILED: error
    PENDING --> TIMEOUT: timeout
    WAITING --> CANCELED: cancel
    COMPLETED --> [*]
    FAILED --> [*]
    TIMEOUT --> [*]
    CANCELED --> [*]

8.4 Batch 与 Task 模型#

8.4.1 层次结构#
graph TD
    subgraph "传输层次模型"
        B[BatchDesc<br/>批次描述]
        T1[TransferTask 1]
        T2[TransferTask 2]
        T3[TransferTask N]
        S1[Slice 1.1]
        S2[Slice 1.2]
        S3[Slice 2.1]
        S4[Slice N.M]

        B --> T1 & T2 & T3
        T1 --> S1 & S2
        T2 --> S3
        T3 --> S4
    end
8.4.2 BatchDesc 结构#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct BatchDesc {
    BatchID id;
    size_t batch_size;                    // 批次大小
    std::vector<TransferTask> task_list;  // Task 列表
    void* context;                        // Transport 上下文
    int64_t start_timestamp;

    // 批次级别状态跟踪
    std::atomic<bool> has_failure{false};
    std::atomic<bool> is_finished{false};
    std::atomic<uint64_t> finished_transfer_bytes{0};

#ifdef USE_EVENT_DRIVEN_COMPLETION
    // 事件驱动完成机制
    std::atomic<uint64_t> finished_task_count{0};
    std::mutex completion_mutex;
    std::condition_variable completion_cv;
#endif
};

// BatchID 到 BatchDesc 的快速转换
// 直接将 BatchID 解释为指针,避免查表开销
static inline BatchDesc& toBatchDesc(BatchID id) {
    return *reinterpret_cast<BatchDesc*>(id);
}
8.4.3 TransferTask 结构#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct TransferTask {
    volatile uint64_t slice_count = 0;        // 总 Slice 数
    volatile uint64_t success_slice_count = 0; // 成功 Slice 数
    volatile uint64_t failed_slice_count = 0;  // 失败 Slice 数
    volatile uint64_t transferred_bytes = 0;   // 已传输字节
    volatile bool is_finished = false;
    uint64_t total_bytes = 0;
    BatchID batch_id = 0;

#ifdef USE_EVENT_DRIVEN_COMPLETION
    volatile uint64_t completed_slice_count = 0;
#endif

    const TransferRequest* request = nullptr;
    std::vector<Slice*> slice_list;  // Slice 列表

    ~TransferTask() {
        // 归还 Slice 到线程本地缓存
        for (auto& slice : slice_list) {
            Transport::getSliceCache().deallocate(slice);
        }
    }
};

8.5 Slice 分片机制#

8.5.1 Slice 结构#

Slice 是传输的最小单元,每个 Slice 对应一次 RDMA 操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
struct Slice {
    enum SliceStatus { PENDING, POSTED, SUCCESS, TIMEOUT, FAILED };

    void* source_addr;                     // 源地址
    size_t length;                         // 长度
    TransferRequest::OpCode opcode;        // 操作类型
    SegmentID target_id;                   // 目标 Segment
    std::string peer_nic_path;             // 对端网卡路径
    SliceStatus status;                    // 状态
    TransferTask* task;                    // 所属 Task
    std::vector<uint32_t> dest_rkeys;      // 远程 key 列表
    bool from_cache;                       // 是否来自缓存

    // RDMA 特定字段
    union {
        struct {
            uint64_t dest_addr;            // 远程地址
            uint32_t source_lkey;          // 本地 lkey
            uint32_t dest_rkey;            // 远程 rkey
            int lkey_index;
            int rkey_index;
            volatile int* qp_depth;        // QP 深度计数
            uint32_t retry_cnt;            // 重试次数
            uint32_t max_retry_cnt;        // 最大重试次数
        } rdma;

        struct {
            void* dest_addr;
        } local;

        struct {
            uint64_t dest_addr;
        } tcp;

        struct {
            uint64_t offset;
            int cufile_desc;
            uint64_t start;
            const char* file_path;
        } nvmeof;
    };

    // 标记成功
    void markSuccess() {
        status = Slice::SUCCESS;
        __atomic_fetch_add(&task->transferred_bytes, length, __ATOMIC_RELAXED);
        __atomic_fetch_add(&task->success_slice_count, 1, __ATOMIC_RELAXED);
        check_batch_completion(false);
    }

    // 标记失败
    void markFailed() {
        status = Slice::FAILED;
        __atomic_fetch_add(&task->failed_slice_count, 1, __ATOMIC_RELAXED);
        check_batch_completion(true);
    }

private:
    void check_batch_completion(bool is_failed);
};
8.5.2 事件驱动完成机制#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void Slice::check_batch_completion(bool is_failed) {
#ifdef USE_EVENT_DRIVEN_COMPLETION
    auto& batch_desc = toBatchDesc(task->batch_id);

    if (is_failed) {
        batch_desc.has_failure.store(true, std::memory_order_relaxed);
    }

    // 原子递增完成计数
    uint64_t prev_completed = __atomic_fetch_add(
        &task->completed_slice_count, 1, __ATOMIC_RELAXED);

    // 最后一个 Slice 完成时
    if (prev_completed + 1 == task->slice_count) {
        __atomic_store_n(&task->is_finished, true, __ATOMIC_RELAXED);

        // 递增批次完成任务计数
        auto prev = batch_desc.finished_task_count.fetch_add(
            1, std::memory_order_relaxed);

        // 批次中最后一个任务完成
        if (prev + 1 == batch_desc.batch_size) {
            {
                std::lock_guard<std::mutex> lock(batch_desc.completion_mutex);
                batch_desc.is_finished.store(true, std::memory_order_release);
            }
            // 唤醒等待线程
            batch_desc.completion_cv.notify_all();
        }
    }
#endif
}

完成通知流程:

sequenceDiagram
    participant CQ as Completion Queue
    participant Slice
    participant Task as TransferTask
    participant Batch as BatchDesc
    participant Waiter as 等待线程

    CQ->>Slice: Work Completion
    Slice->>Slice: markSuccess()
    Slice->>Task: atomic_inc(completed_slice_count)

    alt 最后一个 Slice
        Slice->>Task: is_finished = true
        Slice->>Batch: atomic_inc(finished_task_count)

        alt 最后一个 Task
            Slice->>Batch: is_finished = true
            Slice->>Waiter: notify_all()
        end
    end
8.5.3 线程本地 Slice 缓存#

为了减少内存分配开销,使用线程本地缓存复用 Slice 对象:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
struct ThreadLocalSliceCache {
    static const size_t kLazyDeleteSliceCapacity = 4096;

    std::vector<Slice*> lazy_delete_slices_;
    uint64_t head_, tail_;
    uint64_t allocated_ = 0, freed_ = 0;

    ThreadLocalSliceCache() : head_(0), tail_(0) {
        lazy_delete_slices_.resize(kLazyDeleteSliceCapacity);
    }

    ~ThreadLocalSliceCache() {
        // 清理剩余 Slice
        for (uint64_t i = tail_; i != head_; i++) {
            delete lazy_delete_slices_[i % kLazyDeleteSliceCapacity];
            freed_++;
        }
        if (allocated_ != freed_) {
            LOG(WARNING) << "detected slice leak: allocated " << allocated_
                         << " freed " << freed_;
        }
    }

    Slice* allocate() {
        Slice* slice;
        if (head_ - tail_ == 0) {
            // 缓存为空,分配新对象
            allocated_++;
            slice = new Slice();
            slice->from_cache = false;
        } else {
            // 从缓存获取
            slice = lazy_delete_slices_[tail_ % kLazyDeleteSliceCapacity];
            tail_++;
            slice->from_cache = true;
        }
        return slice;
    }

    void deallocate(Slice* slice) {
        if (head_ - tail_ == kLazyDeleteSliceCapacity) {
            // 缓存满,直接删除
            delete slice;
            freed_++;
            return;
        }
        // 放入缓存
        lazy_delete_slices_[head_ % kLazyDeleteSliceCapacity] = slice;
        head_++;
    }
};

Slice 缓存原理:

graph LR
    subgraph "ThreadLocalSliceCache"
        direction TB
        A[tail] --> B[Slice 1]
        B --> C[Slice 2]
        C --> D[...]
        D --> E[Slice N]
        E --> F[head]
    end

    G[allocate] --> A
    F --> H[deallocate]

    subgraph "环形缓冲区"
        I[容量: 4096]
        J[head - tail = 使用数]
    end

第九章:RDMA Transport 实现#

9.1 RdmaTransport 初始化#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int RdmaTransport::install(std::string& local_server_name,
                           std::shared_ptr<TransferMetadata> meta,
                           std::shared_ptr<Topology> topo) {
    if (topo == nullptr) {
        LOG(ERROR) << "RdmaTransport: missing topology";
        return ERR_INVALID_ARGUMENT;
    }

    metadata_ = meta;
    local_server_name_ = local_server_name;
    local_topology_ = topo;

    // 1. 初始化 RDMA 资源
    auto ret = initializeRdmaResources();
    if (ret) return ret;

    // 2. 分配本地 Segment ID
    ret = allocateLocalSegmentID();
    if (ret) return ret;

    // 3. 启动握手守护进程
    ret = startHandshakeDaemon(local_server_name);
    if (ret) return ret;

    // 4. 更新元数据
    ret = metadata_->updateLocalSegmentDesc();
    return ret;
}
9.1.1 Relaxed Ordering 优化#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
RdmaTransport::RdmaTransport() {
    MCIbRelaxedOrderingMode = getIbRelaxedOrderingMode();

    if (MCIbRelaxedOrderingMode == 0) {
        LOG(INFO) << "[RDMA] Relaxed ordering disabled";
        MCIbRelaxedOrderingEnabled = false;
        return;
    }

    // 检查是否支持 ibv_reg_mr_iova2(IBVERBS_1.8+)
    MCIbRelaxedOrderingEnabled = has_ibv_reg_mr_iova2();

    if (MCIbRelaxedOrderingEnabled) {
        LOG(INFO) << "[RDMA] Relaxed ordering supported; "
                     "IBV_ACCESS_RELAXED_ORDERING will be requested";
    }
}

// 检测函数
bool has_ibv_reg_mr_iova2(void) {
    void* sym = dlsym(RTLD_DEFAULT, "ibv_reg_mr_iova2");
    return sym != NULL;
}

Relaxed Ordering 是 PCIe 的一项优化特性,允许 NIC 重排序写操作以提高吞吐量:

graph LR
    subgraph "Strict Ordering"
        A1[Write 1] --> A2[Write 2] --> A3[Write 3]
    end

    subgraph "Relaxed Ordering"
        B1[Write 1]
        B2[Write 2]
        B3[Write 3]
        B1 & B2 & B3 --> B4[并行执行]
    end

9.2 内存注册#

9.2.1 并行内存注册#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
int RdmaTransport::registerLocalMemory(void* addr, size_t length,
                                       const std::string& name,
                                       bool remote_accessible,
                                       bool update_metadata) {
    const int kBaseAccessRights = IBV_ACCESS_LOCAL_WRITE |
                                  IBV_ACCESS_REMOTE_WRITE |
                                  IBV_ACCESS_REMOTE_READ;

    static int access_rights = kBaseAccessRights;
    if (MCIbRelaxedOrderingEnabled) {
        access_rights |= IBV_ACCESS_RELAXED_ORDERING;
    }

    // 大内存区域(>4GB)进行预触摸
    bool do_pre_touch = context_list_.size() > 0 &&
                        std::thread::hardware_concurrency() >= 4 &&
                        length >= (size_t)4 * 1024 * 1024 * 1024;

    if (do_pre_touch) {
        preTouchMemory(addr, length);
    }

    // 决定是否并行注册
    int use_parallel_reg = globalConfig().parallel_reg_mr;
    if (use_parallel_reg == -1) {
        use_parallel_reg = context_list_.size() > 1 && do_pre_touch;
    }

    if (use_parallel_reg) {
        // 并行注册到所有 RDMA Context
        std::vector<std::thread> reg_threads;
        std::vector<int> ret_codes(context_list_.size(), 0);

        for (size_t i = 0; i < context_list_.size(); ++i) {
            reg_threads.emplace_back([this, &ret_codes, i, addr, length,
                                      access_rights]() {
                ret_codes[i] =
                    context_list_[i]->registerMemoryRegion(addr, length,
                                                           access_rights);
            });
        }

        for (auto& thread : reg_threads) {
            thread.join();
        }
    } else {
        // 串行注册
        for (size_t i = 0; i < context_list_.size(); ++i) {
            context_list_[i]->registerMemoryRegion(addr, length, access_rights);
        }
    }

    // 收集 lkey/rkey
    BufferDesc buffer_desc;
    for (auto& context : context_list_) {
        buffer_desc.lkey.push_back(context->lkey(addr));
        buffer_desc.rkey.push_back(context->rkey(addr));
    }

    // 自动检测内存位置
    if (name == kWildcardLocation) {
        auto entries = getMemoryLocation(addr, length, true);
        buffer_desc.name = entries[0].location;
    } else {
        buffer_desc.name = name;
    }

    buffer_desc.addr = (uint64_t)addr;
    buffer_desc.length = length;
    return metadata_->addLocalMemoryBuffer(buffer_desc, update_metadata);
}
9.2.2 内存预触摸(Pre-Touch)#
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int RdmaTransport::preTouchMemory(void* addr, size_t length) {
    // 限制最大预触摸大小
    if (length > (size_t)globalConfig().max_mr_size) {
        length = (size_t)globalConfig().max_mr_size;
    }

    auto hwc = std::thread::hardware_concurrency();
    auto num_threads = hwc > 64 ? 16 : std::min(hwc, 8u);
    size_t block_size = length / num_threads;

    std::vector<std::thread> threads;
    std::vector<int> thread_results(num_threads, 0);

    for (size_t thread_i = 0; thread_i < num_threads; ++thread_i) {
        void* block_addr = static_cast<char*>(addr) + thread_i * block_size;
        threads.emplace_back([this, thread_i, block_addr, block_size,
                              &thread_results]() {
            thread_results[thread_i] =
                context_list_[0]->preTouchMemory(block_addr, block_size);
        });
    }

    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}

内存注册流程:

flowchart TD
    A[registerLocalMemory] --> B{length > 4GB?}
    B -->|是| C[并行 Pre-Touch]
    B -->|否| D{多 Context?}
    C --> D
    D -->|是| E[并行注册到各 Context]
    D -->|否| F[串行注册]
    E --> G[收集 lkey/rkey]
    F --> G
    G --> H{location = '*'?}
    H -->|是| I[自动检测内存位置]
    H -->|否| J[使用指定位置]
    I --> K[添加到元数据]
    J --> K

9.3 Segment 描述符#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
int RdmaTransport::allocateLocalSegmentID() {
    auto desc = std::make_shared<SegmentDesc>();
    desc->name = local_server_name_;
    desc->protocol = "rdma";

    // 收集所有设备信息
    for (auto& entry : context_list_) {
        TransferMetadata::DeviceDesc device_desc;
        device_desc.name = entry->deviceName();  // 如 "mlx5_0"
        device_desc.lid = entry->lid();           // Local ID
        device_desc.gid = entry->gid();           // Global ID
        desc->devices.push_back(device_desc);
    }

    desc->topology = *(local_topology_.get());
    metadata_->addLocalSegment(LOCAL_SEGMENT_ID, local_server_name_,
                               std::move(desc));
    return 0;
}

9.4 批量内存注册#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int RdmaTransport::registerLocalMemoryBatch(
    const std::vector<BufferEntry>& buffer_list,
    const std::string& location) {

#if !defined(WITH_NVIDIA_PEERMEM) && defined(USE_CUDA)
    // 无 peermem 时串行注册
    for (auto& buffer : buffer_list) {
        registerLocalMemory(buffer.addr, buffer.length, location, true, false);
    }
#else
    // 并行注册多个缓冲区
    std::vector<std::future<int>> results;

    for (auto& buffer : buffer_list) {
        results.emplace_back(
            std::async(std::launch::async, [this, buffer, location]() {
                // force_sequential=true 避免嵌套并行
                return registerLocalMemoryInternal(
                    buffer.addr, buffer.length, location, true, false, true);
            }));
    }

    for (size_t i = 0; i < buffer_list.size(); ++i) {
        results[i].get();
    }
#endif

    return metadata_->updateLocalSegmentDesc();
}

第十章:数据传输实现#

10.1 传输提交流程#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Status RdmaTransport::submitTransfer(
    BatchID batch_id, const std::vector<TransferRequest>& entries) {

    auto& batch_desc = toBatchDesc(batch_id);

    for (size_t i = 0; i < entries.size(); i++) {
        auto& task = batch_desc.task_list[i];
        const auto& request = entries[i];

        task.request = &request;
        task.total_bytes = request.length;
        task.batch_id = batch_id;

        // 获取目标 Segment 信息
        auto segment_desc = metadata_->getSegmentDesc(request.target_id);
        if (!segment_desc) {
            return Status::InvalidArgument("Segment not found");
        }

        // 分片:将大传输拆分为多个 Slice
        std::vector<Slice*> slices = createSlices(request, segment_desc);
        task.slice_count = slices.size();
        task.slice_list = std::move(slices);
    }

    // 提交到 RDMA 层
    return submitToRdma(batch_desc);
}

10.2 Slice 创建#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
std::vector<Slice*> RdmaTransport::createSlices(
    const TransferRequest& request,
    const std::shared_ptr<SegmentDesc>& segment_desc) {

    std::vector<Slice*> slices;
    size_t offset = 0;
    size_t remaining = request.length;

    while (remaining > 0) {
        // 确定本次 Slice 大小
        size_t slice_size = std::min(remaining, kMaxSliceSize);

        // 从线程本地缓存分配 Slice
        Slice* slice = getSliceCache().allocate();

        slice->source_addr = (char*)request.source + offset;
        slice->length = slice_size;
        slice->opcode = request.opcode;
        slice->target_id = request.target_id;
        slice->status = Slice::PENDING;

        // 选择最优路径
        auto path = selectOptimalPath(request.source, segment_desc);
        slice->peer_nic_path = path.peer_nic_path;
        slice->rdma.dest_addr = request.target_offset + offset;
        slice->rdma.source_lkey = path.source_lkey;
        slice->rdma.dest_rkey = path.dest_rkey;
        slice->rdma.retry_cnt = 0;
        slice->rdma.max_retry_cnt = kDefaultMaxRetry;

        slices.push_back(slice);
        offset += slice_size;
        remaining -= slice_size;
    }

    return slices;
}

10.3 拓扑感知路径选择#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
struct TransferPath {
    std::string peer_nic_path;
    uint32_t source_lkey;
    uint32_t dest_rkey;
    int context_index;
};

TransferPath RdmaTransport::selectOptimalPath(
    void* source_addr,
    const std::shared_ptr<SegmentDesc>& segment_desc) {

    TransferPath best_path;
    int min_distance = INT_MAX;

    // 获取源地址的 NUMA 节点
    int source_numa = getNumaNode(source_addr);

    for (size_t i = 0; i < context_list_.size(); ++i) {
        auto& context = context_list_[i];

        // 计算本地 NIC 到源地址的距离
        int local_distance = context->getNumaDistance(source_numa);

        // 遍历远程设备
        for (const auto& device : segment_desc->devices) {
            // 计算到远程 NIC 的网络距离(同交换机、跨交换机等)
            int network_distance = getNetworkDistance(
                context->deviceName(), device.name);

            int total_distance = local_distance + network_distance;

            if (total_distance < min_distance) {
                min_distance = total_distance;
                best_path.peer_nic_path = device.name;
                best_path.source_lkey = context->lkey(source_addr);
                best_path.dest_rkey = device.rkey;
                best_path.context_index = i;
            }
        }
    }

    return best_path;
}

路径选择示意图:

graph TD
    subgraph "Node A"
        CPU_A[CPU NUMA 0]
        GPU_A[GPU 0]
        NIC_A1[mlx5_0]
        NIC_A2[mlx5_1]
    end

    subgraph "Node B"
        CPU_B[CPU NUMA 0]
        GPU_B[GPU 0]
        NIC_B1[mlx5_0]
        NIC_B2[mlx5_1]
    end

    GPU_A -.->|PCIe| NIC_A1
    GPU_A -.->|跨 NUMA| NIC_A2
    NIC_A1 <-->|InfiniBand| NIC_B1
    NIC_A2 <-->|InfiniBand| NIC_B2

    style GPU_A fill:#90EE90
    style NIC_A1 fill:#90EE90
    style NIC_B1 fill:#90EE90

10.4 RDMA 操作提交#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
int RdmaEndPoint::submitPostSend(
    std::vector<Slice*>& slice_list,
    std::vector<Slice*>& failed_slice_list) {

    RWSpinlock::ReadGuard guard(lock_);

    if (!connected()) {
        for (auto slice : slice_list) {
            failed_slice_list.push_back(slice);
        }
        return -1;
    }

    int submitted = 0;
    for (auto slice : slice_list) {
        // 选择 QP(负载均衡)
        int qp_index = submitted % qp_list_.size();
        ibv_qp* qp = qp_list_[qp_index];

        // 检查 QP 深度
        if (wr_depth_list_[qp_index] >= max_wr_depth_) {
            failed_slice_list.push_back(slice);
            continue;
        }

        // 构建 Work Request
        ibv_send_wr wr = {};
        ibv_sge sge = {};

        sge.addr = (uint64_t)slice->source_addr;
        sge.length = slice->length;
        sge.lkey = slice->rdma.source_lkey;

        wr.wr_id = (uint64_t)slice;
        wr.next = nullptr;
        wr.sg_list = &sge;
        wr.num_sge = 1;
        wr.opcode = (slice->opcode == TransferRequest::WRITE)
                        ? IBV_WR_RDMA_WRITE
                        : IBV_WR_RDMA_READ;
        wr.send_flags = IBV_SEND_SIGNALED;

        wr.wr.rdma.remote_addr = slice->rdma.dest_addr;
        wr.wr.rdma.rkey = slice->rdma.dest_rkey;

        // 提交
        ibv_send_wr* bad_wr;
        int ret = ibv_post_send(qp, &wr, &bad_wr);

        if (ret == 0) {
            slice->status = Slice::POSTED;
            wr_depth_list_[qp_index]++;
            submitted++;
        } else {
            failed_slice_list.push_back(slice);
        }
    }

    return submitted;
}

10.5 Completion Queue 处理#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void RdmaContext::pollCompletionQueue() {
    ibv_wc wc[kPollBatchSize];

    while (running_) {
        int num_completions = ibv_poll_cq(cq_, kPollBatchSize, wc);

        for (int i = 0; i < num_completions; ++i) {
            Slice* slice = (Slice*)wc[i].wr_id;

            // 递减 QP 深度
            __atomic_fetch_sub(slice->rdma.qp_depth, 1, __ATOMIC_RELAXED);

            if (wc[i].status == IBV_WC_SUCCESS) {
                slice->markSuccess();
            } else {
                LOG(WARNING) << "RDMA completion error: "
                            << ibv_wc_status_str(wc[i].status);

                // 重试逻辑
                if (slice->rdma.retry_cnt < slice->rdma.max_retry_cnt) {
                    slice->rdma.retry_cnt++;
                    slice->status = Slice::PENDING;
                    resubmitSlice(slice);
                } else {
                    slice->markFailed();
                }
            }
        }

        if (num_completions == 0) {
            // 短暂休眠避免忙等
            std::this_thread::yield();
        }
    }
}

CQ 轮询流程:

flowchart TD
    A[Poll CQ] --> B{有完成事件?}
    B -->|是| C[获取 Slice from wr_id]
    C --> D{状态?}
    D -->|SUCCESS| E[slice->markSuccess]
    D -->|ERROR| F{可重试?}
    F -->|是| G[重新提交]
    F -->|否| H[slice->markFailed]
    E --> B
    G --> B
    H --> B
    B -->|否| I[yield]
    I --> A

10.6 本章小结#

本章深入分析了 Transfer Engine 的传输实现:

  1. 层次模型:Batch → Task → Slice 三层结构
  2. Slice 缓存:线程本地缓存减少分配开销
  3. 事件驱动:高效的批次完成通知机制
  4. 并行注册:多 Context 并行内存注册
  5. 拓扑感知:选择最优 NIC 路径
  6. CQ 处理:批量轮询与重试机制

这些设计使 Transfer Engine 能够充分利用 RDMA 硬件能力,实现接近线速的数据传输。


第五部分:性能优化与实战#


第十一章:性能基准测试与调优#

11.1 性能测试框架概述#

Mooncake 提供了完善的性能测试框架,用于验证系统性能、评估优化效果和进行回归测试。测试框架位于 mooncake-transfer-engine/benchmark 目录下。

11.1.1 测试架构设计#

classDiagram
    class BenchRunner {
        <<abstract>>
        +pinThread(thread_id)
        +runTarget()
        +startInitiator(num_threads)
        +stopInitiator()
        +runInitiatorTasks(func)
        +getSegmentName()
        +getLocalBufferBase()
        +getTargetBufferBase()
        +runSingleTransfer()
    }

    class TEBenchRunner {
        -TransferEngine* engine_
        +runSingleTransfer()
    }

    class TENTBenchRunner {
        -TransferEngine engine_
        -vector~thread~ threads_
        -vector~void*~ pinned_buffer_list_
        +allocateBuffers()
        +freeBuffers()
        +runner(thread_id)
    }

    class XferBenchConfig {
        <<static>>
        +seg_name
        +seg_type
        +total_buffer_size
        +start_block_size
        +max_block_size
        +batch_size
        +duration
        +num_threads
        +loadFromFlags()
    }

    class XferBenchStats {
        +XferMetricStats total_duration
        +XferMetricStats transfer_duration
    }

    class XferMetricStats {
        -vector~double~ samples
        +min()
        +max()
        +avg()
        +p90()
        +p95()
        +p99()
        +p999()
        +add(value)
    }

    BenchRunner <|-- TEBenchRunner
    BenchRunner <|-- TENTBenchRunner
    BenchRunner --> XferBenchConfig
    BenchRunner --> XferBenchStats
    XferBenchStats --> XferMetricStats

11.1.2 配置参数说明#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// mooncake-transfer-engine/benchmark/utils.cpp
DEFINE_string(seg_type, "DRAM",
              "Memory segment type: DRAM|VRAM");
DEFINE_uint64(total_buffer_size, 1UL << 30,
              "Total buffer size (in bytes).");
DEFINE_uint64(start_block_size, 4096, "Start block size (in bytes).");
DEFINE_uint64(max_block_size, 1UL << 26, "Maximum block size (in bytes).");
DEFINE_uint64(start_batch_size, 1, "Start batch size.");
DEFINE_uint64(max_batch_size, 1, "Maximum batch size.");
DEFINE_int32(duration, 5, "Duration per test case (seconds).");
DEFINE_int32(max_num_threads, 1, "Maximum number of worker threads.");
DEFINE_string(op_type, "read", "Operation type: read|write|mix");
DEFINE_bool(check_consistency, false, "Enable data consistency check.");

关键参数说明:

参数 含义 推荐值
total_buffer_size 测试缓冲区总大小 1GB-4GB
block_size 单次传输块大小 64KB-64MB
batch_size 批处理请求数量 1-128
num_threads 并发工作线程数 4-12
duration 每个测试用例持续时间 5-30 秒

11.1.3 Benchmark 工作流程#

sequenceDiagram
    participant Target as Target Process
    participant Initiator as Initiator Process
    participant Runner as BenchRunner

    Target->>Target: 分配内存缓冲区
    Target->>Target: 注册内存到 Transfer Engine
    Target->>Target: 等待连接...

    Initiator->>Initiator: 分配并注册本地内存
    Initiator->>Target: 连接到 Target Segment

    loop 每个配置组合
        Initiator->>Runner: startInitiator(num_threads)

        loop 测试持续时间
            par 多线程并发
                Runner->>Runner: 提交传输请求
                Runner->>Runner: 等待完成
                Runner->>Runner: 记录延迟
            end
        end

        Runner->>Initiator: 汇总统计结果
        Initiator->>Initiator: 打印性能报告
    end

11.1.4 统计指标计算#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// mooncake-transfer-engine/benchmark/utils.cpp
double XferMetricStats::percentile(double p) {
    if (samples.empty()) return 0.0;
    if (p <= 0) return min();
    if (p >= 100) return max();

    std::vector<double> sorted = samples;
    std::sort(sorted.begin(), sorted.end());

    double rank = (p / 100.0) * (sorted.size() - 1);
    size_t idx = static_cast<size_t>(rank);
    double frac = rank - idx;

    // 线性插值计算精确百分位
    if (idx + 1 < sorted.size()) {
        return sorted[idx] * (1.0 - frac) + sorted[idx + 1] * frac;
    } else {
        return sorted[idx];
    }
}

void printStats(size_t block_size, size_t batch_size,
                XferBenchStats& stats, int num_threads) {
    auto num_ops = stats.transfer_duration.count();
    double total_duration = stats.total_duration.avg();

    // 计算总传输数据量
    size_t total_data = (block_size * batch_size) * num_ops;

    // 计算平均延迟
    double avg_latency = (total_duration * num_threads / num_ops);

    // 计算吞吐量 (GB/s)
    double throughput_gb = ((double)total_data / (1e9)) /
                          (total_duration / 1e6);

    std::cout << std::setw(14) << block_size
              << std::setw(8)  << batch_size
              << std::setw(14) << throughput_gb
              << std::setw(14) << avg_latency
              << std::setw(14) << stats.transfer_duration.p99()
              << std::setw(14) << stats.transfer_duration.p999()
              << std::endl;
}

11.1.5 运行 Benchmark#

启动 Target 端

1
2
3
4
./tebench --backend=tent --seg_type=DRAM
# 输出:
# To start initiators, run
#   ./tebench --target_seg_name=hostname:15428 ...

启动 Initiator 端

1
2
3
4
5
6
7
8
9
./tebench --backend=tent \
    --target_seg_name=hostname:15428 \
    --op_type=read \
    --start_block_size=4096 \
    --max_block_size=67108864 \
    --start_batch_size=1 \
    --max_batch_size=128 \
    --max_num_threads=8 \
    --duration=10

输出示例

BlkSize (B)   Batch   BW (GB/S)     Avg Lat (us)  Avg Tx (us)   P99 Tx (us)   P999 Tx (us)
------------------------------------------------------------------------------------------------
4096          1       0.312456      13.1          12.8          15.2          18.7
4096          16      4.982134      12.9          12.6          14.8          17.3
65536         1       5.123456      12.8          12.5          14.5          16.9
65536         128     23.456789     356.2         354.1         412.3         489.6
1048576       1       22.345678     47.0          46.3          52.1          61.8
67108864      1       24.123456     2780.5        2778.2        2812.4        2845.1

11.2 TENT 后端测试实现#

11.2.1 内存分配与注册#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// mooncake-transfer-engine/benchmark/tent_backend.cpp
int TENTBenchRunner::allocateBuffers() {
    auto total_buffer_size = XferBenchConfig::total_buffer_size;

    if (XferBenchConfig::seg_type == "DRAM") {
        int num_buffers = numa_num_configured_nodes();
        pinned_buffer_list_.resize(num_buffers, nullptr);

        auto start_ts = getCurrentTimeInNano();

        // 每个 NUMA 节点分配一个缓冲区
        for (int i = 0; i < num_buffers; ++i) {
            auto location = "cpu:" + std::to_string(i);
            CHECK_FAIL(engine_->allocateLocalMemory(
                &pinned_buffer_list_[i], total_buffer_size, location));
        }

        auto allocated_ts = getCurrentTimeInNano();

        // 批量注册内存
        std::vector<size_t> buffers_size(
            pinned_buffer_list_.size(), total_buffer_size);
        CHECK_FAIL(engine_->registerLocalMemory(
            pinned_buffer_list_, buffers_size));

        auto registered_ts = getCurrentTimeInNano();

        LOG(INFO) << "Allocated " << total_buffer_size * num_buffers
                  << " bytes in " << (allocated_ts - start_ts) / 1e6
                  << " ms, registered in "
                  << (registered_ts - allocated_ts) / 1e6 << " ms";

    } else if (XferBenchConfig::seg_type == "VRAM") {
        int num_buffers = 0;
        cudaGetDeviceCount(&num_buffers);
        pinned_buffer_list_.resize(num_buffers, nullptr);

        for (int i = 0; i < num_buffers; ++i) {
            auto location = "cuda:" + std::to_string(i);
            CHECK_FAIL(engine_->allocateLocalMemory(
                &pinned_buffer_list_[i], total_buffer_size, location));
            CHECK_FAIL(engine_->registerLocalMemory(
                pinned_buffer_list_[i], total_buffer_size));
        }
    }

    return 0;
}

11.2.2 线程绑定策略#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// mooncake-transfer-engine/benchmark/tent_backend.cpp
void TENTBenchRunner::pinThread(int thread_id) {
    // 获取线程使用的缓冲区位置
    uint64_t addr = (uint64_t)pinned_buffer_list_[
        thread_id % pinned_buffer_list_.size()];

    // 查询内存所在位置
    auto result = Platform::getLoader().getLocation((void*)addr, 1);
    LocationParser location(result[0].location);

    if (location.type() == "cpu") {
        // 绑定到对应 NUMA Socket
        auto socket_id = location.index();
        bindToSocket(socket_id);
    } else if (location.type() == "cuda") {
        // GPU 内存,绑定到 GPU 所在的 NUMA 节点
        auto device_id = location.index();
        auto socket_id = getCudaDeviceNumaID(device_id);
        bindToSocket(socket_id);
    }
}

static inline int getCudaDeviceNumaID(int cuda_id) {
    char pci_bus_id[20];
    auto err = cudaDeviceGetPCIBusId(pci_bus_id, sizeof(pci_bus_id), cuda_id);
    if (err != cudaSuccess) {
        LOG(WARNING) << "cudaDeviceGetPCIBusId: " << cudaGetErrorString(err);
        return 0;
    }

    // 转为小写格式
    for (char* ch = pci_bus_id; (*ch = tolower(*ch)); ch++);

    // 从 sysfs 获取 NUMA 节点
    return getNumaNodeFromPciDevice(pci_bus_id);
}

线程绑定的重要性:

graph TB
    subgraph "优化前:跨 NUMA 访问"
        T1_bad[Thread 1<br/>NUMA 0] -->|远程访问| M1_bad[Memory<br/>NUMA 1]
        T2_bad[Thread 2<br/>NUMA 1] -->|远程访问| M2_bad[Memory<br/>NUMA 0]
    end

    subgraph "优化后:本地 NUMA 访问"
        T1_good[Thread 1<br/>NUMA 0] -->|本地访问| M1_good[Memory<br/>NUMA 0]
        T2_good[Thread 2<br/>NUMA 1] -->|本地访问| M2_good[Memory<br/>NUMA 1]
    end

    style T1_bad fill:#FFB6C1
    style T2_bad fill:#FFB6C1
    style T1_good fill:#90EE90
    style T2_good fill:#90EE90

11.2.3 单次传输执行#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// mooncake-transfer-engine/benchmark/tent_backend.cpp
double TENTBenchRunner::runSingleTransfer(
    uint64_t local_addr, uint64_t target_addr,
    uint64_t block_size, uint64_t batch_size, OpCode opcode) {

    // 分配批次
    auto batch_id = engine_->allocateBatch(batch_size);

    // 构建请求列表
    std::vector<Request> requests;
    for (uint64_t i = 0; i < batch_size; ++i) {
        Request entry;
        entry.opcode = opcode == READ ? Request::READ : Request::WRITE;
        entry.length = block_size;
        entry.source = (void*)(local_addr + block_size * i);
        entry.target_id = handle_;
        entry.target_offset = target_addr + block_size * i;
        requests.emplace_back(entry);
    }

    // 计时开始
    XferBenchTimer timer;

    // 提交传输
    CHECK_FAIL(engine_->submitTransfer(batch_id, requests));

    // 等待完成
    while (true) {
        TransferStatus overall_status;
        CHECK_FAIL(engine_->getTransferStatus(batch_id, overall_status));

        if (overall_status.s == TransferStatusEnum::COMPLETED) {
            break;
        } else if (overall_status.s == TransferStatusEnum::FAILED) {
            LOG(ERROR) << "Failed transfer detected";
            exit(EXIT_FAILURE);
        }
    }

    auto duration = timer.lap_us();

    // 释放批次
    CHECK_FAIL(engine_->freeBatch(batch_id));

    return duration;
}

11.3 Mooncake Store 性能测试#

11.3.1 Master Service 基准测试#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// mooncake-store/benchmarks/master_bench.cpp
class BenchClient {
public:
    void BenchFn(std::shared_ptr<std::latch> barrier,
                 BenchOperation operation,
                 uint64_t batch_size, uint64_t value_size,
                 uint64_t num_keys) {

        // 解绑 CPU 亲和性,让系统自动调度
        unset_cpu_affinity();

        uint64_t key_id = 0;
        const mooncake::ReplicateConfig config;

        // 预分配 slice_lengths
        std::vector<std::vector<uint64_t>> slice_lengths;
        slice_lengths.reserve(batch_size);
        for (size_t i = 0; i < batch_size; i++) {
            slice_lengths.push_back({value_size});
        }

        // Get 操作需要预填充数据
        if (operation == BenchOperation::GET ||
            operation == BenchOperation::BATCH_GET) {
            PrefillKeys(key_id, batch_size, value_size, num_keys);
        }

        // 同步所有线程开始
        barrier->arrive_and_wait();

        // 主测试循环
        while (running_.load()) {
            switch (operation) {
                case BenchOperation::PUT:
                    keys = GeneratePutKeys(key_id, 1);
                    if (Put(keys[0], slice_lengths[0], config)) {
                        gCompletedOperations.fetch_add(1);
                    }
                    break;

                case BenchOperation::BATCH_PUT:
                    keys = GeneratePutKeys(key_id, batch_size);
                    gCompletedOperations.fetch_add(
                        BatchPut(keys, slice_lengths, config));
                    break;

                case BenchOperation::GET:
                    keys = GenerateGetKeys(key_id, 1);
                    if (Get(keys[0])) {
                        gCompletedOperations.fetch_add(1);
                    }
                    break;

                case BenchOperation::BATCH_GET:
                    keys = GenerateGetKeys(key_id, batch_size);
                    gCompletedOperations.fetch_add(BatchGet(keys));
                    break;
            }
        }
    }
};

运行 Master Benchmark:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
./master_bench \
    --master_server=127.0.0.1:50051 \
    --num_segments=128 \
    --segment_size=68719476736 \
    --num_clients=4 \
    --num_threads=4 \
    --operation=BatchPut \
    --batch_size=128 \
    --value_size=4096 \
    --duration=60

11.3.2 性能测试结果分析#

基于 NVIDIA A10 的实测结果:

配置 Backend Output Throughput Mean TTFT P99 TTFT Mean ITL P99 ITL
2P2D tp=1 Redis 12.06 tok/s 844.28ms 2270.91ms 16.88ms 104.83ms
2P2D tp=1 MooncakeStore (TCP) 12.07 tok/s 817.43ms 1969.89ms 12.49ms 45.52ms
2P2D tp=1 MooncakeStore (RDMA) 12.08 tok/s 763.58ms 2030.34ms 12.43ms 43.02ms
2P2D tp=2 Redis 12.13 tok/s 397.20ms 782.44ms 9.00ms 36.06ms
2P2D tp=2 MooncakeStore (RDMA) 12.15 tok/s 271.25ms 532.00ms 8.34ms 14.70ms
xychart-beta
    title "TTFT 延迟对比 (2P2D tp=2)"
    x-axis ["Redis", "MooncakeStore TCP", "MooncakeStore RDMA"]
    y-axis "Mean TTFT (ms)" 0 --> 500
    bar [397.20, 327.91, 271.25]

11.3.3 P/D 分离性能优势#

SGLang 集成测试结果(Traffic Rate = 4.0):

配置 Output Throughput Mean ITL P99 ITL
1P1D 1215.17 tok/s 17.06ms 19.72ms
2 Regular 1223.03 tok/s 25.74ms 294.89ms

关键发现

  • P/D 分离配置下 ITL 降低约 30%
  • P99 尾延迟显著改善(19.72ms vs 294.89ms)
  • 在保持相似吞吐量的同时大幅降低延迟抖动

11.4 性能调优指南#

11.4.1 关键环境变量#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# Slice 粒度控制
export MC_SLICE_SIZE=65536  # 默认 64KB

# 传输引擎参数
export MC_NUM_CQ_POLL_THREADS=2
export MC_MAX_BATCH_SIZE=128
export MC_ASYNC_TRANSFER_LIMIT=1000

# RDMA 调优
export MC_RDMA_MAX_WR_DEPTH=128
export MC_RDMA_MAX_SGE=1
export MC_RDMA_QP_COUNT=4

11.4.2 Slice 大小调优#

graph LR
    subgraph "小 Slice (4KB-16KB)"
        S1[更好的多 NIC 负载均衡]
        S2[更高的 CPU 开销]
        S3[适合小对象传输]
    end

    subgraph "大 Slice (256KB-1MB)"
        L1[更低的 CPU 开销]
        L2[单 NIC 可能饱和]
        L3[适合大批量传输]
    end

    S1 --> Decision{工作负载特征}
    L1 --> Decision
    Decision -->|小对象多| Small[选择小 Slice]
    Decision -->|大对象少| Large[选择大 Slice]

推荐配置:

场景 Slice 大小 理由
KVCache 传输 64KB 平衡延迟和吞吐
模型权重加载 256KB-1MB 最大化带宽利用
小批量推理 16KB-32KB 降低尾延迟

11.4.3 线程数调优#

1
2
3
4
5
// 推荐:前端线程数 = NUMA 节点数 × 2-4
int optimal_threads = numa_num_configured_nodes() * 3;

// 每线程处理能力约 10-15 GB/s
// 总带宽需求 / 单线程能力 = 最小线程数

线程数与带宽关系(实测):

线程数 带宽利用率 CPU 开销
1 ~20%
4 ~60%
8 ~85% 中高
12 ~95%

11.4.4 批处理大小调优#

graph TB
    subgraph "Batch Size 影响"
        B1[Batch Size = 1]
        B1 --> L1[最低延迟]
        B1 --> T1[较低吞吐]

        B128[Batch Size = 128]
        B128 --> L128[较高延迟]
        B128 --> T128[最高吞吐]
    end

    subgraph "选择建议"
        Decision{SLO 要求?}
        Decision -->|延迟敏感| Small_B[1-16]
        Decision -->|吞吐优先| Large_B[64-128]
    end

第十二章:监控与可观测性#

12.1 指标体系设计#

12.1.1 客户端指标#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// mooncake-store/include/client_metric.h

// 延迟分桶配置(微秒)
// 针对 RDMA 优化:亚毫秒到秒级别的精细粒度
const std::vector<double> kLatencyBucket = {
    // 亚毫秒到 1ms 区间
    125, 150, 200, 250, 300, 400, 500, 750, 1000,
    // 毫秒级尾部
    1500, 2000, 3000, 5000, 7000, 15000, 20000,
    // 长尾保护
    50000, 100000, 200000, 500000, 1000000
};

struct TransferMetric {
    TransferMetric(std::map<std::string, std::string> labels = {})
        : total_read_bytes("mooncake_transfer_read_bytes",
                          "Total bytes read", labels),
          total_write_bytes("mooncake_transfer_write_bytes",
                           "Total bytes written", labels),
          batch_put_latency_us("mooncake_transfer_batch_put_latency",
                              "Batch Put transfer latency (us)",
                              kLatencyBucket, labels),
          batch_get_latency_us("mooncake_transfer_batch_get_latency",
                              "Batch Get transfer latency (us)",
                              kLatencyBucket, labels),
          get_latency_us("mooncake_transfer_get_latency",
                        "Get transfer latency (us)",
                        kLatencyBucket, labels),
          put_latency_us("mooncake_transfer_put_latency",
                        "Put transfer latency (us)",
                        kLatencyBucket, labels) {}

    ylt::metric::counter_t total_read_bytes;
    ylt::metric::counter_t total_write_bytes;
    ylt::metric::histogram_t batch_put_latency_us;
    ylt::metric::histogram_t batch_get_latency_us;
    ylt::metric::histogram_t get_latency_us;
    ylt::metric::histogram_t put_latency_us;
};

struct MasterClientMetric {
    std::array<std::string, 1> rpc_names = {"rpc_name"};

    MasterClientMetric(std::map<std::string, std::string> labels = {})
        : rpc_count("mooncake_client_rpc_count",
                   "Total number of RPC calls", labels, rpc_names),
          rpc_latency("mooncake_client_rpc_latency",
                     "Latency of RPC calls (us)",
                     kLatencyBucket, labels, rpc_names) {}

    ylt::metric::hybrid_counter_1t rpc_count;
    ylt::metric::hybrid_histogram_1t rpc_latency;
};

12.1.2 服务端指标#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// mooncake-store/include/master_metric_manager.h

class MasterMetricManager {
public:
    static MasterMetricManager& instance();

    // ===== 内存存储指标 =====
    void inc_allocated_mem_size(const std::string& segment, int64_t val = 1);
    void dec_allocated_mem_size(const std::string& segment, int64_t val = 1);
    void inc_total_mem_capacity(const std::string& segment, int64_t val = 1);
    void dec_total_mem_capacity(const std::string& segment, int64_t val = 1);
    double get_global_mem_used_ratio();
    double get_segment_mem_used_ratio(const std::string& segment);

    // ===== 缓存命中统计 =====
    void inc_mem_cache_hit_nums(int64_t val = 1);
    void inc_file_cache_hit_nums(int64_t val = 1);
    void inc_mem_cache_nums(int64_t val = 1);
    void inc_file_cache_nums(int64_t val = 1);
    void inc_valid_get_nums(int64_t val = 1);
    void inc_total_get_nums(int64_t val = 1);

    enum class CacheHitStat {
        MEMORY_HITS,
        SSD_HITS,
        MEMORY_TOTAL,
        SSD_TOTAL,
        MEMORY_HIT_RATE,
        SSD_HIT_RATE,
        OVERALL_HIT_RATE,
        VALID_GET_RATE
    };
    CacheHitStatDict calculate_cache_stats();

    // ===== 操作统计 =====
    void inc_put_start_requests(int64_t val = 1);
    void inc_put_start_failures(int64_t val = 1);
    void inc_get_replica_list_requests(int64_t val = 1);
    void inc_get_replica_list_failures(int64_t val = 1);
    // ... 更多操作指标

    // ===== 批量操作统计 =====
    void inc_batch_put_start_requests(int64_t items);
    void inc_batch_put_start_failures(int64_t failed_items);
    void inc_batch_put_start_partial_success(int64_t failed_items);

    // ===== 驱逐指标 =====
    void inc_eviction_success(int64_t key_count, int64_t size);
    void inc_eviction_fail();

    // ===== 序列化 =====
    std::string serialize_metrics();  // Prometheus 格式
    std::string get_summary_string(); // 人类可读格式

private:
    // 内存指标
    ylt::metric::gauge_t mem_allocated_size_;
    ylt::metric::gauge_t mem_total_capacity_;
    ylt::metric::dynamic_gauge_1t mem_allocated_size_per_segment_;
    ylt::metric::dynamic_gauge_1t mem_total_capacity_per_segment_;

    // 文件存储指标
    ylt::metric::gauge_t file_allocated_size_;
    ylt::metric::gauge_t file_total_capacity_;

    // 缓存命中指标
    ylt::metric::counter_t mem_cache_hit_nums_;
    ylt::metric::counter_t file_cache_hit_nums_;
    ylt::metric::gauge_t mem_cache_nums_;
    ylt::metric::gauge_t file_cache_nums_;

    // 操作计数器
    ylt::metric::counter_t put_start_requests_;
    ylt::metric::counter_t put_start_failures_;
    ylt::metric::counter_t get_replica_list_requests_;
    ylt::metric::counter_t get_replica_list_failures_;
    // ...
};

12.1.3 指标体系全景#

graph TB
    subgraph "Client Metrics"
        CM1[transfer_read_bytes]
        CM2[transfer_write_bytes]
        CM3[get_latency_us]
        CM4[put_latency_us]
        CM5[rpc_count]
        CM6[rpc_latency]
    end

    subgraph "Master Metrics"
        MM1[mem_allocated_size]
        MM2[mem_total_capacity]
        MM3[mem_cache_hit_nums]
        MM4[file_cache_hit_nums]
        MM5[eviction_success]
        MM6[put_start_requests]
        MM7[get_replica_list_requests]
    end

    subgraph "Derived Metrics"
        DM1[memory_hit_rate]
        DM2[ssd_hit_rate]
        DM3[overall_hit_rate]
        DM4[mem_used_ratio]
        DM5[valid_get_rate]
    end

    MM3 --> DM1
    MM4 --> DM2
    MM3 & MM4 --> DM3
    MM1 & MM2 --> DM4

    CM1 & CM2 --> Prometheus[Prometheus]
    MM1 & MM2 & MM3 --> Prometheus
    DM1 & DM2 & DM3 --> Grafana[Grafana Dashboard]

12.2 指标收集实现#

12.2.1 客户端指标收集#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 客户端指标创建
std::unique_ptr<ClientMetric> ClientMetric::Create(
    std::map<std::string, std::string> labels) {

    // 检查是否启用指标收集
    const char* env_enabled = getenv("MC_STORE_CLIENT_METRIC");
    if (env_enabled && (strcmp(env_enabled, "0") == 0 ||
                        strcmp(env_enabled, "false") == 0)) {
        return nullptr;
    }

    // 获取报告间隔
    uint64_t interval = 0;
    const char* env_interval = getenv("MC_STORE_CLIENT_METRIC_INTERVAL");
    if (env_interval) {
        interval = std::stoull(env_interval);
    }

    // 合并静态标签
    auto merged_labels = merge_labels(labels);

    return std::make_unique<ClientMetric>(interval, merged_labels);
}

// 使用示例
void RealClient::Get(const std::string& key, ...) {
    auto start_time = std::chrono::high_resolution_clock::now();

    // ... 执行 Get 操作

    auto end_time = std::chrono::high_resolution_clock::now();
    auto duration_us = std::chrono::duration_cast<
        std::chrono::microseconds>(end_time - start_time).count();

    if (metric_) {
        metric_->transfer_metric.get_latency_us.observe(duration_us);
        metric_->transfer_metric.total_read_bytes.inc(total_bytes);
    }
}

12.2.2 Prometheus 指标格式#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::string MasterMetricManager::serialize_metrics() {
    std::string str;

    // 内存指标
    mem_allocated_size_.serialize(str);
    mem_total_capacity_.serialize(str);
    mem_allocated_size_per_segment_.serialize(str);
    mem_total_capacity_per_segment_.serialize(str);

    // 文件存储指标
    file_allocated_size_.serialize(str);
    file_total_capacity_.serialize(str);

    // 缓存命中指标
    mem_cache_hit_nums_.serialize(str);
    file_cache_hit_nums_.serialize(str);

    // 操作统计
    put_start_requests_.serialize(str);
    put_start_failures_.serialize(str);
    // ...

    return str;
}

输出示例:

# HELP mooncake_master_mem_allocated_size Currently allocated memory size
# TYPE mooncake_master_mem_allocated_size gauge
mooncake_master_mem_allocated_size{cluster_id="prod-1"} 1073741824

# HELP mooncake_master_mem_cache_hit_nums Memory cache hit count
# TYPE mooncake_master_mem_cache_hit_nums counter
mooncake_master_mem_cache_hit_nums{cluster_id="prod-1"} 12345678

# HELP mooncake_transfer_get_latency Get transfer latency in microseconds
# TYPE mooncake_transfer_get_latency histogram
mooncake_transfer_get_latency_bucket{le="125"} 1000
mooncake_transfer_get_latency_bucket{le="250"} 5000
mooncake_transfer_get_latency_bucket{le="500"} 9500
mooncake_transfer_get_latency_bucket{le="1000"} 9900
mooncake_transfer_get_latency_bucket{le="+Inf"} 10000
mooncake_transfer_get_latency_sum 2500000
mooncake_transfer_get_latency_count 10000

12.2.3 人类可读摘要#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
std::string TransferMetric::summary_metrics() {
    std::stringstream ss;
    ss << "=== Transfer Metrics Summary ===\n";

    // 传输字节数
    auto read_bytes = total_read_bytes.value();
    auto write_bytes = total_write_bytes.value();
    ss << "Total Read: " << byte_size_to_string(read_bytes) << "\n";
    ss << "Total Write: " << byte_size_to_string(write_bytes) << "\n";

    // 延迟摘要
    ss << "\n=== Latency Summary (microseconds) ===\n";
    ss << "Get: " << format_latency_summary(get_latency_us) << "\n";
    ss << "Put: " << format_latency_summary(put_latency_us) << "\n";
    ss << "Batch Get: " << format_latency_summary(batch_get_latency_us) << "\n";
    ss << "Batch Put: " << format_latency_summary(batch_put_latency_us) << "\n";

    return ss.str();
}

输出示例:

=== Transfer Metrics Summary ===
Total Read: 1.23 TB
Total Write: 456.78 GB

=== Latency Summary (microseconds) ===
Get: count=1000000, p95<500μs, max<2000μs
Put: count=500000, p95<750μs, max<3000μs
Batch Get: count=50000, p95<1500μs, max<5000μs
Batch Put: count=25000, p95<2000μs, max<7000μs

12.3 可视化与告警#

12.3.1 Grafana Dashboard 配置#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
{
  "dashboard": {
    "title": "Mooncake Store Metrics",
    "panels": [
      {
        "title": "Memory Utilization",
        "type": "gauge",
        "targets": [
          {
            "expr": "mooncake_master_mem_allocated_size / mooncake_master_mem_total_capacity * 100"
          }
        ],
        "thresholds": [
          { "value": 70, "color": "green" },
          { "value": 85, "color": "yellow" },
          { "value": 95, "color": "red" }
        ]
      },
      {
        "title": "Cache Hit Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(mooncake_master_mem_cache_hit_nums[5m]) / rate(mooncake_master_total_get_nums[5m]) * 100",
            "legendFormat": "Memory Hit Rate"
          },
          {
            "expr": "rate(mooncake_master_file_cache_hit_nums[5m]) / rate(mooncake_master_total_get_nums[5m]) * 100",
            "legendFormat": "SSD Hit Rate"
          }
        ]
      },
      {
        "title": "Transfer Latency (P99)",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.99, rate(mooncake_transfer_get_latency_bucket[5m]))",
            "legendFormat": "Get P99"
          }
        ]
      }
    ]
  }
}

12.3.2 告警规则#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# Prometheus AlertManager 规则
groups:
  - name: mooncake_alerts
    rules:
      # 内存使用率告警
      - alert: HighMemoryUsage
        expr: |
          mooncake_master_mem_allocated_size /
          mooncake_master_mem_total_capacity > 0.9          
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage detected"
          description: "Memory usage is above 90% for 5 minutes"

      # 缓存命中率告警
      - alert: LowCacheHitRate
        expr: |
          rate(mooncake_master_mem_cache_hit_nums[5m]) /
          rate(mooncake_master_total_get_nums[5m]) < 0.5          
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Low cache hit rate"
          description: "Memory cache hit rate below 50%"

      # 高延迟告警
      - alert: HighTransferLatency
        expr: |
          histogram_quantile(0.99,
            rate(mooncake_transfer_get_latency_bucket[5m])) > 10000          
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High transfer latency"
          description: "P99 latency exceeds 10ms"

      # 驱逐失败告警
      - alert: EvictionFailures
        expr: |
          rate(mooncake_master_eviction_attempts[5m]) -
          rate(mooncake_master_eviction_success[5m]) > 10          
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High eviction failure rate"

12.3.3 监控架构#

graph TB
    subgraph "Mooncake Cluster"
        M1[Master 1]
        M2[Master 2]
        C1[Client 1]
        C2[Client 2]
    end

    subgraph "Metrics Collection"
        M1 -->|/metrics| Prometheus
        M2 -->|/metrics| Prometheus
        C1 -->|Push| Prometheus
        C2 -->|Push| Prometheus
    end

    subgraph "Visualization & Alerting"
        Prometheus --> Grafana[Grafana]
        Prometheus --> AlertManager
        AlertManager --> PagerDuty
        AlertManager --> Slack
    end

    style Prometheus fill:#E6522C
    style Grafana fill:#F46800

第十三章:生产环境部署指南#

13.1 部署架构#

13.1.1 单集群部署#

graph TB
    subgraph "Control Plane"
        etcd1[etcd Node 1]
        etcd2[etcd Node 2]
        etcd3[etcd Node 3]
        Master[Master Service<br/>Active]
        MasterStandby[Master Service<br/>Standby]
    end

    subgraph "Data Plane - Prefill Nodes"
        P1[Prefill Node 1<br/>GPU + RDMA]
        P2[Prefill Node 2<br/>GPU + RDMA]
        P3[Prefill Node 3<br/>GPU + RDMA]
    end

    subgraph "Data Plane - Decode Nodes"
        D1[Decode Node 1<br/>GPU + RDMA]
        D2[Decode Node 2<br/>GPU + RDMA]
        D3[Decode Node 3<br/>GPU + RDMA]
    end

    subgraph "Storage Layer"
        SSD1[(NVMe SSD Pool)]
    end

    etcd1 <--> etcd2 <--> etcd3
    Master --> etcd1
    MasterStandby -.-> etcd1

    P1 & P2 & P3 <-->|RDMA| D1 & D2 & D3
    P1 & P2 & P3 --> Master
    D1 & D2 & D3 --> Master
    D1 & D2 & D3 --> SSD1

    Master <-.->|Failover| MasterStandby

13.1.2 多集群联邦部署#

graph TB
    subgraph "Cluster A - Region 1"
        MA[Master A]
        PA1[Prefill A1]
        PA2[Prefill A2]
        DA1[Decode A1]
    end

    subgraph "Cluster B - Region 2"
        MB[Master B]
        PB1[Prefill B1]
        DB1[Decode B1]
        DB2[Decode B2]
    end

    subgraph "Global Load Balancer"
        GLB[Global LB]
    end

    GLB --> MA
    GLB --> MB

    PA1 & PA2 <-->|Cross-Region RDMA| PB1
    DA1 <-->|Cross-Region RDMA| DB1 & DB2

13.2 硬件配置建议#

13.2.1 网络配置#

组件 推荐配置 最低配置
RDMA NIC Mellanox ConnectX-7 200Gbps ConnectX-6 100Gbps
交换机 InfiniBand HDR 200Gbps RoCE 100Gbps
网络拓扑 Fat-tree, 全对分带宽 3:1 收敛比
MTU 4096+ (RDMA) 1500 (TCP fallback)

13.2.2 存储配置#

graph TB
    subgraph "推荐存储层次"
        L1[GPU HBM<br/>~80GB, ~3TB/s]
        L2[Host DRAM<br/>256GB-1TB, ~200GB/s]
        L3[NVMe SSD<br/>2-8TB, ~7GB/s]
    end

    L1 -->|溢出| L2
    L2 -->|溢出| L3

    style L1 fill:#FFD700
    style L2 fill:#90EE90
    style L3 fill:#87CEEB

13.2.3 NUMA 配置#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# 检查 NUMA 拓扑
numactl --hardware

# 推荐配置:每个 NUMA 节点运行独立的服务实例
# Node 0: Master Service + Client for GPU 0-3
# Node 1: Client for GPU 4-7

# 绑定进程到特定 NUMA 节点
numactl --cpunodebind=0 --membind=0 ./mooncake_master
numactl --cpunodebind=1 --membind=1 ./client_instance

13.3 配置文件示例#

13.3.1 Master Service 配置#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# master_config.yaml
server:
  address: "0.0.0.0:50051"
  max_connections: 1000
  request_timeout_ms: 30000

metadata:
  type: "etcd"
  endpoints:
    - "etcd1:2379"
    - "etcd2:2379"
    - "etcd3:2379"
  ttl_seconds: 30

storage:
  allocator_type: "offset" # "cachelib" or "offset"
  eviction_policy: "lru"
  eviction_threshold: 0.85
  eviction_target: 0.75

high_availability:
  enabled: true
  lease_ttl_seconds: 10
  anti_split_brain_wait_ms: 15000

metrics:
  enabled: true
  prometheus_port: 9090
  report_interval_seconds: 10

13.3.2 Transfer Engine 配置#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# transfer_engine_config.yaml
segment:
  name: "node001"
  type: "DRAM" # "DRAM" or "VRAM"
  size_bytes: 68719476736 # 64GB

transport:
  type: "rdma"
  devices:
    - "mlx5_0"
    - "mlx5_1"
  max_qp_depth: 128
  max_sge: 1
  relaxed_ordering: true

metadata:
  type: "etcd"
  endpoints:
    - "etcd1:2379"
    - "etcd2:2379"

slice:
  size_bytes: 65536 # 64KB

async:
  max_pending_transfers: 1000
  completion_poll_threads: 2

13.3.3 客户端配置#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# client_config.yaml
master:
  endpoints:
    - "master1:50051"
    - "master2:50051"
  connect_timeout_ms: 5000
  request_timeout_ms: 10000

transfer:
  batch_size: 128
  prefetch_enabled: true
  prefetch_threshold: 0.7

metrics:
  enabled: true
  cluster_id: "prod-cluster-1"
  report_interval_seconds: 30

retry:
  max_attempts: 3
  initial_backoff_ms: 100
  max_backoff_ms: 5000

13.4 运维操作手册#

13.4.1 健康检查#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#!/bin/bash
# health_check.sh

# 检查 Master 服务
check_master() {
    response=$(curl -s -o /dev/null -w "%{http_code}" \
        http://localhost:9090/health)
    if [ "$response" != "200" ]; then
        echo "ERROR: Master service unhealthy"
        return 1
    fi
    echo "OK: Master service healthy"
}

# 检查 etcd 集群
check_etcd() {
    etcdctl endpoint health --endpoints=etcd1:2379,etcd2:2379,etcd3:2379
}

# 检查 RDMA 连接
check_rdma() {
    ibstat | grep -E "State:|Physical state:"
    if ! ibstat | grep -q "Active"; then
        echo "WARNING: RDMA port not active"
        return 1
    fi
}

# 检查内存使用
check_memory() {
    usage=$(curl -s http://localhost:9090/metrics | \
        grep "mooncake_master_mem_allocated_size" | \
        awk '{print $2}')
    capacity=$(curl -s http://localhost:9090/metrics | \
        grep "mooncake_master_mem_total_capacity" | \
        awk '{print $2}')
    ratio=$(echo "scale=2; $usage / $capacity" | bc)
    echo "Memory usage: $ratio"
    if (( $(echo "$ratio > 0.95" | bc -l) )); then
        echo "WARNING: Memory usage above 95%"
        return 1
    fi
}

# 运行所有检查
check_master && check_etcd && check_rdma && check_memory

13.4.2 故障恢复#

flowchart TD
    Start[检测到故障] --> Type{故障类型?}

    Type -->|Master 宕机| M1[检查 etcd Leader 选举]
    M1 --> M2{新 Leader 选出?}
    M2 -->|是| M3[验证服务恢复]
    M2 -->|否| M4[手动干预:<br/>检查 etcd 集群状态]

    Type -->|RDMA 链路故障| R1[检查物理连接]
    R1 --> R2[重启 RDMA 端口]
    R2 --> R3[验证连接恢复]
    R3 -->|失败| R4[切换到 TCP Fallback]

    Type -->|内存耗尽| E1[触发紧急驱逐]
    E1 --> E2[检查驱逐是否成功]
    E2 -->|失败| E3[扩容或手动清理]

    Type -->|节点宕机| N1[检查 Segment 状态]
    N1 --> N2[标记 Segment 不可用]
    N2 --> N3[重新路由请求]
    N3 --> N4[节点恢复后 ReMountSegment]

13.4.3 扩容操作#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/bin/bash
# scale_out.sh - 添加新节点

NEW_NODE=$1
MASTER_ADDR=$2

# 1. 验证新节点 RDMA 连通性
echo "Testing RDMA connectivity..."
ibping -S -C mlx5_0 &
sleep 2
ssh $NEW_NODE "ibping -c 1000 -C mlx5_0 $(hostname)"

# 2. 在新节点上启动服务
echo "Starting services on new node..."
ssh $NEW_NODE << 'EOF'
    # 设置环境变量
    export MC_MASTER_ADDR=$MASTER_ADDR
    export MC_SEGMENT_SIZE=68719476736

    # 启动 Transfer Engine
    ./mooncake_client --config=/etc/mooncake/client.yaml &

    # 等待服务就绪
    sleep 5

    # 挂载 Segment
    ./mc_admin mount_segment \
        --name="$(hostname)" \
        --size=$MC_SEGMENT_SIZE \
        --master=$MC_MASTER_ADDR
EOF

# 3. 验证新节点已注册
echo "Verifying node registration..."
./mc_admin list_segments --master=$MASTER_ADDR | grep $NEW_NODE

echo "Scale-out complete!"

13.4.4 滚动升级#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/bin/bash
# rolling_upgrade.sh

NODES="node1 node2 node3 node4"
NEW_VERSION=$1

for node in $NODES; do
    echo "Upgrading $node to $NEW_VERSION..."

    # 1. 标记节点进入维护模式
    ./mc_admin set_node_status $node MAINTENANCE

    # 2. 等待现有请求完成
    echo "Draining connections..."
    sleep 30

    # 3. 停止服务
    ssh $node "systemctl stop mooncake"

    # 4. 更新二进制
    scp mooncake-$NEW_VERSION.tar.gz $node:/tmp/
    ssh $node << EOF
        cd /opt/mooncake
        tar -xzf /tmp/mooncake-$NEW_VERSION.tar.gz
        ln -sf mooncake-$NEW_VERSION current
EOF

    # 5. 启动服务
    ssh $node "systemctl start mooncake"

    # 6. 健康检查
    echo "Waiting for health check..."
    for i in {1..30}; do
        if ssh $node "curl -s localhost:9090/health" | grep -q "ok"; then
            echo "$node healthy"
            break
        fi
        sleep 2
    done

    # 7. 恢复节点
    ./mc_admin set_node_status $node ACTIVE

    echo "$node upgraded successfully"
    sleep 10  # 冷却时间
done

echo "Rolling upgrade complete!"

13.5 本章小结#

本章涵盖了 Mooncake 生产环境部署的关键方面:

  1. 部署架构:单集群和多集群联邦部署模式
  2. 硬件配置:网络、存储和 NUMA 的最佳实践
  3. 配置管理:Master、Transfer Engine 和客户端配置
  4. 运维操作:健康检查、故障恢复、扩容和升级流程

这些指南基于实际生产环境经验,帮助运维团队快速部署和管理 Mooncake 集群。


第六部分:代码解析精选#


第十四章:Master Service 核心代码深度解析#

本章将深入分析 Master Service 的核心代码实现,从实际源码角度理解其设计哲学和关键技术细节。

14.1 MasterService 类结构分析#

14.1.1 类定义与成员变量#

MasterService 是整个 Mooncake Store 的核心控制类,管理着元数据、分段和副本。让我们从其构造函数入手:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 文件: mooncake-store/src/master_service.cpp

MasterService::MasterService(const MasterServiceConfig& config)
    : default_kv_lease_ttl_(config.default_kv_lease_ttl),
      default_kv_soft_pin_ttl_(config.default_kv_soft_pin_ttl),
      allow_evict_soft_pinned_objects_(config.allow_evict_soft_pinned_objects),
      eviction_ratio_(config.eviction_ratio),
      eviction_high_watermark_ratio_(config.eviction_high_watermark_ratio),
      client_live_ttl_sec_(config.client_live_ttl_sec),
      enable_ha_(config.enable_ha),
      enable_offload_(config.enable_offload),
      cluster_id_(config.cluster_id),
      root_fs_dir_(config.root_fs_dir),
      global_file_segment_size_(config.global_file_segment_size),
      enable_disk_eviction_(config.enable_disk_eviction),
      quota_bytes_(config.quota_bytes),
      segment_manager_(config.memory_allocator),
      memory_allocator_type_(config.memory_allocator),
      allocation_strategy_(std::make_shared<RandomAllocationStrategy>()),
      put_start_discard_timeout_sec_(config.put_start_discard_timeout_sec),
      put_start_release_timeout_sec_(config.put_start_release_timeout_sec) {

    // 参数验证
    if (eviction_ratio_ < 0.0 || eviction_ratio_ > 1.0) {
        LOG(ERROR) << "Eviction ratio must be between 0.0 and 1.0";
        throw std::invalid_argument("Invalid eviction ratio");
    }

    // 启动后台线程
    eviction_running_ = true;
    eviction_thread_ = std::thread(&MasterService::EvictionThreadFunc, this);

    client_monitor_running_ = true;
    client_monitor_thread_ =
        std::thread(&MasterService::ClientMonitorFunc, this);
}

设计要点分析

  1. 配置驱动:通过 MasterServiceConfig 结构体集中管理所有配置参数
  2. 后台线程:自动启动两个关键后台线程
    • eviction_thread_:负责内存驱逐
    • client_monitor_thread_:负责客户端健康监控
  3. 资源管理:使用 RAII 模式,在构造时初始化,在析构时清理

14.1.2 元数据分片架构#

MasterService 采用分片 (Sharding) 架构来减少锁竞争:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 元数据分片定义
static constexpr size_t kNumShards = 256;

struct MetadataShard {
    mutable Mutex mutex;
    std::unordered_map<std::string, ObjectMetadata> metadata;
    std::unordered_set<std::string> processing_keys;
};

std::array<MetadataShard, kNumShards> metadata_shards_;

// 分片索引计算
size_t getShardIndex(const std::string& key) const {
    return std::hash<std::string>{}(key) % kNumShards;
}

分片策略分析

graph TB
    subgraph "Key Space"
        K1[Key: "model/layer1"]
        K2[Key: "model/layer2"]
        K3[Key: "cache/token1"]
        K4[Key: "cache/token2"]
    end

    subgraph "Hash Function"
        H[std::hash]
    end

    subgraph "256 Shards"
        S0[Shard 0]
        S1[Shard 1]
        S127[Shard 127]
        S255[Shard 255]
    end

    K1 --> H
    K2 --> H
    K3 --> H
    K4 --> H

    H -->|hash % 256 = 42| S0
    H -->|hash % 256 = 128| S127
    H -->|hash % 256 = 7| S1
    H -->|hash % 256 = 200| S255

    style H fill:#FFD700

分片的好处:

  • 减少锁粒度,提高并发性能
  • 256 个分片在大多数场景下提供足够的并行度
  • 使用标准库 std::hash 保证良好的分布

14.1.3 MetadataAccessor 辅助类#

为了简化元数据访问并确保线程安全,代码使用了 Accessor 模式:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class MetadataAccessor {
public:
    MetadataAccessor(MasterService* service, const std::string& key)
        : service_(service),
          shard_idx_(service->getShardIndex(key)),
          key_(key) {
        // 获取分片锁
        service_->metadata_shards_[shard_idx_].mutex.lock();
    }

    ~MetadataAccessor() {
        // 释放分片锁
        service_->metadata_shards_[shard_idx_].mutex.unlock();
    }

    bool Exists() const {
        return service_->metadata_shards_[shard_idx_]
            .metadata.count(key_) > 0;
    }

    ObjectMetadata& Get() {
        return service_->metadata_shards_[shard_idx_].metadata.at(key_);
    }

    void Erase() {
        service_->metadata_shards_[shard_idx_].metadata.erase(key_);
    }

    bool InProcessing() const {
        return service_->metadata_shards_[shard_idx_]
            .processing_keys.count(key_) > 0;
    }

    void EraseFromProcessing() {
        service_->metadata_shards_[shard_idx_]
            .processing_keys.erase(key_);
    }

private:
    MasterService* service_;
    size_t shard_idx_;
    std::string key_;
};

使用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
auto MasterService::GetReplicaList(std::string_view key)
    -> tl::expected<GetReplicaListResponse, ErrorCode> {
    // RAII 自动管理锁的获取和释放
    MetadataAccessor accessor(this, std::string(key));

    MasterMetricManager::instance().inc_total_get_nums();

    if (!accessor.Exists()) {
        return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND);
    }

    auto& metadata = accessor.Get();
    // ... 处理逻辑
}

14.2 PutStart/PutEnd 事务流程#

14.2.1 PutStart:两阶段提交的第一阶段#

PutStart 是写入操作的核心入口,实现了类似两阶段提交的第一阶段:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
auto MasterService::PutStart(const UUID& client_id, const std::string& key,
                             const uint64_t slice_length,
                             const ReplicateConfig& config)
    -> tl::expected<std::vector<Replica::Descriptor>, ErrorCode> {

    // 1. 参数验证
    if (config.replica_num == 0 || key.empty() || slice_length == 0) {
        return tl::make_unexpected(ErrorCode::INVALID_PARAMS);
    }

    // 2. 锁定分片
    size_t shard_idx = getShardIndex(key);
    MutexLocker lock(&metadata_shards_[shard_idx].mutex);

    const auto now = std::chrono::steady_clock::now();
    auto it = metadata_shards_[shard_idx].metadata.find(key);

    // 3. 检查对象是否已存在
    if (it != metadata_shards_[shard_idx].metadata.end() &&
        !CleanupStaleHandles(it->second)) {
        auto& metadata = it->second;

        // 处理过期的未完成写入
        if (!metadata.HasCompletedReplicas() &&
            metadata.put_start_time + put_start_discard_timeout_sec_ < now) {
            // 丢弃过期的处理中副本
            auto replicas = metadata.DiscardProcessingReplicas();
            if (!replicas.empty()) {
                std::lock_guard lock(discarded_replicas_mutex_);
                discarded_replicas_.emplace_back(
                    std::move(replicas),
                    metadata.put_start_time + put_start_release_timeout_sec_);
            }
            metadata_shards_[shard_idx].processing_keys.erase(key);
            metadata_shards_[shard_idx].metadata.erase(it);
        } else {
            return tl::make_unexpected(ErrorCode::OBJECT_ALREADY_EXISTS);
        }
    }

    // 4. 分配副本空间
    std::vector<Replica> replicas;
    {
        ScopedAllocatorAccess allocator_access =
            segment_manager_.getAllocatorAccess();
        const auto& allocator_manager = allocator_access.getAllocatorManager();

        std::vector<std::string> preferred_segments;
        if (!config.preferred_segment.empty()) {
            preferred_segments.push_back(config.preferred_segment);
        }

        // 使用分配策略分配副本
        auto allocation_result = allocation_strategy_->Allocate(
            allocator_manager, slice_length, config.replica_num,
            preferred_segments);

        if (!allocation_result.has_value()) {
            need_eviction_ = true;  // 触发驱逐
            return tl::make_unexpected(ErrorCode::NO_AVAILABLE_HANDLE);
        }

        replicas = std::move(allocation_result.value());
    }

    // 5. 创建元数据记录
    std::vector<Replica::Descriptor> replica_list;
    replica_list.reserve(replicas.size());
    for (const auto& replica : replicas) {
        replica_list.emplace_back(replica.get_descriptor());
    }

    metadata_shards_[shard_idx].metadata.emplace(
        std::piecewise_construct, std::forward_as_tuple(key),
        std::forward_as_tuple(client_id, now, total_length,
                              std::move(replicas), config.with_soft_pin));

    // 6. 加入处理中集合用于监控
    metadata_shards_[shard_idx].processing_keys.insert(key);

    return replica_list;
}

流程图解

sequenceDiagram
    participant Client
    participant Master
    participant SegmentManager
    participant Allocator

    Client->>Master: PutStart(key, size, config)

    Master->>Master: 获取分片锁
    Master->>Master: 检查 key 是否存在

    alt key 已存在且未过期
        Master-->>Client: OBJECT_ALREADY_EXISTS
    else key 不存在或已过期
        Master->>SegmentManager: 获取 Allocator 访问
        SegmentManager->>Allocator: 分配副本空间

        alt 分配成功
            Allocator-->>SegmentManager: 返回副本列表
            SegmentManager-->>Master: 分配结果
            Master->>Master: 创建元数据记录
            Master->>Master: 加入 processing_keys
            Master-->>Client: 返回副本描述符列表
        else 分配失败
            Allocator-->>SegmentManager: 分配失败
            Master->>Master: 设置 need_eviction_ = true
            Master-->>Client: NO_AVAILABLE_HANDLE
        end
    end

14.2.2 PutEnd:完成事务#

PutEnd 标记写入操作完成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
auto MasterService::PutEnd(const UUID& client_id, const std::string& key,
                           ReplicaType replica_type)
    -> tl::expected<void, ErrorCode> {

    MetadataAccessor accessor(this, key);

    if (!accessor.Exists()) {
        return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND);
    }

    auto& metadata = accessor.Get();

    // 安全检查:确保是同一个客户端
    if (client_id != metadata.client_id) {
        LOG(ERROR) << "Illegal client " << client_id
                   << " to PutEnd key " << key;
        return tl::make_unexpected(ErrorCode::ILLEGAL_CLIENT);
    }

    // 标记副本完成
    for (auto& replica : metadata.replicas) {
        if (replica.type() == replica_type) {
            replica.mark_complete();
        }
        // 如果启用 offload,推送到卸载队列
        if (enable_offload_) {
            PushOffloadingQueue(key, replica);
        }
    }

    // 从处理中集合移除
    if (metadata.IsAllReplicasComplete() && accessor.InProcessing()) {
        accessor.EraseFromProcessing();
    }

    // 更新指标
    if (replica_type == ReplicaType::MEMORY) {
        MasterMetricManager::instance().inc_mem_cache_nums();
    } else if (replica_type == ReplicaType::DISK) {
        MasterMetricManager::instance().inc_file_cache_nums();
    }

    // 初始化租约
    metadata.GrantLease(0, default_kv_soft_pin_ttl_);

    return {};
}

14.2.3 PutRevoke:事务回滚#

当写入失败时,客户端可以调用 PutRevoke 回滚:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
auto MasterService::PutRevoke(const UUID& client_id, const std::string& key,
                              ReplicaType replica_type)
    -> tl::expected<void, ErrorCode> {

    MetadataAccessor accessor(this, key);

    if (!accessor.Exists()) {
        return tl::make_unexpected(ErrorCode::OBJECT_NOT_FOUND);
    }

    auto& metadata = accessor.Get();

    // 安全验证
    if (client_id != metadata.client_id) {
        return tl::make_unexpected(ErrorCode::ILLEGAL_CLIENT);
    }

    // 验证副本状态必须是 PROCESSING
    if (auto status = metadata.HasDiffRepStatus(ReplicaStatus::PROCESSING,
                                                replica_type)) {
        return tl::make_unexpected(ErrorCode::INVALID_WRITE);
    }

    // 更新指标并删除副本
    if (replica_type == ReplicaType::MEMORY) {
        MasterMetricManager::instance().dec_mem_cache_nums();
    }

    metadata.EraseReplica(replica_type);

    // 从处理中集合移除
    if (metadata.IsAllReplicasComplete() && accessor.InProcessing()) {
        accessor.EraseFromProcessing();
    }

    // 如果没有有效副本,删除整个对象
    if (metadata.IsValid() == false) {
        accessor.Erase();
    }

    return {};
}

14.3 驱逐机制深度解析#

14.3.1 驱逐线程主循环#

驱逐线程持续运行,根据内存使用率触发驱逐:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void MasterService::EvictionThreadFunc() {
    VLOG(1) << "action=eviction_thread_started";

    while (eviction_running_) {
        double used_ratio =
            MasterMetricManager::instance().get_global_mem_used_ratio();

        // 两种触发条件:
        // 1. 使用率超过高水位线
        // 2. 有显式的驱逐请求(need_eviction_)
        if (used_ratio > eviction_high_watermark_ratio_ ||
            (need_eviction_ && eviction_ratio_ > 0.0)) {

            // 计算驱逐目标比例
            double evict_ratio_target = std::max(
                eviction_ratio_,
                used_ratio - eviction_high_watermark_ratio_ + eviction_ratio_);
            double evict_ratio_lowerbound =
                std::max(evict_ratio_target * 0.5,
                         used_ratio - eviction_high_watermark_ratio_);

            BatchEvict(evict_ratio_target, evict_ratio_lowerbound);
        }

        std::this_thread::sleep_for(
            std::chrono::milliseconds(kEvictionThreadSleepMs));
    }
}

14.3.2 批量驱逐算法#

驱逐算法采用两阶段策略,优先驱逐没有软锁定的对象:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
void MasterService::BatchEvict(double evict_ratio_target,
                               double evict_ratio_lowerbound) {
    auto now = std::chrono::steady_clock::now();
    long evicted_count = 0;
    long object_count = 0;
    uint64_t total_freed_size = 0;

    // 候选对象分类
    std::vector<std::chrono::steady_clock::time_point> no_pin_objects;
    std::vector<std::chrono::steady_clock::time_point> soft_pin_objects;

    // 随机选择起始分片,避免驱逐不均衡
    size_t start_idx = rand() % metadata_shards_.size();

    // ========== 第一阶段:驱逐无软锁定且租约过期的对象 ==========
    for (size_t i = 0; i < metadata_shards_.size(); i++) {
        auto& shard = metadata_shards_[(start_idx + i) % metadata_shards_.size()];
        MutexLocker lock(&shard.mutex);

        // 先处理过期的处理中 key
        DiscardExpiredProcessingKeys(shard, now);

        object_count += shard.metadata.size();

        // 计算此分片理想的驱逐数量
        const long ideal_evict_num =
            std::ceil(object_count * evict_ratio_target) - evicted_count;

        std::vector<std::chrono::steady_clock::time_point> candidates;

        for (auto it = shard.metadata.begin(); it != shard.metadata.end(); it++) {
            // 跳过未过期或有未完成副本的对象
            if (!it->second.IsLeaseExpired(now) ||
                it->second.HasDiffRepStatus(ReplicaStatus::COMPLETE,
                                            ReplicaType::MEMORY)) {
                continue;
            }

            if (!it->second.IsSoftPinned(now)) {
                if (ideal_evict_num > 0) {
                    candidates.push_back(it->second.lease_timeout);
                } else {
                    no_pin_objects.push_back(it->second.lease_timeout);
                }
            } else if (allow_evict_soft_pinned_objects_) {
                soft_pin_objects.push_back(it->second.lease_timeout);
            }
        }

        // 使用 nth_element 找到驱逐阈值(O(n) 复杂度)
        if (ideal_evict_num > 0 && !candidates.empty()) {
            long evict_num = std::min(ideal_evict_num, (long)candidates.size());
            std::nth_element(candidates.begin(),
                             candidates.begin() + (evict_num - 1),
                             candidates.end());
            auto target_timeout = candidates[evict_num - 1];

            // 驱逐租约超时 <= target_timeout 的对象
            auto it = shard.metadata.begin();
            while (it != shard.metadata.end()) {
                if (/* 满足驱逐条件 */
                    it->second.lease_timeout <= target_timeout) {
                    total_freed_size +=
                        it->second.size * it->second.GetMemReplicaCount();
                    it->second.EraseReplica(ReplicaType::MEMORY);
                    if (!it->second.IsValid()) {
                        it = shard.metadata.erase(it);
                    } else {
                        ++it;
                    }
                    evicted_count++;
                } else {
                    ++it;
                }
            }
        }
    }

    // ========== 第二阶段:如果还需要驱逐更多 ==========
    uint64_t released_discarded_cnt = ReleaseExpiredDiscardedReplicas(now);

    long target_evict_num = std::ceil(object_count * evict_ratio_lowerbound) -
                            evicted_count - released_discarded_cnt;

    if (target_evict_num > 0) {
        // 第二阶段驱逐逻辑...
        // 优先驱逐 no_pin_objects,必要时驱逐 soft_pin_objects
    }

    // 更新指标
    if (evicted_count > 0 || released_discarded_cnt > 0) {
        need_eviction_ = false;
        MasterMetricManager::instance().inc_eviction_success(
            evicted_count, total_freed_size);
    }
}

驱逐优先级图解

graph TB
    subgraph "驱逐优先级(从高到低)"
        P1[1. 租约过期 + 无软锁定]
        P2[2. 租约过期 + 有软锁定<br/>(仅当 allow_evict_soft_pinned_objects_=true)]
        P3[3. 有活跃租约的对象<br/>(不驱逐)]
    end

    P1 -->|"优先驱逐"| P2
    P2 -->|"必要时驱逐"| P3
    P3 -->|"永不驱逐"| Never[保护中]

    style P1 fill:#FF6B6B
    style P2 fill:#FFD93D
    style P3 fill:#6BCB77

14.4 客户端监控与故障检测#

14.4.1 客户端监控线程#

客户端监控线程负责检测客户端是否存活,并在超时后清理其资源:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
void MasterService::ClientMonitorFunc() {
    std::unordered_map<UUID, std::chrono::steady_clock::time_point,
                       boost::hash<UUID>> client_ttl;

    while (client_monitor_running_) {
        auto now = std::chrono::steady_clock::now();

        // 1. 处理 Ping 消息,更新 TTL
        PodUUID pod_client_id;
        while (client_ping_queue_.pop(pod_client_id)) {
            UUID client_id = {pod_client_id.first, pod_client_id.second};
            client_ttl[client_id] =
                now + std::chrono::seconds(client_live_ttl_sec_);
        }

        // 2. 检测过期客户端
        std::vector<UUID> expired_clients;
        for (auto it = client_ttl.begin(); it != client_ttl.end();) {
            if (it->second < now) {
                LOG(INFO) << "client_id=" << it->first
                          << ", action=client_expired";
                expired_clients.push_back(it->first);
                it = client_ttl.erase(it);
            } else {
                ++it;
            }
        }

        // 3. 清理过期客户端的资源
        if (!expired_clients.empty()) {
            std::vector<UUID> unmount_segments;
            std::vector<size_t> dec_capacities;
            std::vector<UUID> client_ids;

            {
                std::unique_lock<std::shared_mutex> lock(client_mutex_);

                // 从活跃客户端集合中移除
                for (auto& client_id : expired_clients) {
                    auto it = ok_client_.find(client_id);
                    if (it != ok_client_.end()) {
                        ok_client_.erase(it);
                        MasterMetricManager::instance().dec_active_clients();
                    }
                }

                // 准备卸载所有相关 Segment
                ScopedSegmentAccess segment_access =
                    segment_manager_.getSegmentAccess();
                for (auto& client_id : expired_clients) {
                    std::vector<Segment> segments;
                    segment_access.GetClientSegments(client_id, segments);
                    for (auto& seg : segments) {
                        size_t metrics_dec_capacity = 0;
                        if (segment_access.PrepareUnmountSegment(
                                seg.id, metrics_dec_capacity) == ErrorCode::OK) {
                            unmount_segments.push_back(seg.id);
                            dec_capacities.push_back(metrics_dec_capacity);
                            client_ids.push_back(client_id);
                        }
                    }
                }
            }

            // 清理无效的元数据句柄
            if (!unmount_segments.empty()) {
                ClearInvalidHandles();

                // 提交卸载操作
                ScopedSegmentAccess segment_access =
                    segment_manager_.getSegmentAccess();
                for (size_t i = 0; i < unmount_segments.size(); i++) {
                    segment_access.CommitUnmountSegment(
                        unmount_segments[i], client_ids[i], dec_capacities[i]);
                }
            }
        }

        std::this_thread::sleep_for(
            std::chrono::milliseconds(kClientMonitorSleepMs));
    }
}

客户端生命周期管理

stateDiagram-v2
    [*] --> Unknown: 首次连接
    Unknown --> Active: MountSegment 成功
    Active --> Active: Ping 续约
    Active --> NeedRemount: TTL 超时
    NeedRemount --> Active: ReMountSegment 成功
    NeedRemount --> [*]: 长时间未恢复
    Active --> [*]: UnmountSegment

第十五章:内存分配器实现解析#

本章深入分析 Mooncake Store 的内存分配器实现,包括 CacheLib 和 Offset 两种分配策略。

15.1 分配器架构设计#

15.1.1 抽象基类定义#

所有分配器都继承自 BufferAllocatorBase

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 文件: mooncake-store/include/allocator.h

// 用于表示分配器无法精确追踪可用空间的情况
static constexpr size_t kAllocatorUnknownFreeSpace =
    std::numeric_limits<size_t>::max();

class BufferAllocatorBase {
public:
    virtual ~BufferAllocatorBase() = default;

    // 核心分配接口
    virtual std::unique_ptr<AllocatedBuffer> allocate(size_t size) = 0;
    virtual void deallocate(AllocatedBuffer* handle) = 0;

    // 容量查询
    virtual size_t capacity() const = 0;
    virtual size_t size() const = 0;

    // 元数据获取
    virtual std::string getSegmentName() const = 0;
    virtual std::string getTransportEndpoint() const = 0;

    // 最大可用区域查询(用于分配决策)
    virtual size_t getLargestFreeRegion() const = 0;
};

15.1.2 AllocatedBuffer:分配结果封装#

AllocatedBuffer 封装了分配的内存块,支持 RAII 自动释放:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class AllocatedBuffer {
public:
    friend class CachelibBufferAllocator;
    friend class OffsetBufferAllocator;

    struct Descriptor {
        uint64_t size_;
        uintptr_t buffer_address_;
        std::string transport_endpoint_;
        YLT_REFL(Descriptor, size_, buffer_address_, transport_endpoint_);
    };

    AllocatedBuffer(std::shared_ptr<BufferAllocatorBase> allocator,
                    void* buffer_ptr, std::size_t size,
                    std::optional<offset_allocator::OffsetAllocationHandle>&&
                        offset_handle = std::nullopt)
        : allocator_(std::move(allocator)),
          buffer_ptr_(buffer_ptr),
          size_(size),
          offset_handle_(std::move(offset_handle)) {}

    ~AllocatedBuffer() {
        // RAII: 自动归还内存
        auto alloc = allocator_.lock();
        if (alloc) {
            alloc->deallocate(this);
            VLOG(1) << "buf_handle_deallocated size=" << size_;
        }
    }

    // 禁止拷贝
    AllocatedBuffer(const AllocatedBuffer&) = delete;
    AllocatedBuffer& operator=(const AllocatedBuffer&) = delete;

    [[nodiscard]] void* data() const noexcept { return buffer_ptr_; }
    [[nodiscard]] std::size_t size() const noexcept { return size_; }

    // 检查分配器是否仍然有效
    [[nodiscard]] bool isAllocatorValid() const {
        return !allocator_.expired();
    }

    // 序列化为描述符(用于网络传输)
    [[nodiscard]] Descriptor get_descriptor() const {
        auto alloc = allocator_.lock();
        std::string endpoint;
        if (alloc) {
            endpoint = alloc->getTransportEndpoint();
        }
        return {static_cast<uint64_t>(size()),
                reinterpret_cast<uintptr_t>(buffer_ptr_), endpoint};
    }

private:
    std::weak_ptr<BufferAllocatorBase> allocator_;  // 弱引用避免循环
    void* buffer_ptr_{nullptr};
    std::size_t size_{0};
    std::optional<offset_allocator::OffsetAllocationHandle> offset_handle_;
};

关键设计要点

  1. weak_ptr 避免循环引用allocator_ 使用 weak_ptr,因为分配器管理着 Buffer,而 Buffer 需要引用分配器进行释放
  2. RAII 自动释放:析构函数自动调用 deallocate
  3. Descriptor 用于序列化:支持将 Buffer 信息序列化后通过网络传输

15.2 CacheLib 分配器实现#

15.2.1 初始化与配置#

CacheLib 分配器使用 Facebook CacheLib 的 Slab 分配策略:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 文件: mooncake-store/src/allocator.cpp

CachelibBufferAllocator::CachelibBufferAllocator(
    std::string segment_name, size_t base, size_t size,
    std::string transport_endpoint)
    : segment_name_(segment_name),
      base_(base),
      total_size_(size),
      cur_size_(0),
      transport_endpoint_(std::move(transport_endpoint)) {

    VLOG(1) << "initializing_buffer_allocator segment_name=" << segment_name
            << " base_address=" << reinterpret_cast<void*>(base)
            << " size=" << size;

    // 计算头部区域大小
    // CacheLib 需要额外空间存储 Slab 元数据
    header_region_size_ =
        sizeof(facebook::cachelib::SlabHeader) *
            static_cast<unsigned int>(size / sizeof(facebook::cachelib::Slab)) +
        1;
    header_region_start_ = std::make_unique<char[]>(header_region_size_);

    LOG_ASSERT(header_region_start_);

    // 初始化 CacheLib MemoryAllocator
    memory_allocator_ = std::make_unique<facebook::cachelib::MemoryAllocator>(
        facebook::cachelib::MemoryAllocator::Config(
            facebook::cachelib::MemoryAllocator::generateAllocSizes()),
        reinterpret_cast<void*>(header_region_start_.get()),
        header_region_size_,
        reinterpret_cast<void*>(base),
        size);

    // 添加主内存池
    pool_id_ = memory_allocator_->addPool("main", size);

    VLOG(1) << "buffer_allocator_initialized pool_id="
            << static_cast<int>(pool_id_);
}

CacheLib 内存布局

┌─────────────────────────────────────────────────────┐
│                Header Region (Heap)                  │
│  ┌──────────┬──────────┬──────────┬────────────┐   │
│  │SlabHdr 0 │SlabHdr 1 │SlabHdr 2 │    ...     │   │
│  └──────────┴──────────┴──────────┴────────────┘   │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│              Data Region (Base Address)              │
│  ┌──────────┬──────────┬──────────┬────────────┐   │
│  │  Slab 0  │  Slab 1  │  Slab 2  │    ...     │   │
│  │  (4MB)   │  (4MB)   │  (4MB)   │            │   │
│  └──────────┴──────────┴──────────┴────────────┘   │
└─────────────────────────────────────────────────────┘

15.2.2 分配与释放#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
std::unique_ptr<AllocatedBuffer> CachelibBufferAllocator::allocate(size_t size) {
    void* buffer = nullptr;
    try {
        // 最小分配大小限制
        size_t padding_size = std::max(size, kMinSliceSize);

        // 调用 CacheLib 分配
        buffer = memory_allocator_->allocate(pool_id_, padding_size);
        if (!buffer) {
            VLOG(1) << "allocation_failed size=" << size
                    << " segment=" << segment_name_
                    << " current_size=" << cur_size_;
            return nullptr;
        }
    } catch (const std::exception& e) {
        LOG(ERROR) << "allocation_exception error=" << e.what();
        return nullptr;
    }

    // 更新统计信息
    cur_size_.fetch_add(size);
    MasterMetricManager::instance().inc_allocated_mem_size(segment_name_, size);

    return std::make_unique<AllocatedBuffer>(shared_from_this(), buffer, size);
}

void CachelibBufferAllocator::deallocate(AllocatedBuffer* handle) {
    try {
        memory_allocator_->free(handle->buffer_ptr_);
        size_t freed_size = handle->size_;
        cur_size_.fetch_sub(freed_size);
        MasterMetricManager::instance().dec_allocated_mem_size(
            segment_name_, freed_size);
    } catch (const std::exception& e) {
        LOG(ERROR) << "deallocation_exception error=" << e.what();
    }
}

15.3 Offset 分配器实现#

15.3.1 Offset 分配器特点#

Offset 分配器使用简单的偏移量管理策略,适合需要精确控制可用空间的场景:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
OffsetBufferAllocator::OffsetBufferAllocator(
    std::string segment_name, size_t base, size_t size,
    std::string transport_endpoint)
    : segment_name_(segment_name),
      base_(base),
      total_size_(size),
      cur_size_(0),
      transport_endpoint_(std::move(transport_endpoint)) {

    try {
        // 计算容量参数
        // 初始容量: size/4096,范围 [1K, 64K]
        uint64_t init_capacity = size / 4096;
        init_capacity = std::max(init_capacity, static_cast<uint64_t>(1024));
        init_capacity = std::min(init_capacity, static_cast<uint64_t>(64 * 1024));

        // 最大容量: size/1024,范围 [1M, 64M]
        uint64_t max_capacity = size / 1024;
        max_capacity = std::max(max_capacity, static_cast<uint64_t>(1024 * 1024));
        max_capacity = std::min(max_capacity, static_cast<uint64_t>(64 * 1024 * 1024));

        // 创建 Offset 分配器
        offset_allocator_ = offset_allocator::OffsetAllocator::create(
            base, size,
            static_cast<uint32_t>(init_capacity),
            static_cast<uint32_t>(max_capacity));

        if (!offset_allocator_) {
            throw std::runtime_error("Failed to create offset allocator");
        }
    } catch (const std::exception& e) {
        LOG(ERROR) << "offset_allocator_init_exception error=" << e.what();
        throw;
    }
}

15.3.2 精确的可用空间查询#

Offset 分配器可以精确报告最大可用区域:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
size_t OffsetBufferAllocator::getLargestFreeRegion() const {
    if (!offset_allocator_) {
        return 0;
    }

    try {
        auto report = offset_allocator_->storageReport();
        return report.largestFreeRegion;
    } catch (const std::exception& e) {
        LOG(ERROR) << "Failed to get storage report: " << e.what();
        return 0;
    }
}

15.3.3 分配与释放#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
std::unique_ptr<AllocatedBuffer> OffsetBufferAllocator::allocate(size_t size) {
    if (!offset_allocator_) {
        return nullptr;
    }

    std::unique_ptr<AllocatedBuffer> allocated_buffer = nullptr;
    try {
        // 分配并获取句柄
        auto allocation_handle = offset_allocator_->allocate(size);
        if (!allocation_handle) {
            return nullptr;
        }

        void* buffer_ptr = allocation_handle->ptr();

        // 创建 Buffer,传入句柄用于 RAII 管理
        allocated_buffer = std::make_unique<AllocatedBuffer>(
            shared_from_this(), buffer_ptr, size, std::move(allocation_handle));
    } catch (const std::exception& e) {
        LOG(ERROR) << "allocation_exception error=" << e.what();
        return nullptr;
    }

    cur_size_.fetch_add(size);
    MasterMetricManager::instance().inc_allocated_mem_size(segment_name_, size);
    return allocated_buffer;
}

void OffsetBufferAllocator::deallocate(AllocatedBuffer* handle) {
    try {
        // RAII: 释放 offset_handle 自动归还空间
        size_t freed_size = handle->size();
        handle->offset_handle_.reset();
        cur_size_.fetch_sub(freed_size);
        MasterMetricManager::instance().dec_allocated_mem_size(
            segment_name_, freed_size);
    } catch (const std::exception& e) {
        LOG(ERROR) << "deallocation_exception error=" << e.what();
    }
}

15.4 分配器对比与选择#

15.4.1 特性对比#

特性 CacheLib 分配器 Offset 分配器
分配效率 O(1) 平均 O(log n)
碎片处理 Slab 内自动合并 显式管理
可用空间查询 近似值 精确值
内存开销 较高(元数据) 较低
对齐要求 4MB 对齐 无严格要求
最大分配 4MB 理论无限制

15.4.2 选择建议#

flowchart TD
    Start[选择分配器] --> Q1{需要大于 4MB<br/>的单次分配?}
    Q1 -->|是| Offset[使用 Offset 分配器]
    Q1 -->|否| Q2{需要精确的<br/>可用空间信息?}
    Q2 -->|是| Offset
    Q2 -->|否| Q3{对对齐有<br/>严格要求?}
    Q3 -->|是| Offset
    Q3 -->|否| CacheLib[使用 CacheLib 分配器]

    style CacheLib fill:#90EE90
    style Offset fill:#87CEEB

第十六章:Segment 与 Replica 管理#

本章分析 Mooncake Store 中 Segment 和 Replica 的管理机制。

16.1 Segment 管理架构#

16.1.1 Segment 定义#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// 文件: mooncake-store/include/types.h

struct Segment {
    UUID id;                      // 唯一标识
    std::string name;             // 名称(如 "node001")
    uintptr_t base;               // 基地址
    size_t size;                  // 大小
    std::string te_endpoint;      // Transfer Engine 端点

    YLT_REFL(Segment, id, name, base, size, te_endpoint);
};

16.1.2 SegmentManager 实现#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 文件: mooncake-store/include/segment.h

class SegmentManager {
public:
    explicit SegmentManager(
        BufferAllocatorType memory_allocator = BufferAllocatorType::CACHELIB)
        : memory_allocator_(memory_allocator) {}

    // RAII 风格的访问接口
    ScopedSegmentAccess getSegmentAccess() {
        return ScopedSegmentAccess(this, segment_mutex_);
    }

    ScopedAllocatorAccess getAllocatorAccess() {
        return ScopedAllocatorAccess(allocator_manager_, segment_mutex_);
    }

private:
    mutable std::shared_mutex segment_mutex_;
    const BufferAllocatorType memory_allocator_;
    AllocatorManager allocator_manager_;

    // segment_id -> mounted segment
    std::unordered_map<UUID, MountedSegment, boost::hash<UUID>>
        mounted_segments_;

    // client_id -> segment_ids
    std::unordered_map<UUID, std::vector<UUID>, boost::hash<UUID>>
        client_segments_;

    // segment name -> client_id
    std::unordered_map<std::string, UUID> client_by_name_;

    friend class ScopedSegmentAccess;
};

16.1.3 Segment 挂载流程#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 文件: mooncake-store/src/segment.cpp

ErrorCode ScopedSegmentAccess::MountSegment(const Segment& segment,
                                            const UUID& client_id) {
    const uintptr_t buffer = segment.base;
    const size_t size = segment.size;

    // 1. 参数验证
    if (buffer == 0 || size == 0) {
        return ErrorCode::INVALID_PARAMS;
    }

    // 2. CacheLib 对齐检查
    if (segment_manager_->memory_allocator_ == BufferAllocatorType::CACHELIB &&
        (buffer % facebook::cachelib::Slab::kSize ||
         size % facebook::cachelib::Slab::kSize)) {
        LOG(ERROR) << "buffer or size not aligned to 4MB";
        return ErrorCode::INVALID_PARAMS;
    }

    // 3. 检查是否已存在
    auto exist_segment_it = segment_manager_->mounted_segments_.find(segment.id);
    if (exist_segment_it != segment_manager_->mounted_segments_.end()) {
        if (exist_segment_it->second.status == SegmentStatus::OK) {
            return ErrorCode::SEGMENT_ALREADY_EXISTS;
        } else {
            return ErrorCode::UNAVAILABLE_IN_CURRENT_STATUS;
        }
    }

    // 4. 创建分配器
    std::shared_ptr<BufferAllocatorBase> allocator;
    try {
        switch (segment_manager_->memory_allocator_) {
            case BufferAllocatorType::CACHELIB:
                allocator = std::make_shared<CachelibBufferAllocator>(
                    segment.name, buffer, size, segment.te_endpoint);
                break;
            case BufferAllocatorType::OFFSET:
                allocator = std::make_shared<OffsetBufferAllocator>(
                    segment.name, buffer, size, segment.te_endpoint);
                break;
            default:
                return ErrorCode::INVALID_PARAMS;
        }
    } catch (...) {
        return ErrorCode::INVALID_PARAMS;
    }

    // 5. 注册到各个映射表
    segment_manager_->allocator_manager_.addAllocator(segment.name, allocator);
    segment_manager_->client_segments_[client_id].push_back(segment.id);
    segment_manager_->mounted_segments_[segment.id] = {
        segment, SegmentStatus::OK, std::move(allocator)};
    segment_manager_->client_by_name_[segment.name] = client_id;

    // 6. 更新指标
    MasterMetricManager::instance().inc_total_mem_capacity(segment.name, size);

    return ErrorCode::OK;
}

Segment 生命周期

stateDiagram-v2
    [*] --> Undefined: 创建 MountedSegment
    Undefined --> OK: MountSegment 成功
    OK --> Unmounting: PrepareUnmountSegment
    Unmounting --> [*]: CommitUnmountSegment

    OK --> OK: 正常服务中

16.1.4 Segment 卸载:两阶段流程#

为了避免死锁和数据不一致,卸载分为准备和提交两个阶段:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 阶段 1: 准备卸载
ErrorCode ScopedSegmentAccess::PrepareUnmountSegment(
    const UUID& segment_id, size_t& metrics_dec_capacity) {

    auto it = segment_manager_->mounted_segments_.find(segment_id);
    if (it == segment_manager_->mounted_segments_.end()) {
        return ErrorCode::SEGMENT_NOT_FOUND;
    }

    if (it->second.status == SegmentStatus::UNMOUNTING) {
        return ErrorCode::UNAVAILABLE_IN_CURRENT_STATUS;
    }

    auto& mounted_segment = it->second;
    metrics_dec_capacity = mounted_segment.segment.size;

    // 从分配器管理器移除
    segment_manager_->allocator_manager_.removeAllocator(
        mounted_segment.segment.name, mounted_segment.buf_allocator);

    // 释放分配器引用
    mounted_segment.buf_allocator.reset();

    // 标记状态
    mounted_segment.status = SegmentStatus::UNMOUNTING;

    return ErrorCode::OK;
}

// 阶段 2: 提交卸载(释放锁后调用)
ErrorCode ScopedSegmentAccess::CommitUnmountSegment(
    const UUID& segment_id, const UUID& client_id,
    const size_t& metrics_dec_capacity) {

    // 从 client_segments_ 移除
    auto client_it = segment_manager_->client_segments_.find(client_id);
    if (client_it != segment_manager_->client_segments_.end()) {
        auto& segments = client_it->second;
        segments.erase(std::remove(segments.begin(), segments.end(), segment_id),
                       segments.end());
        if (segments.empty()) {
            segment_manager_->client_segments_.erase(client_it);
        }
    }

    // 获取 segment name 用于指标更新
    std::string segment_name;
    auto segment_it = segment_manager_->mounted_segments_.find(segment_id);
    if (segment_it != segment_manager_->mounted_segments_.end()) {
        segment_name = segment_it->second.segment.name;
    }

    // 从 mounted_segments_ 移除
    segment_manager_->mounted_segments_.erase(segment_id);

    // 更新指标
    MasterMetricManager::instance().dec_total_mem_capacity(
        segment_name, metrics_dec_capacity);

    return ErrorCode::OK;
}

16.2 Replica 类型系统#

16.2.1 Replica 类型定义#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 文件: mooncake-store/include/replica.h

enum class ReplicaType {
    MEMORY,     // 内存副本
    DISK,       // 磁盘副本(共享存储)
    LOCAL_DISK  // 本地磁盘副本
};

enum class ReplicaStatus {
    UNDEFINED = 0,  // 未初始化
    INITIALIZED,    // 空间已分配,等待写入
    PROCESSING,     // 写入进行中
    COMPLETE,       // 写入完成,可读
    REMOVED,        // 已移除
    FAILED,         // 失败状态
};

// 配置结构
struct ReplicateConfig {
    size_t replica_num{1};           // 副本数量
    bool with_soft_pin{false};       // 是否软锁定
    std::vector<std::string> preferred_segments{};  // 首选 Segment
    std::string preferred_segment{};  // 兼容旧版 API
    bool prefer_alloc_in_same_node{false};  // 优先同节点分配
};

16.2.2 Replica 类实现#

Replica 使用 std::variant 支持多种副本类型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class Replica {
public:
    struct Descriptor;  // 前向声明

    // 内存副本构造函数
    Replica(std::unique_ptr<AllocatedBuffer> buffer, ReplicaStatus status)
        : data_(MemoryReplicaData{std::move(buffer)}), status_(status) {}

    // 磁盘副本构造函数
    Replica(std::string file_path, uint64_t object_size, ReplicaStatus status)
        : data_(DiskReplicaData{std::move(file_path), object_size}),
          status_(status) {
        // 更新磁盘使用指标
        MasterMetricManager::instance().inc_allocated_file_size(object_size);
    }

    // 本地磁盘副本构造函数
    Replica(UUID client_id, uint64_t object_size,
            std::string transport_endpoint, ReplicaStatus status)
        : data_(LocalDiskReplicaData{client_id, object_size,
                                     std::move(transport_endpoint)}),
          status_(status) {}

    // 析构时更新指标
    ~Replica() {
        if (status_ != ReplicaStatus::UNDEFINED && is_disk_replica()) {
            const auto& disk_data = std::get<DiskReplicaData>(data_);
            MasterMetricManager::instance().dec_allocated_file_size(
                disk_data.object_size);
        }
    }

    // 移动语义支持
    Replica(Replica&& src) noexcept
        : data_(std::move(src.data_)), status_(src.status_) {
        // 标记源对象为已移动,避免析构时重复更新指标
        src.status_ = ReplicaStatus::UNDEFINED;
    }

    // 类型检查
    [[nodiscard]] ReplicaType type() const {
        return std::visit(ReplicaTypeVisitor{}, data_);
    }

    [[nodiscard]] bool is_memory_replica() const {
        return std::holds_alternative<MemoryReplicaData>(data_);
    }

    // 检查内存句柄是否有效
    [[nodiscard]] bool has_invalid_mem_handle() const {
        if (is_memory_replica()) {
            const auto& mem_data = std::get<MemoryReplicaData>(data_);
            return !mem_data.buffer->isAllocatorValid();
        }
        return false;
    }

    // 状态转换
    void mark_complete() {
        if (status_ == ReplicaStatus::PROCESSING) {
            status_ = ReplicaStatus::COMPLETE;
        }
    }

private:
    std::variant<MemoryReplicaData, DiskReplicaData, LocalDiskReplicaData> data_;
    ReplicaStatus status_{ReplicaStatus::UNDEFINED};

    struct ReplicaTypeVisitor {
        ReplicaType operator()(const MemoryReplicaData&) const {
            return ReplicaType::MEMORY;
        }
        ReplicaType operator()(const DiskReplicaData&) const {
            return ReplicaType::DISK;
        }
        ReplicaType operator()(const LocalDiskReplicaData&) const {
            return ReplicaType::LOCAL_DISK;
        }
    };
};

16.2.3 Replica Descriptor:序列化支持#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
struct Replica::Descriptor {
    std::variant<MemoryDescriptor, DiskDescriptor, LocalDiskDescriptor>
        descriptor_variant;
    ReplicaStatus status;

    YLT_REFL(Descriptor, descriptor_variant, status);

    // 类型判断助手
    bool is_memory_replica() const noexcept {
        return std::holds_alternative<MemoryDescriptor>(descriptor_variant);
    }

    bool is_disk_replica() const noexcept {
        return std::holds_alternative<DiskDescriptor>(descriptor_variant);
    }

    // 类型获取器
    const MemoryDescriptor& get_memory_descriptor() const {
        if (auto* desc = std::get_if<MemoryDescriptor>(&descriptor_variant)) {
            return *desc;
        }
        throw std::runtime_error("Expected MemoryDescriptor");
    }
};

// 生成 Descriptor
inline Replica::Descriptor Replica::get_descriptor() const {
    Replica::Descriptor desc;
    desc.status = status_;

    if (is_memory_replica()) {
        const auto& mem_data = std::get<MemoryReplicaData>(data_);
        MemoryDescriptor mem_desc;
        if (mem_data.buffer) {
            mem_desc.buffer_descriptor = mem_data.buffer->get_descriptor();
        } else {
            LOG(ERROR) << "Trying to get invalid memory replica descriptor";
        }
        desc.descriptor_variant = std::move(mem_desc);
    } else if (is_disk_replica()) {
        const auto& disk_data = std::get<DiskReplicaData>(data_);
        DiskDescriptor disk_desc;
        disk_desc.file_path = disk_data.file_path;
        disk_desc.object_size = disk_data.object_size;
        desc.descriptor_variant = std::move(disk_desc);
    } else if (is_local_disk_replica()) {
        const auto& disk_data = std::get<LocalDiskReplicaData>(data_);
        LocalDiskDescriptor local_disk_desc;
        local_disk_desc.client_id = disk_data.client_id;
        local_disk_desc.object_size = disk_data.object_size;
        local_disk_desc.transport_endpoint = disk_data.transport_endpoint;
        desc.descriptor_variant = std::move(local_disk_desc);
    }

    return desc;
}

16.3 RPC 客户端实现#

16.3.1 模板化 RPC 调用#

MasterClient 使用模板实现类型安全的 RPC 调用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 文件: mooncake-store/src/master_client.cpp

// RPC 方法名称特化
template <auto Method>
struct RpcNameTraits;

template <>
struct RpcNameTraits<&WrappedMasterService::GetReplicaList> {
    static constexpr const char* value = "GetReplicaList";
};

template <>
struct RpcNameTraits<&WrappedMasterService::PutStart> {
    static constexpr const char* value = "PutStart";
};

// ... 更多特化

// 统一的 RPC 调用模板
template <auto ServiceMethod, typename ReturnType, typename... Args>
tl::expected<ReturnType, ErrorCode> MasterClient::invoke_rpc(Args&&... args) {
    auto pool = client_accessor_.GetClientPool();

    // 更新指标
    if (metrics_) {
        metrics_->rpc_count.inc({RpcNameTraits<ServiceMethod>::value});
    }

    auto start_time = std::chrono::steady_clock::now();

    return async_simple::coro::syncAwait(
        [&]() -> async_simple::coro::Lazy<tl::expected<ReturnType, ErrorCode>> {
            // 发送请求
            auto ret = co_await pool->send_request(
                [&](coro_io::client_reuse_hint,
                    coro_rpc::coro_rpc_client& client) {
                    return client.send_request<ServiceMethod>(
                        std::forward<Args>(args)...);
                });

            if (!ret.has_value()) {
                LOG(ERROR) << "Client not available";
                co_return tl::make_unexpected(ErrorCode::RPC_FAIL);
            }

            auto result = co_await std::move(ret.value());
            if (!result) {
                LOG(ERROR) << "RPC call failed: " << result.error().msg;
                co_return tl::make_unexpected(ErrorCode::RPC_FAIL);
            }

            // 记录延迟
            if (metrics_) {
                auto end_time = std::chrono::steady_clock::now();
                auto latency = std::chrono::duration_cast<
                    std::chrono::microseconds>(end_time - start_time);
                metrics_->rpc_latency.observe(
                    {RpcNameTraits<ServiceMethod>::value}, latency.count());
            }

            co_return result->result();
        }());
}

16.3.2 使用示例#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
tl::expected<GetReplicaListResponse, ErrorCode> MasterClient::GetReplicaList(
    const std::string& object_key) {
    ScopedVLogTimer timer(1, "MasterClient::GetReplicaList");
    timer.LogRequest("object_key=", object_key);

    auto result = invoke_rpc<&WrappedMasterService::GetReplicaList,
                             GetReplicaListResponse>(object_key);
    timer.LogResponseExpected(result);
    return result;
}

tl::expected<std::vector<Replica::Descriptor>, ErrorCode>
MasterClient::PutStart(const std::string& key,
                       const std::vector<size_t>& slice_lengths,
                       const ReplicateConfig& config) {
    ScopedVLogTimer timer(1, "MasterClient::PutStart");

    uint64_t total_slice_length = 0;
    for (const auto& slice_length : slice_lengths) {
        total_slice_length += slice_length;
    }

    auto result = invoke_rpc<&WrappedMasterService::PutStart,
                             std::vector<Replica::Descriptor>>(
        client_id_, key, total_slice_length, config);
    return result;
}

16.4 本章小结#

本章深入分析了 Mooncake Store 的核心代码实现:

  1. MasterService:分片架构、MetadataAccessor 模式、两阶段写入事务
  2. 驱逐机制:两阶段驱逐、优先级策略、nth_element 优化
  3. 内存分配器:CacheLib 和 Offset 两种策略的实现细节
  4. Segment 管理:RAII 风格访问、两阶段卸载流程
  5. Replica 系统:variant 多态、状态机转换、序列化支持
  6. RPC 客户端:模板化调用、协程异步、指标集成

这些代码体现了现代 C++ 的最佳实践:RAII 资源管理、类型安全、零拷贝设计和高效的并发控制。


第七部分:总结与展望#


第十七章:总结与未来展望#

17.1 技术总结#

17.1.1 核心创新回顾#

Mooncake 项目在 LLM 推理系统的 KVCache 管理领域做出了多项重要创新:

1. Prefill/Decode 分离架构 (P/D Disaggregation)

这是 Mooncake 最核心的架构创新。通过将 Prefill 和 Decode 阶段分离到不同的计算节点,实现了:

  • 资源利用率优化:Prefill 节点专注于计算密集型操作,Decode 节点专注于内存带宽敏感型操作
  • 弹性扩展:可以根据实际负载独立扩展 P 节点或 D 节点
  • 硬件异构支持:不同阶段可以使用不同规格的 GPU
graph LR
    subgraph "传统架构"
        T1[请求] --> T2[Prefill + Decode<br/>在同一 GPU]
    end

    subgraph "P/D 分离架构"
        M1[请求] --> M2[Prefill<br/>计算密集]
        M2 -->|KVCache| M3[Mooncake<br/>Store]
        M3 -->|KVCache| M4[Decode<br/>内存密集]
    end

    style M3 fill:#FFD700

2. 多层存储体系

Mooncake 设计了完善的多层存储体系,实现了 KVCache 的高效管理:

层级 介质 带宽 容量 访问延迟
L0 GPU HBM ~3 TB/s ~80 GB ~ns
L1 Host DRAM (RDMA) ~400 GB/s ~1 TB ~10 μs
L2 NVMe SSD ~7 GB/s ~8 TB ~100 μs

3. 高性能传输引擎

Transfer Engine 和 TENT 提供了多协议支持的高效数据传输:

  • RDMA 零拷贝:绕过内核,直接访问远程内存
  • Slice 级并行:大对象切分后多路并行传输
  • 拓扑感知:自动选择最优传输路径

4. 智能缓存管理

  • 租约机制 (Lease):解决分布式缓存一致性问题
  • 软锁定 (Soft Pin):保护热点数据免受驱逐
  • 两阶段驱逐:优先级感知的内存回收策略

17.1.2 性能成就#

根据实际基准测试数据,Mooncake 实现了显著的性能提升:

指标 Mooncake 对比方案 提升幅度
单节点吞吐 25+ GB/s vLLM 原生 3-5x
P99 延迟 <500 μs Redis 5-10x
缓存命中率 >95% - -
ITL 减少 ~30% 无 P/D 分离 -

17.2 设计哲学总结#

17.2.1 零拷贝优先#

Mooncake 在整个数据路径上贯彻零拷贝原则:

  • RDMA 直接访问:数据不经过 CPU
  • 用户态驱动:绕过内核协议栈
  • 原地序列化:避免中间缓冲区

17.2.2 分层抽象#

代码架构遵循清晰的分层原则:

┌─────────────────────────────────┐
│     Application (vLLM/SGLang)   │  应用层
├─────────────────────────────────┤
│        Mooncake Store API       │  服务层
├─────────────────────────────────┤
│  Transfer Engine / TENT         │  传输层
├─────────────────────────────────┤
│  RDMA / NVLink / TCP / io_uring │  协议层
└─────────────────────────────────┘

17.2.3 面向故障设计#

系统在多个层面考虑了故障处理:

  • 客户端故障检测:心跳超时自动清理资源
  • Segment 卸载:两阶段提交保证一致性
  • Master 高可用:基于 etcd 的 Leader 选举

17.3 未来展望#

17.3.1 技术演进方向#

1. 更深度的硬件集成

  • CXL 内存池化:利用 CXL 技术实现更大规模的共享内存池
  • GPU Direct Storage:直接从 SSD 到 GPU 的数据路径
  • DPU 卸载:将数据处理卸载到智能网卡

2. 跨数据中心扩展

  • 地理分布式 KVCache:支持跨地域的 KVCache 共享
  • 智能预取:基于请求模式预测的 KVCache 预加载
  • 一致性协议优化:更高效的跨 DC 同步机制

3. 模型感知优化

  • 注意力模式分析:根据模型结构优化缓存策略
  • 自适应分片:动态调整 KVCache 分片大小
  • 压缩与量化:在传输和存储时进行 KVCache 压缩

17.3.2 生态系统发展#

graph TB
    subgraph "当前状态"
        V1[vLLM 集成]
        S1[SGLang 集成]
    end

    subgraph "短期目标"
        V2[更多推理框架]
        M1[MoE 模型优化]
        D1[多模态支持]
    end

    subgraph "长期愿景"
        C1[训练阶段支持]
        E1[边缘部署]
        F1[联邦学习集成]
    end

    V1 --> V2
    S1 --> M1
    V2 --> C1
    M1 --> E1
    D1 --> F1

17.3.3 社区建设#

作为开源项目,Mooncake 的未来发展依赖于活跃的社区参与:

  • 文档完善:更详细的使用指南和 API 文档
  • 测试覆盖:提升单元测试和集成测试覆盖率
  • 性能工具:开发更完善的性能分析和调试工具
  • 示例应用:提供更多场景的参考实现

17.4 结语#

Mooncake 项目代表了 LLM 推理系统优化的一个重要方向。通过创新的 P/D 分离架构、高效的 KVCache 管理和高性能的数据传输机制,它成功解决了大规模 LLM 部署中的关键性能瓶颈。

本文通过 17 个章节、约 12 万字的深入分析,全面剖析了 Mooncake 的设计理念、架构细节和代码实现。希望这份文档能够:

  1. 帮助开发者理解系统架构:从宏观设计到微观代码的全面解读
  2. 为二次开发提供参考:详细的代码解析和扩展建议
  3. 推动技术交流:促进 LLM 基础设施领域的技术讨论

随着 AI 技术的快速发展,LLM 推理系统的优化将持续是一个重要的研究和工程领域。Mooncake 提供的解决方案和积累的经验,将为这个领域的进一步发展提供有价值的参考。


致谢

感谢 Mooncake 项目的所有贡献者,以及 FAST'25 论文的作者团队。本文的分析基于开源代码和公开论文,如有任何错误或不准确之处,欢迎指正。


附录 A:术语表

术语 英文 说明
KVCache Key-Value Cache Transformer 注意力机制中的键值缓存
P/D 分离 Prefill/Decode Disaggregation 将推理的两个阶段分开执行
RDMA Remote Direct Memory Access 远程直接内存访问
ITL Inter-Token Latency Token 间延迟
TTFT Time To First Token 首 Token 延迟
Slice - 数据传输的基本单位
Segment - 内存管理的逻辑单元
Replica - 数据的副本
Lease - 缓存一致性租约
Soft Pin - 软锁定保护

附录 B:参考资料

  1. 论文:Mooncake: Accelerating LLM Serving with KV Cache Disaggregation (FAST'25)
  2. 代码仓库:https://github.com/kvcache-ai/Mooncake
  3. vLLM 项目:https://github.com/vllm-project/vllm
  4. SGLang 项目:https://github.com/sgl-project/sglang