手动在 Kubernetes 上部署和维护一个高可用的 Dgraph 集群是一项繁琐且极易出错的任务。集群包含两种角色的节点:Zero 和 Alpha,它们之间有启动依赖和网络通信要求。配置漂移、手动扩缩容的风险以及故障恢复的复杂性,都促使我们寻求一种声明式、自动化的解决方案。Kubernetes Operator 模式是应对这类有状态应用管理的标准答案。
然而,Operator 的核心——调谐循环(Reconciliation Loop)——本质上是一个复杂的状态机。它根据期望状态(CRD)和真实世界状态(集群中的实际资源)进行决策。任何微小的逻辑错误都可能导致集群状态异常甚至数据丢失。因此,为这个核心逻辑建立一套健壮的测试体系至关重要。本文将复盘如何采用测试驱动开发(TDD)的方式,构建一个 Dgraph Operator 的关键调谐逻辑,确保其在交付前就具备高可靠性。
技术痛点与初步构想
一个高可用的 Dgraph 集群至少需要3个 Zero 节点(负责元数据管理和事务协调)和3个 Alpha 节点(负责数据存储和查询)。Zero 节点必须先于 Alpha 节点启动并形成一个 Raft 集群。Alpha 节点启动时需要知道所有 Zero 节点的地址。
手动管理意味着:
- 分别创建 Zero 和 Alpha 的 StatefulSet。
- 确保 Zero 的 Headless Service 正确创建,以便 Alpha 节点可以发现它们。
- 在 Alpha 的启动命令中硬编码或通过复杂脚本注入 Zero 节点的地址。
- 扩缩容时,需要手动调整 StatefulSet 的副本数,并确保 Dgraph 集群内部状态正确同步。
这个过程充满了命令式的操作。我们的构想是创建一个 DgraphCluster
的 CRD,让用户只需定义期望的集群规模和版本,Operator 便会自动完成所有底层资源的创建、关联和维护。
# api/v1alpha1/dgraphcluster_types.go 中定义的期望 CRD 结构
apiVersion: database.example.com/v1alpha1
kind: DgraphCluster
metadata:
name: dgraph-prod
spec:
version: "v21.03.2"
zero:
replicas: 3
alpha:
replicas: 3
核心挑战在于 Reconcile
函数的实现。这个函数必须是幂等的,并且能够处理各种中间状态和错误情况。使用 TDD,我们可以先定义好各种场景下的期望行为,再编写代码使其通过测试,从而系统性地构建出健壮的逻辑。我们将使用 kubebuilder
脚手架和 controller-runtime
的 envtest
库,它能在本地启动一个临时的 etcd
和 kube-apiserver
实例,为我们提供一个用于测试的干净的 Kubernetes 控制平面。
TDD 周期一:确保 Zero 节点 StatefulSet 的正确创建
第一个也是最基础的功能,是当一个 DgraphCluster
资源被创建时,Operator 应该为其创建一个对应的 Zero 节点 StatefulSet。
第一步:编写失败的测试 (Red)
我们的测试用例将模拟创建一个 DgraphCluster
对象,然后验证 Kubernetes API 中是否出现了一个符合预期的 StatefulSet
。
// controllers/dgraphcluster_controller_test.go
package controllers
import (
"context"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"time"
databasev1alpha1 "github.com/your-repo/dgraph-operator/api/v1alpha1"
)
var _ = Describe("DgraphCluster controller", func() {
const (
DgraphClusterName = "test-dgraphcluster"
DgraphClusterNamespace = "default"
timeout = time.Second * 10
interval = time.Millisecond * 250
)
Context("When creating a DgraphCluster", func() {
It("Should create a Zero StatefulSet", func() {
By("By creating a new DgraphCluster resource")
ctx := context.Background()
dgraphCluster := &databasev1alpha1.DgraphCluster{
TypeMeta: metav1.TypeMeta{
APIVersion: "database.example.com/v1alpha1",
Kind: "DgraphCluster",
},
ObjectMeta: metav1.ObjectMeta{
Name: DgraphClusterName,
Namespace: DgraphClusterNamespace,
},
Spec: databasev1alpha1.DgraphClusterSpec{
Version: "v21.03.2",
Zero: databasev1alpha1.ZeroSpec{
Replicas: 3,
},
Alpha: databasev1alpha1.AlphaSpec{
Replicas: 3,
},
},
}
Expect(k8sClient.Create(ctx, dgraphCluster)).Should(Succeed())
zeroStsLookupKey := types.NamespacedName{Name: DgraphClusterName + "-zero", Namespace: DgraphClusterNamespace}
createdZeroSts := &appsv1.StatefulSet{}
// 持续检查,直到 StatefulSet 被创建或超时
Eventually(func() bool {
err := k8sClient.Get(ctx, zeroStsLookupKey, createdZeroSts)
return err == nil
}, timeout, interval).Should(BeTrue())
// 断言 StatefulSet 的关键属性是否符合预期
Expect(*createdZeroSts.Spec.Replicas).Should(Equal(int32(3)))
Expect(createdZeroSts.Spec.ServiceName).Should(Equal(DgraphClusterName + "-zero-headless"))
Expect(createdZeroSts.Spec.Template.Spec.Containers[0].Image).Should(Equal("dgraph/dgraph:v21.03.2"))
Expect(createdZeroSts.Spec.Template.Spec.Containers[0].Command).Should(ContainElement("/dgraph"))
Expect(createdZeroSts.Spec.Template.Spec.Containers[0].Args).Should(ContainElement("dgraph-zero"))
// --my 参数对于 StatefulSet pod 的发现至关重要
Expect(createdZeroSts.Spec.Template.Spec.Containers[0].Args).Should(ContainElement("--my=$(hostname -f):5080"))
})
})
})
此时运行测试,它会因为找不到 StatefulSet
而失败。这是 TDD 的第一步。
第二步:编写最小化实现使其通过 (Green)
现在,我们在 Reconcile
方法中添加创建 Zero StatefulSet 的逻辑。这里的关键是幂等性:我们首先检查 StatefulSet 是否存在,只有在不存在时才创建它。
// controllers/dgraphcluster_controller.go
func (r *DgraphClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.WithValues("dgraphcluster", req.NamespacedName)
var dgraphCluster databasev1alpha1.DgraphCluster
if err := r.Get(ctx, req.NamespacedName, &dgraphCluster); err != nil {
log.Error(err, "unable to fetch DgraphCluster")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// --- 调谐 Zero StatefulSet ---
zeroSts := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: dgraphCluster.Name + "-zero", Namespace: dgraphCluster.Namespace}, zeroSts)
if err != nil && errors.IsNotFound(err) {
// StatefulSet 不存在,创建它
log.Info("Creating a new Zero StatefulSet")
sts := r.statefulSetForZero(&dgraphCluster)
if err := ctrl.SetControllerReference(&dgraphCluster, sts, r.Scheme); err != nil {
log.Error(err, "Failed to set controller reference for Zero StatefulSet")
return ctrl.Result{}, err
}
if err := r.Create(ctx, sts); err != nil {
log.Error(err, "Failed to create Zero StatefulSet")
return ctrl.Result{}, err
}
// 创建成功,重新排队以便后续检查状态
return ctrl.Result{Requeue: true}, nil
} else if err != nil {
log.Error(err, "Failed to get Zero StatefulSet")
return ctrl.Result{}, err
}
// ... 后续逻辑 ...
return ctrl.Result{}, nil
}
// statefulSetForZero 是一个辅助函数,用于构建 Zero StatefulSet 对象
func (r *DgraphClusterReconciler) statefulSetForZero(dc *databasev1alpha1.DgraphCluster) *appsv1.StatefulSet {
// 生产级的代码需要处理好标签、端口、资源请求/限制、存储卷等
ls := labelsForDgraphCluster(dc.Name, "zero")
replicas := dc.Spec.Zero.Replicas
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: dc.Name + "-zero",
Namespace: dc.Namespace,
Labels: ls,
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
ServiceName: dc.Name + "-zero-headless", // 需要一个对应的 Headless Service
Selector: &metav1.LabelSelector{
MatchLabels: ls,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: ls,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Image: "dgraph/dgraph:" + dc.Spec.Version,
Name: "dgraph-zero",
Command: []string{"/dgraph"},
Args: []string{
"dgraph-zero",
"--my=$(hostname -f):5080",
"--replicas",
"3", // 真实项目中这里应与 spec.replicas 联动
"--logtostderr",
},
Ports: []corev1.ContainerPort{
{ContainerPort: 5080, Name: "grpc-peer"},
{ContainerPort: 6080, Name: "http"},
},
}},
},
},
},
}
}
当然,为了让它完整工作,我们还需要创建对应的 Headless Service。这里为简洁起见省略了 Service 的调谐逻辑,但在真实项目中,它也应该被 TDD 覆盖。现在再次运行测试,它应该会通过。
第三步:重构 (Refactor)
代码目前还算清晰,暂时没有太多需要重构的地方。我们可以将构建 StatefulSet
的逻辑封装在独立的函数中,这已经做了。
TDD 周期二:Alpha 节点依赖 Zero 节点,确保正确启动
Alpha 节点的启动依赖于 Zero 节点集群的就绪。Operator 必须确保只有在 Zero 节点可用后,才创建 Alpha 节点的 StatefulSet,并且 Alpha 的启动参数必须正确指向 Zero 节点。
第一步:编写失败的测试 (Red)
这个测试用例更复杂。我们需要模拟 Zero StatefulSet 的状态变化。首先,我们会创建一个 DgraphCluster
,然后手动将 Zero StatefulSet 的状态更新为 “Ready”。只有在这之后,我们才期望 Alpha StatefulSet 被创建。
// controllers/dgraphcluster_controller_test.go
// ... 在同一个 Describe 块中 ...
It("Should create an Alpha StatefulSet only after Zero StatefulSet is ready", func() {
ctx := context.Background()
// 前置条件:创建 DgraphCluster 资源
dgraphCluster := &databasev1alpha1.DgraphCluster{ /* ... 同上 ... */ }
Expect(k8sClient.Create(ctx, dgraphCluster)).Should(Succeed())
zeroStsLookupKey := types.NamespacedName{Name: DgraphClusterName + "-zero", Namespace: DgraphClusterNamespace}
createdZeroSts := &appsv1.StatefulSet{}
// 等待 Zero StatefulSet 被创建
Eventually(func() bool {
err := k8sClient.Get(ctx, zeroStsLookupKey, createdZeroSts)
return err == nil
}, timeout, interval).Should(BeTrue())
alphaStsLookupKey := types.NamespacedName{Name: DgraphClusterName + "-alpha", Namespace: DgraphClusterNamespace}
createdAlphaSts := &appsv1.StatefulSet{}
// 此时,因为 Zero 还未就绪,Alpha StatefulSet 不应该被创建
Consistently(func() bool {
err := k8sClient.Get(ctx, alphaStsLookupKey, createdAlphaSts)
return errors.IsNotFound(err)
}, time.Second*2, interval).Should(BeTrue())
By("By updating the Zero StatefulSet status to ready")
// 手动模拟 Zero StatefulSet 变为 Ready 状态
createdZeroSts.Status.Replicas = dgraphCluster.Spec.Zero.Replicas
createdZeroSts.Status.ReadyReplicas = dgraphCluster.Spec.Zero.Replicas
Expect(k8sClient.Status().Update(ctx, createdZeroSts)).Should(Succeed())
// 现在,应该能观察到 Alpha StatefulSet 的创建
Eventually(func() bool {
err := k8sClient.Get(ctx, alphaStsLookupKey, createdAlphaSts)
return err == nil
}, timeout, interval).Should(BeTrue())
// 验证 Alpha StatefulSet 的关键配置,特别是 --zero 参数
Expect(*createdAlphaSts.Spec.Replicas).Should(Equal(int32(3)))
expectedZeroAddr := fmt.Sprintf("%s-zero-0.%s-zero-headless.%s.svc.cluster.local:5080", DgraphClusterName, DgraphClusterName, DgraphClusterNamespace)
Expect(createdAlphaSts.Spec.Template.Spec.Containers[0].Args).Should(ContainElement("--zero=" + expectedZeroAddr))
})
这个测试会失败,因为我们的 Reconcile
函数还没有处理 Alpha 的逻辑,也没有检查 Zero 的就绪状态。
第二步:编写实现使其通过 (Green)
我们需要扩展 Reconcile
函数。在检查完 Zero StatefulSet 后,我们检查它的 .status.readyReplicas
是否等于 .spec.replicas
。如果不满足,就重新排队等待。满足后,再继续创建 Alpha StatefulSet。
// controllers/dgraphcluster_controller.go
func (r *DgraphClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// ... 获取 dgraphCluster 对象的逻辑同上 ...
// --- 调谐 Zero StatefulSet ---
// ... 创建或获取 Zero StatefulSet 的逻辑同上 ...
// 检查 Zero StatefulSet 是否就绪
if zeroSts.Status.ReadyReplicas < dgraphCluster.Spec.Zero.Replicas {
log.Info("Zero StatefulSet is not ready yet", "ReadyReplicas", zeroSts.Status.ReadyReplicas, "TargetReplicas", dgraphCluster.Spec.Zero.Replicas)
// 关键:返回 RequeueAfter,避免在未就绪时过于频繁地重试
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
}
// --- 调谐 Alpha StatefulSet ---
alphaSts := &appsv1.StatefulSet{}
err = r.Get(ctx, types.NamespacedName{Name: dgraphCluster.Name + "-alpha", Namespace: dgraphCluster.Namespace}, alphaSts)
if err != nil && errors.IsNotFound(err) {
log.Info("Creating a new Alpha StatefulSet")
sts := r.statefulSetForAlpha(&dgraphCluster)
if err := ctrl.SetControllerReference(&dgraphCluster, sts, r.Scheme); err != nil {
log.Error(err, "Failed to set controller reference for Alpha StatefulSet")
return ctrl.Result{}, err
}
if err := r.Create(ctx, sts); err != nil {
log.Error(err, "Failed to create Alpha StatefulSet")
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
} else if err != nil {
log.Error(err, "Failed to get Alpha StatefulSet")
return ctrl.Result{}, err
}
// ... 后续逻辑 ...
return ctrl.Result{}, nil
}
func (r *DgraphClusterReconciler) statefulSetForAlpha(dc *databasev1alpha1.DgraphCluster) *appsv1.StatefulSet {
ls := labelsForDgraphCluster(dc.Name, "alpha")
replicas := dc.Spec.Alpha.Replicas
// 构建 Zero 节点的发现地址。在真实项目中,这里应该构建一个完整的列表。
// Dgraph 支持提供任意一个 Zero 节点的地址,它会自动发现其他节点。
zeroHeadlessSvc := dc.Name + "-zero-headless"
zeroPeerAddr := fmt.Sprintf("%s-zero-0.%s.%s.svc.cluster.local:5080", dc.Name, zeroHeadlessSvc, dc.Namespace)
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: dc.Name + "-alpha",
Namespace: dc.Namespace,
Labels: ls,
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
ServiceName: dc.Name + "-alpha-headless",
Selector: &metav1.LabelSelector{
MatchLabels: ls,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: ls,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Image: "dgraph/dgraph:" + dc.Spec.Version,
Name: "dgraph-alpha",
Command: []string{"/dgraph"},
Args: []string{
"dgraph-alpha",
"--my=$(hostname -f):7080",
"--zero=" + zeroPeerAddr, // 关键参数
"--logtostderr",
},
// ... ports, volumes etc.
}},
},
},
},
}
}
再次运行测试套件,现在两个测试都应该能通过了。我们通过测试驱动的方式,确保了组件之间的启动依赖关系。
TDD 周期三:处理扩缩容
用户可能修改 DgraphCluster
的 spec
来调整 Zero 或 Alpha 节点的数量。Operator 必须能够检测到这种变化并更新对应的 StatefulSet。
第一步:编写失败的测试 (Red)
// controllers/dgraphcluster_controller_test.go
It("Should scale the Alpha StatefulSet when DgraphCluster spec is updated", func() {
ctx := context.Background()
// 前置条件:创建一个完整的、就绪的集群
dgraphCluster := &databasev1alpha1.DgraphCluster{ /* ... spec.alpha.replicas = 3 ... */ }
Expect(k8sClient.Create(ctx, dgraphCluster)).Should(Succeed())
// 模拟调谐过程,使得 Zero 和 Alpha 的 StatefulSet 都被创建
// (此处省略了模拟就绪状态的代码,实际测试中需要)
// ...
// 等待 Alpha StatefulSet 创建并断言其副本数为 3
alphaStsLookupKey := types.NamespacedName{Name: DgraphClusterName + "-alpha", Namespace: DgraphClusterNamespace}
createdAlphaSts := &appsv1.StatefulSet{}
Eventually(func() (int32, error) {
err := k8sClient.Get(ctx, alphaStsLookupKey, createdAlphaSts)
if err != nil {
return 0, err
}
return *createdAlphaSts.Spec.Replicas, nil
}, timeout, interval).Should(Equal(int32(3)))
By("Updating the DgraphCluster spec.alpha.replicas")
updatedDgraphCluster := &databasev1alpha1.DgraphCluster{}
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: DgraphClusterName, Namespace: DgraphClusterNamespace}, updatedDgraphCluster)).Should(Succeed())
updatedDgraphCluster.Spec.Alpha.Replicas = 5
Expect(k8sClient.Update(ctx, updatedDgraphCluster)).Should(Succeed())
// 断言 Alpha StatefulSet 的副本数最终会变为 5
Eventually(func() (int32, error) {
err := k8sClient.Get(ctx, alphaStsLookupKey, createdAlphaSts)
if err != nil {
return 0, err
}
return *createdAlphaSts.Spec.Replicas, nil
}, timeout, interval).Should(Equal(int32(5)))
})
这个测试会失败,因为 Reconcile
函数在 StatefulSet 已经存在后,就直接跳过了,没有检查其 replicas
字段是否与 spec
一致。
第二步:编写实现使其通过 (Green)
我们需要在 Reconcile
函数中,当获取到已存在的 StatefulSet 后,比较其 spec
和我们期望的 spec
是否一致。
// controllers/dgraphcluster_controller.go
func (r *DgraphClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// ...
// --- 调谐 Zero StatefulSet ---
// ...
// 在获取到 zeroSts 后
if *zeroSts.Spec.Replicas != dgraphCluster.Spec.Zero.Replicas {
log.Info("Zero StatefulSet replica count mismatch, updating...", "current", *zeroSts.Spec.Replicas, "desired", dgraphCluster.Spec.Zero.Replicas)
zeroSts.Spec.Replicas = &dgraphCluster.Spec.Zero.Replicas
if err := r.Update(ctx, zeroSts); err != nil {
log.Error(err, "Failed to update Zero StatefulSet replicas")
return ctrl.Result{}, err
}
// 更新后需要重新排队以观察更新状态
return ctrl.Result{Requeue: true}, nil
}
// ... 检查 Zero 是否就绪 ...
// --- 调谐 Alpha StatefulSet ---
// ...
// 在获取到 alphaSts 后
if *alphaSts.Spec.Replicas != dgraphCluster.Spec.Alpha.Replicas {
log.Info("Alpha StatefulSet replica count mismatch, updating...", "current", *alphaSts.Spec.Replicas, "desired", dgraphCluster.Spec.Alpha.Replicas)
alphaSts.Spec.Replicas = &dgraphCluster.Spec.Alpha.Replicas
if err := r.Update(ctx, alphaSts); err != nil {
log.Error(err, "Failed to update Alpha StatefulSet replicas")
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
}
// ... 后续逻辑 ...
return ctrl.Result{}, nil
}
现在,所有的测试都应该通过了。我们已经构建了一个能够处理创建、依赖和更新的核心调谐逻辑。
最终调谐逻辑与架构示意
经过多个 TDD 周期,我们的 Reconcile
函数形成了一个清晰的、可预测的逻辑流。
graph TD A[Start Reconcile] --> B{Fetch DgraphCluster CR}; B --> C{DgraphCluster exists?}; C -- No --> Z[End, Nothing to do]; C -- Yes --> D{Zero StatefulSet exists?}; D -- No --> E[Create Zero StatefulSet]; E --> Y[Requeue]; D -- Yes --> F{Zero Replicas match spec?}; F -- No --> G[Update Zero StatefulSet]; G --> Y; F -- Yes --> H{Zero is Ready?}; H -- No --> X[RequeueAfter 10s]; H -- Yes --> I{Alpha StatefulSet exists?}; I -- No --> J[Create Alpha StatefulSet]; J --> Y; I -- Yes --> K{Alpha Replicas match spec?}; K -- No --> L[Update Alpha StatefulSet]; L --> Y; K -- Yes --> M{Update DgraphCluster Status}; M --> Z;
这个流程图直观地展示了我们的调谐器如何一步步地将集群状态收敛到用户定义的期望状态。每一步转换都由我们的测试用例集进行了覆盖和保护。
局限性与未来迭代方向
这种 TDD 方法构建的 Operator 核心逻辑虽然健壮,但当前实现仍然存在一些生产环境的考量不足:
- 版本升级:当前实现没有处理
spec.version
变更的逻辑。一个完整的 Operator 需要实现安全的滚动升级策略,可能涉及Partitioned
滚动更新,并在升级前后执行 Dgraph 特定的健康检查。 - 配置管理:Dgraph 的配置项远不止启动参数。一个成熟的 Operator 应该通过 ConfigMap 管理复杂的配置,并能检测到 ConfigMap 变化以触发 Pod 的滚动重启。
- 细粒度状态:当前的就绪判断仅依赖
ReadyReplicas
,这并不完全可靠。Operator 应该通过调用 Dgraph 的健康检查 API(如/health
端点)来获取更精确的集群成员健康状态。 - 备份与恢复:生产级的 Operator 必须提供备份和恢复的声明式 API,例如定义一个
DgraphBackup
CRD,并由 Operator 协调执行备份任务。
尽管如此,通过 TDD 构建的这个核心骨架为添加这些高级功能打下了坚实的基础。每当需要添加新功能时,我们都可以遵循 “Red-Green-Refactor” 的循环,先定义新场景下的期望行为,然后安全地扩展调谐逻辑,确保新代码不会破坏已有的稳定性。这在处理复杂分布式系统的自动化运维逻辑时,是一种极其有效的工程实践。