Building a Real-Time Distributed Cache for Turbopack on the Git Object Model and Redis Streams


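The CI build time of our frontend monorepo has degraded from an acceptable 5 minutes to an intolerable 15 minutes. Every merge request has to run the full lint, test, and build pipeline on dozens of isolated, ephemeral CI Runners. We adopted Turbopack to speed up local development, but in CI its gains are cancelled out by a more fundamental problem: caching.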

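Standard CI caching mechanisms, whether they upload and download file archives or use a simple key-value store, have serious flaws. Some spend a lot of network I/O and time packing and unpacking huge node_modules directories and build outputs. Others use cache keys that are too coarse, usually a hash of package-lock.json, so any small dependency change invalidates the entire cache.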

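Turbopack's own caching is powerful, but it is designed to work on a single file system. In a fleet of distributed, stateless CI Runners, a warm cache on one Runner does nothing for another. We needed a real distributed cache: one that keeps cache state in sync across all Runners, in real time and at fine granularity.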

Our initial idea was to abandon the traditional "pack, upload, download, unpack" model. The ideal system would have the following properties:

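  1. Very fine-grained caching: a cache unit should correspond to a single file or a single component directory, not to the whole project.
  2. Content addressing: the cache key should be a hash of the file contents, not a file name or path. This is exactly Git's core idea.
  3. Real-time propagation: as soon as a Runner produces a new cache artifact, that information should be broadcast immediately to every other Runner that might need it.
  4. Low latency: when a Runner starts a task, it should be able to fetch the cache it needs with very low latency, instead of waiting for one huge tarball to download.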

This idea led us directly to a combination of three technologies: Git's low-level (plumbing) commands, Redis Streams, and Turbopack itself.

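  • Git plumbing: we treat Git not just as a version-control tool but as a content-addressable file system. Low-level commands such as git hash-object, git cat-file, and git ls-tree let us work directly with Git's object database and obtain the SHA-1 hash of any file (blob) or directory (tree). This is the key to content addressing and fine-grained caching.
  • Redis Streams: we need a reliable, persistent message bus to broadcast cache state. Plain Redis Pub/Sub is fire-and-forget and cannot guarantee delivery, while Kafka or RabbitMQ is too heavyweight for this scenario. Redis Streams is a good middle ground: a lightweight append-only log with consumer groups, and it is as durable as the Redis persistence configuration allows. It serves as the nervous system of our distributed cache.
  • Turbopack: the target of our optimization. Its cache directory (.turbo/cache) has a fairly clear structure, so we can associate its cache artifacts with Git object hashes.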
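A Git blob ID is the SHA-1 of a short header (`blob <size>` followed by a NUL byte) plus the raw file bytes. This is what `git hash-object` prints for a file that has no clean filters or line-ending conversion configured. The Go sketch below reproduces that calculation without shelling out, which makes "content addressing" concrete. The function name `gitBlobHash` is our own, and the sketch assumes a repository that uses the default SHA-1 object format.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// gitBlobHash returns the object ID Git assigns to a file with this content
// in a SHA-1 repository: SHA-1 over "blob <len>\x00" followed by the bytes.
func gitBlobHash(content []byte) string {
	h := sha1.New()
	fmt.Fprintf(h, "blob %d\x00", len(content)) // object header
	h.Write(content)                           // raw file content
	return hex.EncodeToString(h.Sum(nil))
}
```

The result depends only on the bytes, never on the path. For example, `echo 'test content' | git hash-object --stdin` and `gitBlobHash([]byte("test content\n"))` both yield `d670460b4b4aece5915caf5c68d12f560a9fe3e4`.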

The final architecture looks like this:

graph TD
    subgraph GitServer["Git Server"]
        A[Git Push Event] --> B{Webhook};
    end

    subgraph Coordinator["Cache Coordinator Service"]
        B --> C[Webhook Listener];
        C -- new commit info --> D[Git Object Analyzer];
        D -- git plumbing commands --> E[Git Repository Clone];
        D -- new cache events --> F[Redis Client];
    end

    subgraph Shared["Shared Infrastructure"]
        F -- XADD --> G["Redis Stream: turbopack-cache-events"];
        H["Object Storage / S3"]
    end

    subgraph Runners["CI Runners"]
        I[Runner 1] --> J1[Cache Agent];
        K[Runner 2] --> J2[Cache Agent];
        L[Runner N] --> J3[Cache Agent];

        J1 -- XREADGROUP --> G;
        J2 -- XREADGROUP --> G;
        J3 -- XREADGROUP --> G;

        J1 -- download artifacts --> H;
        J2 -- download artifacts --> H;
        J3 -- download artifacts --> H;

        J1 -- populates --> M1[Turbopack Cache Dir];
        J2 -- populates --> M2[Turbopack Cache Dir];
        J3 -- populates --> M3[Turbopack Cache Dir];

        M1 -- used by --> N1[Turbopack Build];
        M2 -- used by --> N2[Turbopack Build];
        M3 -- used by --> N3[Turbopack Build];

        N1 -- new artifacts --> O1[Cache Uploader];
        O1 -- upload artifacts --> H;
        O1 -. notify new cache (planned) .-> C;
    end

    style Coordinator fill:#dae8fc,stroke:#333,stroke-width:2px
    style Runners fill:#d5e8d4,stroke:#333,stroke-width:2px

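At the center of this flow is a stateless service called the "Cache Coordinator". Its only local state is a bare clone of the repository, which can be rebuilt at any time. The Coordinator listens for Git push events, analyzes the commit contents, and publishes the components that need caching to a Redis Stream. When a CI Runner starts, its built-in "Cache Agent" pulls entries from that Stream and downloads the matching cache to local disk, which warms the cache before Turbopack runs. When a Runner finishes a new build and produces new cache output, it uploads the artifacts to object storage and notifies the Coordinator, which then broadcasts the new cache information. In the scripts below, this notification step is left as a future enhancement.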

Step 1: Implementing the Cache Coordinator

We wrote the Coordinator in Go because Go handles concurrent network requests and external commands well. Below are the core webhook handler and the Git analysis logic.

// main.go
package main

import (
	"context"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
)

const (
	repoPath      = "/tmp/monorepo" // A bare clone of the repository
	redisAddr     = "localhost:6379"
	redisStream   = "turbopack-cache-events"
	webhookSecret = "your-super-secret-webhook-secret"
)

var (
	rdb *redis.Client
	mu  sync.Mutex // Mutex to protect git operations
	ctx = context.Background()
)

// GitPushEvent represents the structure of a typical Git push webhook payload.
type GitPushEvent struct {
	Ref    string `json:"ref"`
	After  string `json:"after"`
	Before string `json:"before"`
	Repository struct {
		CloneURL string `json:"clone_url"`
	} `json:"repository"`
}

func main() {
	rdb = redis.NewClient(&redis.Options{
		Addr: redisAddr,
	})

	// Initialize the bare repository
	initRepo()

	http.HandleFunc("/webhook", handleWebhook)
	log.Println("Cache Coordinator listening on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

func initRepo() {
	mu.Lock()
	defer mu.Unlock()

	repoURL := os.Getenv("GIT_REPO_URL")
	if repoURL == "" {
		log.Fatal("GIT_REPO_URL environment variable not set.")
	}

	if _, err := os.Stat(repoPath); os.IsNotExist(err) {
		log.Printf("Cloning bare repository from %s into %s...", repoURL, repoPath)
		cmd := exec.Command("git", "clone", "--bare", repoURL, repoPath)
		if err := cmd.Run(); err != nil {
			log.Fatalf("Failed to clone bare repo: %v", err)
		}
	} else {
		log.Println("Bare repository already exists. Fetching updates...")
		cmd := exec.Command("git", "--git-dir="+repoPath, "fetch", "origin", "+refs/heads/*:refs/heads/*", "--prune")
		if err := cmd.Run(); err != nil {
			log.Printf("Warning: failed to fetch updates for bare repo: %v", err)
		}
	}
}

func handleWebhook(w http.ResponseWriter, r *http.Request) {
	// 1. Validate payload signature for security
	signature := r.Header.Get("X-Hub-Signature-256")
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	if !isValidSignature(body, signature, webhookSecret) {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	// 2. Parse the webhook payload
	var event GitPushEvent
	if err := json.Unmarshal(body, &event); err != nil {
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

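	// We only care about branch pushes, not tags or deletions.
	// A deleted branch is reported with an all-zero "after" SHA.
	if !strings.HasPrefix(event.Ref, "refs/heads/") || strings.Trim(event.After, "0") == "" {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("Event is not a branch push, skipping."))
		return
	}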

	log.Printf("Received push event for ref %s, commit %s", event.Ref, event.After)

	// 3. Process the git changes in a separate goroutine to respond quickly
	go processGitChanges(event.After)

	w.WriteHeader(http.StatusAccepted)
	w.Write([]byte("Accepted"))
}

func processGitChanges(commitSHA string) {
	mu.Lock()
	defer mu.Unlock()

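	// 4. Fetch the latest branch heads into our bare repo.
	// A bare clone has no default fetch refspec, so spell it out; otherwise
	// the commit that was just pushed to a branch may never be fetched.
	log.Printf("Fetching latest changes for commit %s...", commitSHA)
	fetchCmd := exec.Command("git", "--git-dir="+repoPath, "fetch", "--prune", "origin", "+refs/heads/*:refs/heads/*")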
	fetchCmd.Stderr = os.Stderr
	if err := fetchCmd.Run(); err != nil {
		log.Printf("Error fetching repo: %v", err)
		return
	}

	// 5. Use git ls-tree to find all trackable components (e.g., those with package.json)
	// In a real project, you'd have a more robust way to discover packages.
	// -z prints NUL-terminated records with unquoted paths, so unusual file names parse safely.
	lsTreeCmd := exec.Command("git", "--git-dir="+repoPath, "ls-tree", "-r", "-z", commitSHA)
	output, err := lsTreeCmd.Output()
	if err != nil {
		log.Printf("Error running ls-tree for %s: %v", commitSHA, err)
		return
	}

	records := strings.Split(strings.TrimRight(string(output), "\x00"), "\x00")
	componentTreeHashes := make(map[string]string)

	for _, record := range records {
		// Each record is "<mode> SP <type> SP <hash> TAB <path>".
		// Splitting on the tab keeps paths that contain spaces intact.
		meta, filePath, found := strings.Cut(record, "\t")
		if !found || len(strings.Fields(meta)) != 3 {
			continue
		}

		// Let's assume a component is defined by the presence of a 'package.json'.
		// Compare the exact base name so that e.g. "foo-package.json" does not match.
		if filepath.Base(filePath) != "package.json" {
			continue
		}

		// The cache key will be the hash of the 'tree' object of the containing directory.
		dirPath := filepath.Dir(filePath)
		rev := fmt.Sprintf("%s:%s", commitSHA, dirPath)
		if dirPath == "." {
			// A package.json at the repository root: use the commit's root tree.
			rev = commitSHA + "^{tree}"
		}
		treeHashBytes, err := exec.Command("git", "--git-dir="+repoPath, "rev-parse", rev).Output()
		if err != nil {
			// This can happen if the path is a submodule, etc. Ignore for now.
			continue
		}
		componentTreeHashes[dirPath] = strings.TrimSpace(string(treeHashBytes))
	}
	
	// 6. Publish cache check requests to Redis Streams for each component
	for path, hash := range componentTreeHashes {
		log.Printf("Publishing cache request for component '%s' with tree hash '%s'", path, hash)
		
		// The message in the stream indicates that a build might need cache for this specific content hash.
		// The CI agent will be responsible for checking if cache exists and fetching it.
		// Our job here is just to announce the "content identity" of what's being built.
		err := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: redisStream,
			Values: map[string]interface{}{
				"component_path": path,
				"git_tree_hash":  hash,
				"event_type":     "CACHE_CHECK_REQUEST",
				"timestamp":      time.Now().Unix(),
			},
		}).Err()
		
		if err != nil {
			log.Printf("Failed to XADD to Redis Stream for %s: %v", path, err)
		}
	}
}

func isValidSignature(body []byte, signatureHeader string, secret string) bool {
	if !strings.HasPrefix(signatureHeader, "sha256=") {
		return false
	}
	actualSignature := strings.TrimPrefix(signatureHeader, "sha256=")
	
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	expectedMAC := hex.EncodeToString(mac.Sum(nil))

	return hmac.Equal([]byte(actualSignature), []byte(expectedMAC))
}
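To exercise handleWebhook locally, for example with curl, you need an `X-Hub-Signature-256` header computed the same way isValidSignature checks it. The header value is `sha256=` followed by the hex HMAC-SHA256 of the raw request body, keyed with the shared secret. A minimal helper sketch, with `signPayload` as a hypothetical name:

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
)

// signPayload builds the X-Hub-Signature-256 header value for a request body,
// matching what isValidSignature recomputes on the Coordinator side.
func signPayload(body []byte, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body)
	return "sha256=" + hex.EncodeToString(mac.Sum(nil))
}
```

The signature covers the exact bytes sent. The payload file passed to `curl --data-binary @payload.json` must therefore be byte-identical to the body that was signed. Re-serializing the JSON would change whitespace and break verification.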

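The heart of this code is the processGitChanges function. Instead of using the commit hash as the cache key, it goes one level deeper. It walks the entire tree (git ls-tree -r), identifies each independent component we care about (here, any directory containing a package.json), and resolves the hash of that directory's tree object. A tree hash is computed recursively over the names, modes, and contents of every file and subdirectory beneath it. Any change to a file inside the component therefore changes the tree hash, while an untouched component keeps the same hash across commits and branches. This gives us the fine cache granularity we were after. Note that the hash only covers files inside that directory. A change to a workspace dependency elsewhere in the monorepo, or to the root lockfile, does not alter it, so a production cache key would likely need to fold those inputs in as well.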
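The recursive property follows from how a tree object is built. Its content is the sorted list of `<mode> <name>\0<20-byte child ID>` entries, and its ID is the SHA-1 of `tree <size>\0` plus that content. A child that is itself a tree contributes its own tree ID, so a change anywhere below propagates upward. The following Go sketch computes a tree ID from its direct entries in a SHA-1 repository, to illustrate what `git rev-parse <commit>:<dir>` returns. The type `treeEntry` and the helpers `gitTreeHash` and `blobID` are our own names.

```go
package main

import (
	"bytes"
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"sort"
)

// treeEntry is one entry of a Git tree object.
type treeEntry struct {
	Mode string   // "100644" file, "100755" executable, "40000" subdirectory
	Name string   // entry name (no slashes)
	ID   [20]byte // raw SHA-1 of the referenced blob or subtree
}

// gitTreeHash computes a tree's object ID from its direct entries.
func gitTreeHash(entries []treeEntry) string {
	sorted := append([]treeEntry(nil), entries...)
	// Git orders entries bytewise by name, comparing a subdirectory as if
	// its name ended with "/".
	key := func(e treeEntry) string {
		if e.Mode == "40000" {
			return e.Name + "/"
		}
		return e.Name
	}
	sort.Slice(sorted, func(i, j int) bool { return key(sorted[i]) < key(sorted[j]) })

	var body bytes.Buffer
	for _, e := range sorted {
		fmt.Fprintf(&body, "%s %s\x00", e.Mode, e.Name)
		body.Write(e.ID[:])
	}
	h := sha1.New()
	fmt.Fprintf(h, "tree %d\x00", body.Len())
	h.Write(body.Bytes())
	return hex.EncodeToString(h.Sum(nil))
}

// blobID returns the raw 20-byte ID of a blob with the given content.
func blobID(content []byte) [20]byte {
	var buf bytes.Buffer
	fmt.Fprintf(&buf, "blob %d\x00", len(content))
	buf.Write(content)
	return sha1.Sum(buf.Bytes())
}
```

An empty tree hashes to the well-known `4b825dc642cb6eb9a060e54bf8d69288fbee4904`. Editing any file, even only its whitespace, yields a different ID for the directory that contains it.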

Step 2: Implementing the CI Runner Agent

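When a CI Runner starts, a pre-build script (the Agent) runs first. It connects to the Redis Stream, reads the cache entries that need warming, and downloads them from object storage. We implement it as a shell script, since that is the most common form for CI steps.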

#!/bin/bash

# ci-agent-pre-build.sh

set -euo pipefail

# --- Configuration ---
REDIS_CLI="redis-cli"
REDIS_STREAM="turbopack-cache-events"
CONSUMER_GROUP="ci-runner-group"
CONSUMER_NAME="${CI_RUNNER_ID:-runner-$(hostname)-$$}" # Unique consumer name
OBJECT_STORAGE_BASE_URL="http://minio:9000/turbopack-cache"
TURBO_CACHE_PATH=".turbo/cache"

# --- Main Logic ---

echo "CI Agent starting for consumer: ${CONSUMER_NAME}"

# Handle one stream entry. Arguments: <message-id> <field> <value> [<field> <value> ...]
process_entry() {
    local message_id="$1"; shift
    local event_type="" git_tree_hash="" component_path=""

    # Copy only the fields we know about. Never eval data read from the stream.
    while [ "$#" -ge 2 ]; do
        case "$1" in
            event_type)     event_type="$2" ;;
            git_tree_hash)  git_tree_hash="$2" ;;
            component_path) component_path="$2" ;;
        esac
        shift 2
    done

    echo "Processing message ID: $message_id"

    # We are only interested in CACHE_CHECK_REQUEST events with a well-formed hash
    if [ "$event_type" != "CACHE_CHECK_REQUEST" ] || ! [[ "$git_tree_hash" =~ ^[0-9a-f]{40}$ ]]; then
        # Acknowledge the message so it's not processed again
        $REDIS_CLI XACK "$REDIS_STREAM" "$CONSUMER_GROUP" "$message_id" > /dev/null
        return 0
    fi

    echo "Found cache request for component '$component_path' with hash '$git_tree_hash'"

    # Construct the artifact URL and download it.
    # The cache artifact is named after its git tree hash.
    local artifact_name="${git_tree_hash}.tgz"
    local artifact_url="${OBJECT_STORAGE_BASE_URL}/${artifact_name}"
    local local_artifact_path="/tmp/${artifact_name}"

    echo "Attempting to download cache from ${artifact_url}"

    # Use curl with --fail to treat 4xx/5xx as errors
    if curl --fail -s -o "$local_artifact_path" "$artifact_url"; then
        echo "Cache hit! Downloaded ${artifact_name}."
        # Unpack into the cache directory; the archive holds the
        # contents of the .turbo/cache directory for that build.
        tar -xzf "$local_artifact_path" -C "$TURBO_CACHE_PATH"
        echo "Unpacked cache for hash ${git_tree_hash}."
        rm -f "$local_artifact_path"
    else
        echo "Cache miss for hash ${git_tree_hash}. It will be generated during the build."
    fi

    # Acknowledge the message in Redis
    $REDIS_CLI XACK "$REDIS_STREAM" "$CONSUMER_GROUP" "$message_id" > /dev/null
    echo "Acknowledged message ${message_id}."
}

# 1. Ensure the consumer group exists. '$' means the group only sees messages
# added after it is created; MKSTREAM creates the stream if it doesn't exist.
# The command fails with BUSYGROUP if the group already exists, which is fine.
$REDIS_CLI XGROUP CREATE "$REDIS_STREAM" "$CONSUMER_GROUP" '$' MKSTREAM > /dev/null 2>&1 || true

# 2. Create the local cache directory
mkdir -p "$TURBO_CACHE_PATH"

# 3. Read up to 100 new messages, blocking for at most 2 seconds (BLOCK 2000).
# '>' means "messages never delivered to any consumer of this group".
# With --raw, redis-cli prints the nested reply flattened, one element per line:
#   <stream name>, then for each entry: <id>, <field>, <value>, <field>, <value>, ...
echo "Reading cache events from stream '$REDIS_STREAM'..."
mapfile -t reply < <($REDIS_CLI --raw XREADGROUP GROUP "$CONSUMER_GROUP" "$CONSUMER_NAME" \
    COUNT 100 BLOCK 2000 STREAMS "$REDIS_STREAM" '>')

if [ "${#reply[@]}" -le 1 ]; then
    echo "No new messages in stream. Proceeding with build."
else
    # Split the flat list into entries: a line that looks like a stream ID
    # (<ms>-<seq>) starts a new entry, but only where a field name is expected.
    entry=()
    for line in "${reply[@]:1}"; do
        if [[ "$line" =~ ^[0-9]+-[0-9]+$ ]] && (( ${#entry[@]} == 0 || ${#entry[@]} % 2 == 1 )); then
            if (( ${#entry[@]} > 0 )); then process_entry "${entry[@]}"; fi
            entry=("$line")
        else
            entry+=("$line")
        fi
    done
    if (( ${#entry[@]} > 0 )); then process_entry "${entry[@]}"; fi
fi

echo "CI Agent finished pre-build cache warming."

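This Agent script is the client side of the system. It uses XREADGROUP so that the same message is not processed by more than one Runner. When it receives a CACHE_CHECK_REQUEST event, it builds the expected archive URL from git_tree_hash and tries to download it. On success (a cache hit), it unpacks the archive in place and so warms Turbopack's cache. On failure (a cache miss), nothing breaks: the build proceeds normally, just more slowly. The key point is that consuming the Redis Stream turns a passive, heavyweight cache download into active, fine-grained cache warming. One caveat: within a single consumer group, each new entry is delivered to only one consumer. With a shared ci-runner-group, each Runner therefore sees only a share of the requests. If every Runner needs to warm every announced component, as the broadcast requirement above implies, each Runner would need its own consumer group or a plain XREAD from a known offset.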
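The delivery difference between one shared consumer group and one group per Runner can be shown with a toy model. The model below is in-memory only and is not Redis. It ignores pending-entry lists, acknowledgements, and blocking, and captures just the rule behind `XREADGROUP ... '>'`: a group tracks a single last-delivered position. Readers that share a group therefore split the entries between them, while independent groups each receive all of them. The names `group` and `readNew` are hypothetical.

```go
package main

// group is a toy stand-in for a Redis consumer group: it remembers one
// last-delivered position shared by every consumer reading through it.
type group struct{ lastDelivered int }

// readNew mimics XREADGROUP with the '>' ID: it returns up to count entries
// that no consumer of this group has received yet, and advances the position.
func (g *group) readNew(stream []string, count int) []string {
	end := g.lastDelivered + count
	if end > len(stream) {
		end = len(stream)
	}
	out := stream[g.lastDelivered:end]
	g.lastDelivered = end
	return out
}
```

With three announced components, the first Runner reading from a shared group takes all three and the second gets none. Two Runners in separate groups each get all three, which is the broadcast behaviour the requirements describe.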

Step 3: Generating and Uploading the Cache

After a CI build succeeds, a post-build script runs. It packs the newly generated Turbopack cache, uploads it, and optionally notifies the Coordinator.

#!/bin/bash

# ci-agent-post-build.sh

set -euo pipefail

# --- Configuration ---
OBJECT_STORAGE_BASE_URL="http://minio:9000/turbopack-cache"
TURBO_CACHE_PATH=".turbo/cache"
COORDINATOR_API_URL="http://coordinator:8080/notify-cache"

# --- Main Logic ---

# We need the Git tree hash of the component that was just built.
# This should be passed from the main CI job.
COMPONENT_PATH="${1:-}"
if [ -z "$COMPONENT_PATH" ]; then
    echo "Usage: $0 <path-to-component>"
    exit 1
fi

echo "Post-build step for component: ${COMPONENT_PATH}"

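# 1. Calculate the git tree hash for the component's current state.
# This must match the hash used by the coordinator (<commit>:<path>, path relative to the repo root).
# Run rev-parse inside the 'if' so that a failure is reported instead of aborting via 'set -e'.
if ! TREE_HASH=$(git rev-parse --verify --quiet "HEAD:${COMPONENT_PATH}"); then
    echo "Error: Could not resolve tree hash for path '${COMPONENT_PATH}'"
    exit 1
fi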

echo "Component tree hash is: ${TREE_HASH}"

# 2. Find the cache artifacts generated by Turbopack.
# Turbopack's cache artifacts often contain a hash in their filename.
# Let's assume the build process for a component generates a specific artifact.
# A robust implementation would parse Turbopack's output or manifest.
# For simplicity, we assume the relevant cache file is identifiable.
# Let's say it's named based on the source hash.
EXPECTED_CACHE_FILE="${TURBO_CACHE_PATH}/${TREE_HASH}" 

if [ ! -f "$EXPECTED_CACHE_FILE" ]; then
    echo "No new cache file found at ${EXPECTED_CACHE_FILE}. Nothing to upload."
    exit 0
fi

# 3. Pack the artifact(s) for upload.
ARTIFACT_NAME="${TREE_HASH}.tgz"
LOCAL_ARTIFACT_PATH="/tmp/${ARTIFACT_NAME}"

echo "Packing cache file '${EXPECTED_CACHE_FILE}' into '${LOCAL_ARTIFACT_PATH}'"
tar -czf "${LOCAL_ARTIFACT_PATH}" -C "${TURBO_CACHE_PATH}" "${TREE_HASH}"

# 4. Upload to object storage.
ARTIFACT_URL="${OBJECT_STORAGE_BASE_URL}/${ARTIFACT_NAME}"
echo "Uploading cache to ${ARTIFACT_URL}"

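# --upload-file sends a PUT; --fail makes an HTTP error abort the script instead of reporting success.
curl --fail -sS --upload-file "${LOCAL_ARTIFACT_PATH}" "${ARTIFACT_URL}"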

# 5. Notify the Coordinator (optional but recommended)
# This could trigger further optimizations, like pre-warming other runners.
# For now, our system is reactive, so this step is for future enhancement.
# curl -X POST -d "{\"git_tree_hash\": \"${TREE_HASH}\", \"storage_path\": \"${ARTIFACT_URL}\"}" "${COORDINATOR_API_URL}"

echo "Cache for hash ${TREE_HASH} successfully uploaded."
rm "${LOCAL_ARTIFACT_PATH}"

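This script closes the loop. After a build with a cache miss, it names the generated cache artifact after the corresponding tree hash and uploads it to shared object storage. The next Runner that builds a component with the same content, on any commit or branch, will then hit and download this cache in its pre-build step.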

Limitations and Future Iterations

Although this system solved our original pain point, it has costs and limitations.

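First, system complexity went up significantly. We introduced a new service (the Coordinator), a new dependency (Redis), and a new set of CI script logic. The cost of maintaining all of this has to be weighed against the build time it saves. In production, the Coordinator needs to be highly available, and Redis needs backups and monitoring. Both add operational overhead.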

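Second, storage cost can become a problem. Caching at the granularity of Git object hashes means every small code change produces new cache objects. Left unmanaged, the number of files in object storage grows explosively. A strict lifecycle policy is required to periodically delete old cache objects that no longer relate to any active branch.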

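Third, depending on Turbopack's internals is a risk. Our upload script assumes it can easily identify the cache artifact that corresponds to a source hash. If Turbopack's cache layout changes, this logic has to change with it. A more robust approach would need Turbopack to provide a cache manifest file that explicitly maps input hashes to output cache files.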
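To show what consuming such a manifest could look like, here is a sketch against an invented format. The JSON layout is purely hypothetical and is not a format Turbopack is known to emit: a version number plus a map from input content hash to the cache files produced for it, with paths relative to the cache directory. `cacheManifest` and `filesForInput` are likewise illustrative names. With a mapping like this, the upload script would no longer need to guess file names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// cacheManifest is a HYPOTHETICAL manifest format: input hash -> cache files
// (relative to the cache directory) produced for that input.
type cacheManifest struct {
	Version int                 `json:"version"`
	Entries map[string][]string `json:"entries"`
}

// filesForInput parses a manifest and returns the cache files recorded for hash.
func filesForInput(manifestJSON []byte, hash string) ([]string, error) {
	var m cacheManifest
	if err := json.Unmarshal(manifestJSON, &m); err != nil {
		return nil, fmt.Errorf("parse manifest: %w", err)
	}
	if m.Version != 1 {
		return nil, fmt.Errorf("unsupported manifest version %d", m.Version)
	}
	files, ok := m.Entries[hash]
	if !ok {
		return nil, fmt.Errorf("no entry for %s", hash)
	}
	return files, nil
}
```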

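The path for future optimization is also clear. The current system is purely reactive, and we can make it more predictive. For example, when the Coordinator receives a commit on a feature-A branch, it could infer that the branch will most likely be merged into develop. It could then proactively push a task to the Redis Stream to warm the cache for the latest state of develop. When a developer opens the merge request, the corresponding CI checks would then finish almost instantly. This would further shorten the feedback loop and improve the developer experience.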
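For this predictive mode, the Coordinator would need to know which components actually differ between what is already warm and the target branch state. Suppose both sides are available as component-path to tree-hash maps, built the same way processGitChanges does (this is an assumption about how the feature would be wired). The components worth announcing are then a simple map diff. `componentsToPrewarm` is a hypothetical name.

```go
package main

import "sort"

// componentsToPrewarm returns the component paths whose tree hash in target
// differs from, or is absent in, current: the entries a predictive
// Coordinator could announce ahead of time.
func componentsToPrewarm(current, target map[string]string) []string {
	var out []string
	for p, h := range target {
		if current[p] != h {
			out = append(out, p)
		}
	}
	sort.Strings(out) // deterministic order for logging and tests
	return out
}
```

Because unchanged components keep their tree hash, this diff is typically small even in a large monorepo. Only those entries would need to be pushed to the stream.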

