使用 Rust 构建 Grafana 高性能数据源后端并结合 Packer 创建不可变基础设施镜像


我们团队的内部事件总线每秒处理数十万条高度结构化的业务事件。传统的做法是将其中的关键指标导出到 Prometheus,再由 Grafana 进行可视化。这个方案在大多数场景下工作良好,但当我们需要对事件的原始高基数维度进行实时、动态的切片和聚合查询时,PromQL 的表现开始力不从心。预聚合会丢失细节,而直接查询原始数据又太慢。我们需要一个能直接与 Grafana 对话、且性能足够强悍的中间层,它能理解我们的数据结构并执行高效的即时查询。这就是我们决定用 Rust 自行构建一个 Grafana 数据源后端的起点。

这个组件的定位是基础设施的一部分,必须稳定、可靠、易于部署。因此,我们没有止步于编写一个二进制文件,而是选择用 Packer 将其与一个经过加固的操作系统一起,打包成一个不可变的亚马逊机器镜像(AMI)。这种“观测性一体机”的交付方式,极大地简化了部署和版本管理。

第一阶段:实现 Grafana 数据源后端 API

Grafana 的后端数据源本质上是一个实现了特定 HTTP API 的 Web 服务。Grafana UI 会根据用户操作向这个服务的特定端点发送请求。我们需要实现三个核心端点:

  1. GET /:健康检查,Grafana 会用它来测试数据源的可连接性。
  2. POST /search:返回可供查询的指标名称或标签。当用户在查询编辑器中点击下拉菜单时,Grafana 会调用此接口。
  3. POST /query:执行查询。这是最核心的接口,负责接收查询参数、处理数据并返回 Grafana 可视化的时序或表格数据。

技术选型上,我们选择 axum 作为 Web 框架,因为它与 tokio 生态无缝集成,类型安全且性能卓越。serde 用于处理 JSON 序列化和反序列化,tracing 用于结构化日志。

首先是项目结构和基础依赖:

# Cargo.toml
[package]
name = "grafana-custom-ds"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = "0.6"
tokio = { version = "1.28", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json"] }
chrono = "0.4"
rand = "0.8"
once_cell = "1.18"

为了模拟真实的数据源,我们创建一个简单的内存数据库。在真实项目中,这里可能会连接到一个时序数据库、分布式缓存或直接消费 Kafka 流。为了演示,我们生成一些模拟数据。

// src/data_source.rs

use chrono::{DateTime, Utc, Duration};
use once_cell::sync::Lazy;
use rand::Rng;
use serde::Serialize;
use std::collections::HashMap;
use tokio::sync::Mutex;

#[derive(Debug, Clone, Serialize)]
pub struct TimePoint {
    pub timestamp: DateTime<Utc>,
    pub value: f64,
}

// 使用 Lazy 和 Mutex 来创建一个线程安全的全局单例模拟数据库
pub static MOCK_DB: Lazy<Mutex<HashMap<String, Vec<TimePoint>>>> = Lazy::new(|| {
    let mut db = HashMap::new();
    let mut rng = rand::thread_rng();
    let now = Utc::now();

    // 生成三个指标序列
    for metric in ["server.cpu.usage", "server.memory.bytes", "network.packets.in"] {
        let mut points = Vec::new();
        for i in 0..720 { // 模拟过去12小时的数据,每分钟一个点
            let timestamp = now - Duration::minutes(i);
            let value = match metric {
                "server.cpu.usage" => rng.gen_range(5.0..80.0),
                "server.memory.bytes" => rng.gen_range(1.0e9..16.0e9),
                _ => rng.gen_range(100.0..5000.0),
            };
            points.push(TimePoint { timestamp, value });
        }
        // 数据必须按时间戳升序排列,这在真实查询中至关重要
        points.sort_by_key(|p| p.timestamp);
        db.insert(metric.to_string(), points);
    }
    Mutex::new(db)
});

接下来是 axum 应用的骨架和 API 实现。一个常见的坑是错误处理。直接 unwrap() 或返回简单的 500 错误对调试毫无帮助。我们需要一个统一的、能转换为 HTTP 响应的错误类型。

// src/main.rs

mod data_source;

use axum::{
    routing::{get, post},
    http::StatusCode,
    response::{IntoResponse, Response, Json},
    Router,
    extract::State,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use chrono::{DateTime, Utc};
use crate::data_source::{MOCK_DB, TimePoint};

// 应用程序状态,虽然我们的DB是全局的,但这是更通用的模式
type AppState = Arc<Mutex<()>>;

// 自定义错误类型,用于优雅地处理应用内的各种错误
#[derive(Debug)]
enum AppError {
    Internal(anyhow::Error),
    BadRequest(String),
}

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let (status, error_message) = match self {
            AppError::Internal(e) => {
                error!("Internal server error: {:?}", e);
                (StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error".to_string())
            }
            AppError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg),
        };
        (status, Json(json!({ "error": error_message }))).into_response()
    }
}

// 任何实现了 `std::error::Error` 的类型都可以转换为 `AppError`
impl<E> From<E> for AppError where E: std::error::Error + Send + Sync + 'static {
    fn from(err: E) -> Self {
        AppError::Internal(anyhow::Error::new(err))
    }
}


#[tokio::main]
async fn main() {
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer().json())
        .init();

    let app_state = Arc::new(Mutex::new(()));

    let app = Router::new()
        .route("/", get(health_check))
        .route("/search", post(search))
        .route("/query", post(query))
        .with_state(app_state);

    let addr = SocketAddr::from(([0, 0, 0, 0], 3001));
    info!("Listening on {}", addr);
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

// 健康检查端点
async fn health_check() -> impl IntoResponse {
    (StatusCode::OK, "OK")
}

// /search 端点实现
async fn search() -> Result<impl IntoResponse, AppError> {
    let db = MOCK_DB.lock().await;
    let metrics: Vec<&String> = db.keys().collect();
    Ok(Json(metrics))
}


// --- /query 端点的数据结构定义 ---
#[derive(Debug, Deserialize)]
struct QueryRequest {
    range: TimeRange,
    targets: Vec<Target>,
    #[serde(rename = "maxDataPoints")]
    max_data_points: Option<u32>,
}

#[derive(Debug, Deserialize)]
struct TimeRange {
    from: DateTime<Utc>,
    to: DateTime<Utc>,
}

#[derive(Debug, Deserialize)]
struct Target {
    target: String,
    #[serde(rename = "type")]
    target_type: String, // 'timeserie' or 'table'
}

// --- /query 端点的响应结构定义 ---
#[derive(Debug, Serialize)]
struct TimeSeriesResponse {
    target: String,
    datapoints: Vec<[f64; 2]>, // [value, timestamp_ms]
}


// /query 端点实现
async fn query(Json(payload): Json<QueryRequest>) -> Result<impl IntoResponse, AppError> {
    info!(payload = ?payload, "Received query request");

    let db_handle = MOCK_DB.lock().await;
    let mut responses = Vec::new();

    for target in payload.targets {
        if let Some(points) = db_handle.get(&target.target) {
            
            // 在生产环境中,这里的逻辑会复杂得多,包括降采样、聚合等
            // 为了简化,我们只做时间范围过滤
            let datapoints: Vec<[f64; 2]> = points
                .iter()
                .filter(|p| p.timestamp >= payload.range.from && p.timestamp <= payload.range.to)
                .map(|p| [p.value, p.timestamp.timestamp_millis() as f64])
                .collect();
            
            responses.push(TimeSeriesResponse {
                target: target.target.clone(),
                datapoints,
            });
        }
    }

    Ok(Json(responses))
}

现在,这个服务已经基本可用。我们可以用 cargo run 启动它,并使用 curl 进行测试:

# 测试 /search
curl -X POST http://localhost:3001/search -H "Content-Type: application/json"

# 测试 /query
curl -X POST http://localhost:3001/query -H "Content-Type: application/json" --data '{
  "range": {
    "from": "2023-10-26T00:00:00.000Z",
    "to": "2023-10-27T00:00:00.000Z"
  },
  "targets": [
    { "target": "server.cpu.usage", "type": "timeserie" }
  ],
  "maxDataPoints": 100
}'

第二阶段:用 Packer 封装为不可变镜像

一个独立的二进制文件在部署时会面临诸多问题:环境依赖、配置管理、启动脚本、权限控制等。将整个应用,包括其最小化的操作系统环境,打包成一个 AMI,可以彻底解决这些问题。每个部署都是一个全新的、从一个“黄金镜像”启动的实例,这正是不可变基础设施的核心理念。

我们使用 Packer 和 HCL 语法来定义这个镜像的构建过程。

1. 准备构建物

首先,我们需要一个静态链接的、为 Linux 构建的 Rust 二进制文件。这可以避免目标机器上缺少 glibc 版本的问题。

# 在安装了 musl 工具链的环境下编译
rustup target add x86_64-unknown-linux-musl
cargo build --release --target=x86_64-unknown-linux-musl

编译产物位于 target/x86_64-unknown-linux-musl/release/grafana-custom-ds

2. 准备 systemd 服务单元文件

我们需要让操作系统在启动时自动运行我们的服务。

# packer/grafana-custom-ds.service

[Unit]
Description=Custom Grafana Datasource Service
After=network.target

[Service]
ExecStart=/usr/local/bin/grafana-custom-ds
User=grafanads
Group=grafanads
Restart=on-failure
RestartSec=5

# 生产环境中的安全强化
# 阻止服务写入大部分文件系统
ProtectSystem=full
# 阻止服务获取更多权限
NoNewPrivileges=true
# 限制服务可以访问的设备
PrivateDevices=true
# 限制对内核 sysctl 的访问
ProtectKernelTunables=true
# 限制对内核模块的加载
ProtectKernelModules=true
# 限制控制组的访问
ProtectControlGroups=true

[Install]
WantedBy=multi-user.target

3. 编写 Packer 模板

这是核心的 Packer HCL 文件。它定义了使用哪个基础镜像、如何构建、以及如何配置。

# packer/build.pkr.hcl

packer {
  required_plugins {
    amazon = {
      version = ">= 1.0.0"
      source  = "github.com/hashicorp/amazon"
    }
  }
}

variable "aws_region" {
  type    = string
  default = "us-east-1"
}

variable "app_version" {
  type    = string
  default = "0.1.0"
}

// 查找最新的 Amazon Linux 2 AMI
source "amazon-ebs" "custom-ds" {
  ami_name      = "grafana-custom-ds-${var.app_version}-${formatdate("YYYYMMDDhhmmss", timestamp())}"
  instance_type = "t3.micro"
  region        = var.aws_region
  source_ami_filter {
    filters = {
      name                = "amzn2-ami-hvm-*-x86_64-gp2"
      root-device-type    = "ebs"
      virtualization-type = "hvm"
    }
    most_recent = true
    owners      = ["amazon"]
  }
  ssh_username = "ec2-user"
  tags = {
    Name    = "GrafanaCustomDS"
    Version = var.app_version
    Source  = "Packer"
  }
}

build {
  name    = "grafana-custom-ds"
  sources = ["source.amazon-ebs.custom-ds"]

  // 第一步:系统更新与准备
  provisioner "shell" {
    inline = [
      "sudo yum update -y",
      "sudo yum install -y ca-certificates", // 确保 TLS 证书可用
      "sudo groupadd --system grafanads",
      "sudo useradd --system --no-create-home --shell /bin/false -g grafanads grafanads"
    ]
  }

  // 第二步:上传二进制文件
  provisioner "file" {
    source      = "../target/x86_64-unknown-linux-musl/release/grafana-custom-ds"
    destination = "/tmp/grafana-custom-ds"
  }

  // 第三步:上传 systemd 服务文件
  provisioner "file" {
    source      = "./grafana-custom-ds.service"
    destination = "/tmp/grafana-custom-ds.service"
  }

  // 第四步:安装并启用服务
  provisioner "shell" {
    inline = [
      "sudo mv /tmp/grafana-custom-ds /usr/local/bin/grafana-custom-ds",
      "sudo chmod +x /usr/local/bin/grafana-custom-ds",
      "sudo chown root:root /usr/local/bin/grafana-custom-ds",
      "sudo mv /tmp/grafana-custom-ds.service /etc/systemd/system/grafana-custom-ds.service",
      "sudo chown root:root /etc/systemd/system/grafana-custom-ds.service",
      "sudo systemctl enable grafana-custom-ds.service"
    ]
  }
}

现在,在配置好 AWS 凭证后,我们可以运行 Packer 来构建 AMI 了。

packer build -var "app_version=0.1.1" packer/build.pkr.hcl

Packer 会自动启动一个 EC2 实例,在上面执行 provisioner 定义的步骤,然后基于这个实例的状态创建一个新的 AMI,最后销毁临时实例。

最终成果与架构

我们得到了一个 AMI ID。现在,任何需要这个数据源的团队,只需使用这个 AMI ID 启动一个 EC2 实例,我们的 Rust 服务就会在 3001 端口上自动运行。

在 Grafana 中,我们添加一个新的数据源,选择 “Simple JSON Datasource” 插件(它是一个通用的、实现了上述 API 规范的插件),将 URL 指向我们新实例的 http://<instance_ip>:3001

整个工作流程如下:

graph TD
    subgraph CI/CD Pipeline
        A[Code Push to Git] --> B{Build Rust Binary};
        B -- x86_64-unknown-linux-musl --> C{Run Packer Build};
        C -- Packer HCL --> D[Generates New AMI];
    end

    subgraph AWS Environment
        E[EC2 Auto Scaling Group] -- Uses AMI ID --> F[EC2 Instance];
        F -- Runs --> G[grafana-custom-ds service];
        G -- Listens on :3001 --> H[Network Load Balancer];
    end

    subgraph Observability Platform
        I[Grafana] -- Configured Datasource --> H;
        J[End User] --> I;
    end
    
    D -.-> E;

这个架构的优势是显而易见的:

  1. 高性能查询:Rust 提供了接近 C/C++ 的性能,能够对内存中的数据进行毫秒级的复杂聚合。
  2. 部署一致性:所有环境(开发、测试、生产)都使用完全相同的 AMI,消除了“在我机器上可以跑”的问题。
  3. 安全性:基础镜像定期更新,服务以非 root 用户运行,systemd 配置提供了额外的安全加固。
  4. 简化运维:升级版本就是构建一个新的 AMI,然后在启动配置中更新 AMI ID,通过滚动更新或蓝绿部署即可完成发布。无需 SSH 登录机器执行任何命令。

局限性与未来迭代方向

当前这个实现方案并非完美,它是一个起点,有几个明显的局限性需要正视:

首先,数据是纯内存的,这意味着服务是有状态的。如果实例重启,数据会丢失(在我们的模拟中会重新生成)。这限制了它的水平扩展能力。一个真正的生产级方案需要将状态外置,例如,Rust 服务可以作为一层无状态的查询代理,后端连接到一个像 ClickHouse、VictoriaMetrics 或者 Redis Streams 这样的持久化存储。

其次,当前的查询功能非常基础。一个强大的数据源应该支持 Grafana 的模板变量、注解,并能处理更复杂的查询语法,例如在 target 字符串中传递过滤条件。这需要在 /query 的解析逻辑中做大量工作。

最后,Packer 的构建过程还可以进一步优化。例如,集成安全扫描工具(如 Trivy)来扫描基础镜像和最终镜像的漏洞,并将结果作为构建流程的门禁。AMI 的分发和共享也需要通过更成熟的机制,如 AWS EC2 Image Builder 管道来管理。


  目录