我们团队的内部事件总线每秒处理数十万条高度结构化的业务事件。传统的做法是将其中的关键指标导出到 Prometheus,再由 Grafana 进行可视化。这个方案在大多数场景下工作良好,但当我们需要对事件的原始高基数维度进行实时、动态的切片和聚合查询时,PromQL 的表现开始力不从心。预聚合会丢失细节,而直接查询原始数据又太慢。我们需要一个能直接与 Grafana 对话、且性能足够强悍的中间层,它能理解我们的数据结构并执行高效的即时查询。这就是我们决定用 Rust 自行构建一个 Grafana 数据源后端的起点。
这个组件的定位是基础设施的一部分,必须稳定、可靠、易于部署。因此,我们没有止步于编写一个二进制文件,而是选择用 Packer 将其与一个经过加固的操作系统一起,打包成一个不可变的亚马逊机器镜像(AMI)。这种“观测性一体机”的交付方式,极大地简化了部署和版本管理。
第一阶段:实现 Grafana 数据源后端 API
Grafana 的后端数据源本质上是一个实现了特定 HTTP API 的 Web 服务。Grafana UI 会根据用户操作向这个服务的特定端点发送请求。我们需要实现三个核心端点:
-
GET /
:健康检查,Grafana 会用它来测试数据源的可连接性。 -
POST /search
:返回可供查询的指标名称或标签。当用户在查询编辑器中点击下拉菜单时,Grafana 会调用此接口。 -
POST /query
:执行查询。这是最核心的接口,负责接收查询参数、处理数据并返回 Grafana 可视化的时序或表格数据。
技术选型上,我们选择 axum
作为 Web 框架,因为它与 tokio
生态无缝集成,类型安全且性能卓越。serde
用于处理 JSON 序列化和反序列化,tracing
用于结构化日志。
首先是项目结构和基础依赖:
# Cargo.toml
[package]
name = "grafana-custom-ds"
version = "0.1.0"
edition = "2021"
[dependencies]
axum = "0.6"
tokio = { version = "1.28", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json"] }
chrono = "0.4"
rand = "0.8"
once_cell = "1.18"
为了模拟真实的数据源,我们创建一个简单的内存数据库。在真实项目中,这里可能会连接到一个时序数据库、分布式缓存或直接消费 Kafka 流。为了演示,我们生成一些模拟数据。
// src/data_source.rs
use chrono::{DateTime, Utc, Duration};
use once_cell::sync::Lazy;
use rand::Rng;
use serde::Serialize;
use std::collections::HashMap;
use tokio::sync::Mutex;
#[derive(Debug, Clone, Serialize)]
pub struct TimePoint {
pub timestamp: DateTime<Utc>,
pub value: f64,
}
// 使用 Lazy 和 Mutex 来创建一个线程安全的全局单例模拟数据库
pub static MOCK_DB: Lazy<Mutex<HashMap<String, Vec<TimePoint>>>> = Lazy::new(|| {
let mut db = HashMap::new();
let mut rng = rand::thread_rng();
let now = Utc::now();
// 生成三个指标序列
for metric in ["server.cpu.usage", "server.memory.bytes", "network.packets.in"] {
let mut points = Vec::new();
for i in 0..720 { // 模拟过去12小时的数据,每分钟一个点
let timestamp = now - Duration::minutes(i);
let value = match metric {
"server.cpu.usage" => rng.gen_range(5.0..80.0),
"server.memory.bytes" => rng.gen_range(1.0e9..16.0e9),
_ => rng.gen_range(100.0..5000.0),
};
points.push(TimePoint { timestamp, value });
}
// 数据必须按时间戳升序排列,这在真实查询中至关重要
points.sort_by_key(|p| p.timestamp);
db.insert(metric.to_string(), points);
}
Mutex::new(db)
});
接下来是 axum
应用的骨架和 API 实现。一个常见的坑是错误处理。直接 unwrap()
或返回简单的 500
错误对调试毫无帮助。我们需要一个统一的、能转换为 HTTP 响应的错误类型。
// src/main.rs
mod data_source;
use axum::{
routing::{get, post},
http::StatusCode,
response::{IntoResponse, Response, Json},
Router,
extract::State,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use chrono::{DateTime, Utc};
use crate::data_source::{MOCK_DB, TimePoint};
// 应用程序状态,虽然我们的DB是全局的,但这是更通用的模式
type AppState = Arc<Mutex<()>>;
// 自定义错误类型,用于优雅地处理应用内的各种错误
#[derive(Debug)]
enum AppError {
Internal(anyhow::Error),
BadRequest(String),
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, error_message) = match self {
AppError::Internal(e) => {
error!("Internal server error: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error".to_string())
}
AppError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg),
};
(status, Json(json!({ "error": error_message }))).into_response()
}
}
// 任何实现了 `std::error::Error` 的类型都可以转换为 `AppError`
impl<E> From<E> for AppError where E: std::error::Error + Send + Sync + 'static {
fn from(err: E) -> Self {
AppError::Internal(anyhow::Error::new(err))
}
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().json())
.init();
let app_state = Arc::new(Mutex::new(()));
let app = Router::new()
.route("/", get(health_check))
.route("/search", post(search))
.route("/query", post(query))
.with_state(app_state);
let addr = SocketAddr::from(([0, 0, 0, 0], 3001));
info!("Listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
// 健康检查端点
async fn health_check() -> impl IntoResponse {
(StatusCode::OK, "OK")
}
// /search 端点实现
async fn search() -> Result<impl IntoResponse, AppError> {
let db = MOCK_DB.lock().await;
let metrics: Vec<&String> = db.keys().collect();
Ok(Json(metrics))
}
// --- /query 端点的数据结构定义 ---
#[derive(Debug, Deserialize)]
struct QueryRequest {
range: TimeRange,
targets: Vec<Target>,
#[serde(rename = "maxDataPoints")]
max_data_points: Option<u32>,
}
#[derive(Debug, Deserialize)]
struct TimeRange {
from: DateTime<Utc>,
to: DateTime<Utc>,
}
#[derive(Debug, Deserialize)]
struct Target {
target: String,
#[serde(rename = "type")]
target_type: String, // 'timeserie' or 'table'
}
// --- /query 端点的响应结构定义 ---
#[derive(Debug, Serialize)]
struct TimeSeriesResponse {
target: String,
datapoints: Vec<[f64; 2]>, // [value, timestamp_ms]
}
// /query 端点实现
async fn query(Json(payload): Json<QueryRequest>) -> Result<impl IntoResponse, AppError> {
info!(payload = ?payload, "Received query request");
let db_handle = MOCK_DB.lock().await;
let mut responses = Vec::new();
for target in payload.targets {
if let Some(points) = db_handle.get(&target.target) {
// 在生产环境中,这里的逻辑会复杂得多,包括降采样、聚合等
// 为了简化,我们只做时间范围过滤
let datapoints: Vec<[f64; 2]> = points
.iter()
.filter(|p| p.timestamp >= payload.range.from && p.timestamp <= payload.range.to)
.map(|p| [p.value, p.timestamp.timestamp_millis() as f64])
.collect();
responses.push(TimeSeriesResponse {
target: target.target.clone(),
datapoints,
});
}
}
Ok(Json(responses))
}
现在,这个服务已经基本可用。我们可以用 cargo run
启动它,并使用 curl
进行测试:
# 测试 /search
curl -X POST http://localhost:3001/search -H "Content-Type: application/json"
# 测试 /query
curl -X POST http://localhost:3001/query -H "Content-Type: application/json" --data '{
"range": {
"from": "2023-10-26T00:00:00.000Z",
"to": "2023-10-27T00:00:00.000Z"
},
"targets": [
{ "target": "server.cpu.usage", "type": "timeserie" }
],
"maxDataPoints": 100
}'
第二阶段:用 Packer 封装为不可变镜像
一个独立的二进制文件在部署时会面临诸多问题:环境依赖、配置管理、启动脚本、权限控制等。将整个应用,包括其最小化的操作系统环境,打包成一个 AMI,可以彻底解决这些问题。每个部署都是一个全新的、从一个“黄金镜像”启动的实例,这正是不可变基础设施的核心理念。
我们使用 Packer 和 HCL 语法来定义这个镜像的构建过程。
1. 准备构建物
首先,我们需要一个静态链接的、为 Linux 构建的 Rust 二进制文件。这可以避免目标机器上缺少 glibc
版本的问题。
# 在安装了 musl 工具链的环境下编译
rustup target add x86_64-unknown-linux-musl
cargo build --release --target=x86_64-unknown-linux-musl
编译产物位于 target/x86_64-unknown-linux-musl/release/grafana-custom-ds
。
2. 准备 systemd
服务单元文件
我们需要让操作系统在启动时自动运行我们的服务。
# packer/grafana-custom-ds.service
[Unit]
Description=Custom Grafana Datasource Service
After=network.target
[Service]
ExecStart=/usr/local/bin/grafana-custom-ds
User=grafanads
Group=grafanads
Restart=on-failure
RestartSec=5
# 生产环境中的安全强化
# 阻止服务写入大部分文件系统
ProtectSystem=full
# 阻止服务获取更多权限
NoNewPrivileges=true
# 限制服务可以访问的设备
PrivateDevices=true
# 限制对内核 sysctl 的访问
ProtectKernelTunables=true
# 限制对内核模块的加载
ProtectKernelModules=true
# 限制控制组的访问
ProtectControlGroups=true
[Install]
WantedBy=multi-user.target
3. 编写 Packer 模板
这是核心的 Packer HCL 文件。它定义了使用哪个基础镜像、如何构建、以及如何配置。
# packer/build.pkr.hcl
packer {
required_plugins {
amazon = {
version = ">= 1.0.0"
source = "github.com/hashicorp/amazon"
}
}
}
variable "aws_region" {
type = string
default = "us-east-1"
}
variable "app_version" {
type = string
default = "0.1.0"
}
// 查找最新的 Amazon Linux 2 AMI
source "amazon-ebs" "custom-ds" {
ami_name = "grafana-custom-ds-${var.app_version}-${formatdate("YYYYMMDDhhmmss", timestamp())}"
instance_type = "t3.micro"
region = var.aws_region
source_ami_filter {
filters = {
name = "amzn2-ami-hvm-*-x86_64-gp2"
root-device-type = "ebs"
virtualization-type = "hvm"
}
most_recent = true
owners = ["amazon"]
}
ssh_username = "ec2-user"
tags = {
Name = "GrafanaCustomDS"
Version = var.app_version
Source = "Packer"
}
}
build {
name = "grafana-custom-ds"
sources = ["source.amazon-ebs.custom-ds"]
// 第一步:系统更新与准备
provisioner "shell" {
inline = [
"sudo yum update -y",
"sudo yum install -y ca-certificates", // 确保 TLS 证书可用
"sudo groupadd --system grafanads",
"sudo useradd --system --no-create-home --shell /bin/false -g grafanads grafanads"
]
}
// 第二步:上传二进制文件
provisioner "file" {
source = "../target/x86_64-unknown-linux-musl/release/grafana-custom-ds"
destination = "/tmp/grafana-custom-ds"
}
// 第三步:上传 systemd 服务文件
provisioner "file" {
source = "./grafana-custom-ds.service"
destination = "/tmp/grafana-custom-ds.service"
}
// 第四步:安装并启用服务
provisioner "shell" {
inline = [
"sudo mv /tmp/grafana-custom-ds /usr/local/bin/grafana-custom-ds",
"sudo chmod +x /usr/local/bin/grafana-custom-ds",
"sudo chown root:root /usr/local/bin/grafana-custom-ds",
"sudo mv /tmp/grafana-custom-ds.service /etc/systemd/system/grafana-custom-ds.service",
"sudo chown root:root /etc/systemd/system/grafana-custom-ds.service",
"sudo systemctl enable grafana-custom-ds.service"
]
}
}
现在,在配置好 AWS 凭证后,我们可以运行 Packer 来构建 AMI 了。
packer build -var "app_version=0.1.1" packer/build.pkr.hcl
Packer 会自动启动一个 EC2 实例,在上面执行 provisioner
定义的步骤,然后基于这个实例的状态创建一个新的 AMI,最后销毁临时实例。
最终成果与架构
我们得到了一个 AMI ID。现在,任何需要这个数据源的团队,只需使用这个 AMI ID 启动一个 EC2 实例,我们的 Rust 服务就会在 3001 端口上自动运行。
在 Grafana 中,我们添加一个新的数据源,选择 “Simple JSON Datasource” 插件(它是一个通用的、实现了上述 API 规范的插件),将 URL 指向我们新实例的 http://<instance_ip>:3001
。
整个工作流程如下:
graph TD subgraph CI/CD Pipeline A[Code Push to Git] --> B{Build Rust Binary}; B -- x86_64-unknown-linux-musl --> C{Run Packer Build}; C -- Packer HCL --> D[Generates New AMI]; end subgraph AWS Environment E[EC2 Auto Scaling Group] -- Uses AMI ID --> F[EC2 Instance]; F -- Runs --> G[grafana-custom-ds service]; G -- Listens on :3001 --> H[Network Load Balancer]; end subgraph Observability Platform I[Grafana] -- Configured Datasource --> H; J[End User] --> I; end D -.-> E;
这个架构的优势是显而易见的:
- 高性能查询:Rust 提供了接近 C/C++ 的性能,能够对内存中的数据进行毫秒级的复杂聚合。
- 部署一致性:所有环境(开发、测试、生产)都使用完全相同的 AMI,消除了“在我机器上可以跑”的问题。
- 安全性:基础镜像定期更新,服务以非 root 用户运行,
systemd
配置提供了额外的安全加固。 - 简化运维:升级版本就是构建一个新的 AMI,然后在启动配置中更新 AMI ID,通过滚动更新或蓝绿部署即可完成发布。无需 SSH 登录机器执行任何命令。
局限性与未来迭代方向
当前这个实现方案并非完美,它是一个起点,有几个明显的局限性需要正视:
首先,数据是纯内存的,这意味着服务是有状态的。如果实例重启,数据会丢失(在我们的模拟中会重新生成)。这限制了它的水平扩展能力。一个真正的生产级方案需要将状态外置,例如,Rust 服务可以作为一层无状态的查询代理,后端连接到一个像 ClickHouse、VictoriaMetrics 或者 Redis Streams 这样的持久化存储。
其次,当前的查询功能非常基础。一个强大的数据源应该支持 Grafana 的模板变量、注解,并能处理更复杂的查询语法,例如在 target
字符串中传递过滤条件。这需要在 /query
的解析逻辑中做大量工作。
最后,Packer 的构建过程还可以进一步优化。例如,集成安全扫描工具(如 Trivy)来扫描基础镜像和最终镜像的漏洞,并将结果作为构建流程的门禁。AMI 的分发和共享也需要通过更成熟的机制,如 AWS EC2 Image Builder 管道来管理。