前端 Monorepo 的 CI 构建时间已经从可接受的 5 分钟恶化到了无法容忍的 15 分钟。每个合并请求都需要在数十个隔离的、临时的 CI Runner 上完整执行 lint
, test
, build
流程。虽然我们引入了 Turbopack 来加速本地开发,但在 CI 环境中,它带来的性能提升被一个更根本的问题所抵消:缓存。
标准的 CI 缓存机制,无论是基于文件的归档上传下载,还是简单的键值存储,都存在致命缺陷。它们要么因为打包和解压巨大的 node_modules
和构建产物而消耗大量网络 I/O 和时间,要么因为缓存键(通常基于 package-lock.json
的哈希)过于粗糙,导致任何一个微小的依赖变更都使得整个缓存失效。
Turbopack 自身的缓存机制非常强大,但它被设计为在单个文件系统上工作。在分布式的、无状态的 CI Runner 环境中,一个 Runner 上的热缓存对另一个 Runner 毫无帮助。我们需要一个真正的分布式缓存系统,一个能够实时、精细地在所有 Runner 之间同步缓存状态的方案。
我们的初步构想是抛弃传统的“打包-上传-下载-解压”模式。理想的系统应该具备以下特性:
- 极度精细的缓存粒度:缓存单元应该对应到单个文件或一个组件目录,而不是整个项目。
- 内容寻址:缓存的键不应是文件名或路径,而应是文件内容的哈希。这正是 Git 的核心思想。
- 实时传播:一旦某个 Runner 生成了新的缓存产物,该信息应立即广播给所有其他可能需要它的 Runner。
- 低延迟:Runner 在启动任务时,应能以极低的延迟获取它所需要的缓存,而不是等待一个巨大的 tarball 下载。
这个构想直接将我们引向了三个技术的组合:Git
的底层命令(Plumbing),Redis Streams
,以及 Turbopack
本身。
- Git Plumbing: 我们不把 Git 仅仅看作版本控制工具,而是把它看作一个内容寻址的文件系统。
git hash-object
、git cat-file
、git ls-tree
这些底层命令可以让我们直接操作 Git 的对象数据库,获取任何文件(blob)、目录(tree)的 SHA-1 哈希。这是我们实现内容寻址和精细粒度缓存的关键。 - Redis Streams: 我们需要一个可靠的、持久化的消息总线来广播缓存状态。简单的 Redis Pub/Sub 是“即发即弃”的,无法保证消息送达。Kafka 或 RabbitMQ 对于这个场景来说又过于重型。Redis Streams 提供了完美的折中:一个轻量级的、支持消费组和消息持久化的追加日志。它可以作为我们分布式缓存系统的“神经中枢”。
- Turbopack: 它是我们优化的目标。它的缓存目录 (
.turbo/cache
) 结构相对清晰,我们可以将这些缓存产物与 Git 对象哈希关联起来。
最终的架构蓝图如下:
graph TD subgraph Git Server A[Git Push Event] --> B{Webhook}; end subgraph Cache Coordinator Service B --> C[Webhook Listener]; C -- new commit info --> D[Git Object Analyzer]; D -- git plumbing commands --> E[Git Repository Clone]; D -- new cache events --> F[Redis Client]; end subgraph Shared Infrastructure F -- XADD --> G[Redis Streams: 'cache-events']; H[Object Storage / S3] end subgraph CI Runners I[Runner 1] --> J1[Cache Agent]; K[Runner 2] --> J2[Cache Agent]; L[Runner N] --> J3[Cache Agent]; J1 -- XREADGROUP --> G; J2 -- XREADGROUP --> G; J3 -- XREADGROUP --> G; J1 -- download artifacts --> H; J2 -- download artifacts --> H; J3 -- download artifacts --> H; J1 -- populates --> M1[Turbopack Cache Dir]; J2 -- populates --> M2[Turbopack Cache Dir]; J3 -- populates --> M3[Turbopack Cache Dir]; M1 -- used by --> N1[Turbopack Build]; M2 -- used by --> N2[Turbopack Build]; M3 -- used by --> N3[Turbopack Build]; N1 -- new artifacts --> O1[Cache Uploader]; O1 -- upload artifacts --> H; O1 -- notify new cache --> C; end style Cache Coordinator Service fill:#dae8fc,stroke:#333,stroke-width:2px style CI Runners fill:#d5e8d4,stroke:#333,stroke-width:2px
这个流程的核心是一个名为 “Cache Coordinator” 的无状态服务。它监听 Git 推送事件,分析提交内容,然后将需要缓存的任务发布到 Redis Stream。每个 CI Runner 启动时,其内置的 “Cache Agent” 会从这个 Stream 中拉取信息,下载对应的缓存到本地,从而为 Turbopack 的执行预热缓存。当一个 Runner 完成了新的构建并生成了新的缓存,它会将产物上传到对象存储,并通知 Coordinator,后者再将这个新的缓存信息广播出去。
第一步: Cache Coordinator 的实现
我们选择 Go 语言来编写 Coordinator,因为它在处理并发网络请求和执行外部命令方面表现出色。下面是核心的 Webhook 处理器和 Git 分析逻辑。
// main.go
package main
import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
const (
repoPath = "/tmp/monorepo" // A bare clone of the repository
redisAddr = "localhost:6379"
redisStream = "turbopack-cache-events"
webhookSecret = "your-super-secret-webhook-secret"
)
var (
rdb *redis.Client
mu sync.Mutex // Mutex to protect git operations
ctx = context.Background()
)
// GitPushEvent represents the structure of a typical Git push webhook payload.
type GitPushEvent struct {
Ref string `json:"ref"`
After string `json:"after"`
Before string `json:"before"`
Repository struct {
CloneURL string `json:"clone_url"`
} `json:"repository"`
}
func main() {
rdb = redis.NewClient(&redis.Options{
Addr: redisAddr,
})
// Initialize the bare repository
initRepo()
http.HandleFunc("/webhook", handleWebhook)
log.Println("Cache Coordinator listening on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
}
func initRepo() {
mu.Lock()
defer mu.Unlock()
repoURL := os.Getenv("GIT_REPO_URL")
if repoURL == "" {
log.Fatal("GIT_REPO_URL environment variable not set.")
}
if _, err := os.Stat(repoPath); os.IsNotExist(err) {
log.Printf("Cloning bare repository from %s into %s...", repoURL, repoPath)
cmd := exec.Command("git", "clone", "--bare", repoURL, repoPath)
if err := cmd.Run(); err != nil {
log.Fatalf("Failed to clone bare repo: %v", err)
}
} else {
log.Println("Bare repository already exists. Fetching updates...")
cmd := exec.Command("git", "--git-dir="+repoPath, "fetch", "origin", "+refs/heads/*:refs/heads/*", "--prune")
if err := cmd.Run(); err != nil {
log.Printf("Warning: failed to fetch updates for bare repo: %v", err)
}
}
}
func handleWebhook(w http.ResponseWriter, r *http.Request) {
// 1. Validate payload signature for security
signature := r.Header.Get("X-Hub-Signature-256")
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
if !isValidSignature(body, signature, webhookSecret) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
// 2. Parse the webhook payload
var event GitPushEvent
if err := json.Unmarshal(body, &event); err != nil {
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
// We only care about branch pushes, not tags or deletions
if !strings.HasPrefix(event.Ref, "refs/heads/") {
w.WriteHeader(http.StatusOK)
w.Write([]byte("Event is not a branch push, skipping."))
return
}
log.Printf("Received push event for ref %s, commit %s", event.Ref, event.After)
// 3. Process the git changes in a separate goroutine to respond quickly
go processGitChanges(event.After)
w.WriteHeader(http.StatusAccepted)
w.Write([]byte("Accepted"))
}
func processGitChanges(commitSHA string) {
mu.Lock()
defer mu.Unlock()
// 4. Fetch the latest changes into our bare repo
log.Printf("Fetching latest changes for commit %s...", commitSHA)
fetchCmd := exec.Command("git", "--git-dir="+repoPath, "fetch", "origin")
fetchCmd.Stderr = os.Stderr
if err := fetchCmd.Run(); err != nil {
log.Printf("Error fetching repo: %v", err)
return
}
// 5. Use git ls-tree to find all trackable components (e.g., those with package.json)
// In a real project, you'd have a more robust way to discover packages.
lsTreeCmd := exec.Command("git", "--git-dir="+repoPath, "ls-tree", "-r", commitSHA)
output, err := lsTreeCmd.Output()
if err != nil {
log.Printf("Error running ls-tree for %s: %v", commitSHA, err)
return
}
lines := strings.Split(strings.TrimSpace(string(output)), "\n")
componentTreeHashes := make(map[string]string)
for _, line := range lines {
parts := strings.Fields(line)
// parts[0]: mode, parts[1]: type, parts[2]: hash, parts[3]: path
if len(parts) < 4 {
continue
}
filePath := parts[3]
// Let's assume a component is defined by the presence of a 'package.json'.
// The cache key will be the hash of the directory 'tree' containing it.
if strings.HasSuffix(filePath, "package.json") {
dirPath := filepath.Dir(filePath)
// We need the tree hash of the directory itself.
// This requires another git command.
treeHashCmd := exec.Command("git", "--git-dir="+repoPath, "rev-parse", fmt.Sprintf("%s:%s", commitSHA, dirPath))
treeHashBytes, err := treeHashCmd.Output()
if err != nil {
// This can happen if the path is a submodule, etc. Ignore for now.
continue
}
treeHash := strings.TrimSpace(string(treeHashBytes))
componentTreeHashes[dirPath] = treeHash
}
}
// 6. Publish cache check requests to Redis Streams for each component
for path, hash := range componentTreeHashes {
log.Printf("Publishing cache request for component '%s' with tree hash '%s'", path, hash)
// The message in the stream indicates that a build might need cache for this specific content hash.
// The CI agent will be responsible for checking if cache exists and fetching it.
// Our job here is just to announce the "content identity" of what's being built.
err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: redisStream,
Values: map[string]interface{}{
"component_path": path,
"git_tree_hash": hash,
"event_type": "CACHE_CHECK_REQUEST",
"timestamp": time.Now().Unix(),
},
}).Err()
if err != nil {
log.Printf("Failed to XADD to Redis Stream for %s: %v", path, err)
}
}
}
func isValidSignature(body []byte, signatureHeader string, secret string) bool {
if !strings.HasPrefix(signatureHeader, "sha256=") {
return false
}
actualSignature := strings.TrimPrefix(signatureHeader, "sha256=")
mac := hmac.New(sha256.New, []byte(secret))
mac.Write(body)
expectedMAC := hex.EncodeToString(mac.Sum(null))
return hmac.Equal([]byte(actualSignature), []byte(expectedMAC))
}
这里的核心在于 processGitChanges
函数。它没有使用简单的 commit hash 作为缓存键,而是深入了一层。它遍历了整个代码树 (git ls-tree -r
),识别出我们关心的每个独立组件(在这里用 package.json
的存在来判断),然后获取该组件目录对应的 tree
对象的哈希。这个 tree
哈希是该目录下所有文件和子目录内容的一个递归哈希。这意味着,只要组件内的任何文件发生变化,这个 tree
哈希就会改变。这提供了我们所追求的精细缓存粒度。
第二步: CI Runner Agent 的实现
在每个 CI Runner 启动时,一个前置脚本(Agent)会运行。它负责连接到 Redis Stream,读取需要预热的缓存信息,并从对象存储中下载它们。我们用一个 Shell 脚本来模拟这个过程,因为它在 CI 环境中更常见。
#!/bin/bash
# ci-agent-pre-build.sh
set -euo pipefail
# --- Configuration ---
REDIS_CLI="redis-cli"
REDIS_STREAM="turbopack-cache-events"
CONSUMER_GROUP="ci-runner-group"
CONSUMER_NAME="${CI_RUNNER_ID:-runner-$(hostname)-$$}" # Unique consumer name
OBJECT_STORAGE_BASE_URL="http://minio:9000/turbopack-cache"
TURBO_CACHE_PATH=".turbo/cache"
# --- Main Logic ---
echo "CI Agent starting for consumer: ${CONSUMER_NAME}"
# 1. Ensure consumer group exists. The '$' means start from the latest message.
# We use 'MKSTREAM' to create the stream if it doesn't exist.
$REDIS_CLI XGROUP CREATE $REDIS_STREAM $CONSUMER_GROUP $ MKSTREAM > /dev/null 2>&1 || true
# 2. Create the local cache directory
mkdir -p $TURBO_CACHE_PATH
# 3. Read from the stream. We'll process up to 100 messages in one go.
# BLOCK 2000 means it will wait for up to 2 seconds for new messages.
# '>' is a special ID meaning "read messages not yet delivered to any other consumer".
echo "Reading cache events from stream '$REDIS_STREAM'..."
while read -r stream_key; do
# The output format is nested arrays. We only care about the message ID and values.
# We use a simple awk script to flatten the key-value pairs.
message_id=$(echo "$stream_key" | awk 'NR==2 {print $1}')
if [ -z "$message_id" ]; then
echo "No new messages in stream. Proceeding with build."
break
fi
echo "Processing message ID: $message_id"
# Flatten key-value pairs like: key1 val1 key2 val2 ...
values=$(echo "$stream_key" | awk 'NR>2 {for(i=1; i<=NF; i+=2) printf "%s=%s\n", $i, $(i+1)}')
# Source the key-value pairs into shell variables
eval "$values"
# We are interested in CACHE_CHECK_REQUEST events
if [ "${event_type:-}" != "CACHE_CHECK_REQUEST" ]; then
# Acknowledge the message so it's not processed again
$REDIS_CLI XACK $REDIS_STREAM $CONSUMER_GROUP $message_id > /dev/null
continue
fi
GIT_TREE_HASH="${git_tree_hash}"
COMPONENT_PATH="${component_path}"
echo "Found cache request for component '$COMPONENT_PATH' with hash '$GIT_TREE_HASH'"
# 4. Construct artifact URL and download
# The cache artifact is named after its git tree hash.
ARTIFACT_NAME="${GIT_TREE_HASH}.tgz"
ARTIFACT_URL="${OBJECT_STORAGE_BASE_URL}/${ARTIFACT_NAME}"
LOCAL_ARTIFACT_PATH="/tmp/${ARTIFACT_NAME}"
echo "Attempting to download cache from ${ARTIFACT_URL}"
# Use curl with --fail to treat 4xx/5xx as errors
if curl --fail -s -o "${LOCAL_ARTIFACT_PATH}" "${ARTIFACT_URL}"; then
echo "Cache hit! Downloaded ${ARTIFACT_NAME}."
# 5. Unpack the cache artifact into the correct location
# The archive should contain the contents of the .turbo/cache directory for that build.
tar -xzf "${LOCAL_ARTIFACT_PATH}" -C "${TURBO_CACHE_PATH}"
echo "Unpacked cache for hash ${GIT_TREE_HASH}."
rm "${LOCAL_ARTIFACT_PATH}"
else
echo "Cache miss for hash ${GIT_TREE_HASH}. It will be generated during the build."
fi
# 6. Acknowledge the message in Redis
$REDIS_CLI XACK $REDIS_STREAM $CONSUMER_GROUP $message_id > /dev/null
echo "Acknowledged message ${message_id}."
# Read up to 100 messages, wait up to 2 seconds
done < <($REDIS_CLI XREADGROUP GROUP $CONSUMER_GROUP $CONSUMER_NAME COUNT 100 BLOCK 2000 STREAMS $REDIS_STREAM '>')
echo "CI Agent finished pre-build cache warming."
这个 Agent 脚本是整个系统的客户端。它使用 XREADGROUP
以确保同一条消息不会被多个 Runner 重复处理。当它收到一个 CACHE_CHECK_REQUEST
事件时,它会根据 git_tree_hash
构造一个预期的缓存包 URL,并尝试下载。如果下载成功(Cache Hit),它就地解压,预热了 Turbopack 的缓存。如果下载失败(Cache Miss),也无妨,构建会正常进行,只是速度慢一些。关键在于,它通过消费 Redis Stream,将被动、笨重的缓存下载过程,变为了主动、精细的缓存预热。
第三步: 缓存的生成与上传
当一个 CI 构建成功后,一个后置脚本需要运行。它负责将新生成的 Turbopack 缓存打包、上传,并通知 Coordinator。
#!/bin/bash
# ci-agent-post-build.sh
set -euo pipefail
# --- Configuration ---
OBJECT_STORAGE_BASE_URL="http://minio:9000/turbopack-cache"
TURBO_CACHE_PATH=".turbo/cache"
COORDINATOR_API_URL="http://coordinator:8080/notify-cache"
# --- Main Logic ---
# We need the Git tree hash of the component that was just built.
# This should be passed from the main CI job.
COMPONENT_PATH="${1:-}"
if [ -z "$COMPONENT_PATH" ]; then
echo "Usage: $0 <path-to-component>"
exit 1
fi
echo "Post-build step for component: ${COMPONENT_PATH}"
# 1. Calculate the git tree hash for the component's current state.
# This must match the hash used by the coordinator.
TREE_HASH=$(git rev-parse HEAD:${COMPONENT_PATH})
if [ -z "$TREE_HASH" ]; then
echo "Error: Could not resolve tree hash for path '${COMPONENT_PATH}'"
exit 1
fi
echo "Component tree hash is: ${TREE_HASH}"
# 2. Find the cache artifacts generated by Turbopack.
# Turbopack's cache artifacts often contain a hash in their filename.
# Let's assume the build process for a component generates a specific artifact.
# A robust implementation would parse Turbopack's output or manifest.
# For simplicity, we assume the relevant cache file is identifiable.
# Let's say it's named based on the source hash.
EXPECTED_CACHE_FILE="${TURBO_CACHE_PATH}/${TREE_HASH}"
if [ ! -f "$EXPECTED_CACHE_FILE" ]; then
echo "No new cache file found at ${EXPECTED_CACHE_FILE}. Nothing to upload."
exit 0
fi
# 3. Pack the artifact(s) for upload.
ARTIFACT_NAME="${TREE_HASH}.tgz"
LOCAL_ARTIFACT_PATH="/tmp/${ARTIFACT_NAME}"
echo "Packing cache file '${EXPECTED_CACHE_FILE}' into '${LOCAL_ARTIFACT_PATH}'"
tar -czf "${LOCAL_ARTIFACT_PATH}" -C "${TURBO_CACHE_PATH}" "${TREE_HASH}"
# 4. Upload to object storage.
ARTIFACT_URL="${OBJECT_STORAGE_BASE_URL}/${ARTIFACT_NAME}"
echo "Uploading cache to ${ARTIFACT_URL}"
curl -X PUT --data-binary @"${LOCAL_ARTIFACT_PATH}" "${ARTIFACT_URL}"
# 5. Notify the Coordinator (optional but recommended)
# This could trigger further optimizations, like pre-warming other runners.
# For now, our system is reactive, so this step is for future enhancement.
# curl -X POST -d "{\"git_tree_hash\": \"${TREE_HASH}\", \"storage_path\": \"${ARTIFACT_URL}\"}" "${COORDINATOR_API_URL}"
echo "Cache for hash ${TREE_HASH} successfully uploaded."
rm "${LOCAL_ARTIFACT_PATH}"
这个脚本完成了闭环。在一个 Cache Miss 的构建之后,它将生成的缓存产物以其对应的 tree
哈希命名,并上传到共享的对象存储中。这样,下一个构建相同代码版本的 Runner,在执行 pre-build
脚本时就能成功命中并下载这个缓存。
局限性与未来迭代
这个系统的实现虽然解决了最初的痛点,但它并非没有代价和局限性。
首先,系统的复杂性显著增加了。我们引入了一个新的服务(Coordinator)、一个新的依赖(Redis),以及一套新的 CI 脚本逻辑。维护这个系统的成本需要与它节省的构建时间进行权衡。在真实项目中,Coordinator 需要做到高可用,Redis 需要有备份和监控,这都是额外的运维开销。
其次,存储成本可能会成为一个问题。基于 Git 对象哈希的缓存粒度非常细,这意味着每次微小的代码改动都会生成新的缓存对象。如果不加管理,对象存储中的文件数量会爆炸式增长。必须配套实现一套严格的生命周期管理(Lifecycle Policy)策略,定期清理那些与任何活动分支都无关的旧缓存对象。
再者,对 Turbopack 内部的依赖是一个潜在风险。我们的上传脚本假设能够轻易识别出与源码哈希对应的缓存产物。如果 Turbopack 的缓存结构发生变化,这部分逻辑就需要同步更新。一个更健壮的方案可能需要 Turbopack 提供一个缓存清单(manifest)文件,明确指出输入哈希和输出缓存文件之间的映射关系。
未来的优化路径也十分清晰。当前的系统是纯粹被动的。我们可以让它变得更具预测性。例如,Coordinator 在收到一个指向 feature-A
分支的提交时,可以分析出它最有可能被合并到 develop
分支。于是,它可以主动向 Redis Stream 推送一个预热 develop
分支最新状态的缓存任务,使得开发者在创建合并请求时,对应的 CI 检查几乎可以瞬间完成。这能进一步缩短反馈循环,提升开发体验。