利用 Debezium CDC 将 SQL Server 数据变更实时同步至 NestJS 与 Zustand 前端


接手一个项目,核心系统是跑在 SQL Server 上的老旧ERP。新的需求是在一个Web仪表盘上实时展示库存变更。最直接的想法是轮询,但每秒一次的轮询对数据库和网络都是巨大的浪费,而且无法真正做到“实时”。更麻烦的是,ERP是供应商的黑盒,我们无权修改其代码来主动推送消息。唯一的突破口就是数据库本身。

这个问题的本质,是如何在不侵入应用的前提下,捕获数据库底层的变更。方案很快锁定在变更数据捕获(Change Data Capture, CDC)上。SQL Server 从 2008 版开始就原生支持 CDC,这为我们打开了一扇门。目标是构建一个从 SQL Server 事务日志到前端 UI 的、健壮、低延迟的数据管道。

架构构想与技术选型

整个数据流的设计如下:

  1. 数据源: SQL Server 数据库。当 [dbo].[Inventory] 表发生 INSERT, UPDATE, DELETE 操作时,CDC 机制会记录这些变更。
  2. 捕获与传输: Debezium 的 SQL Server Connector 负责监控 CDC 表,将变更事件解析为结构化 JSON,并推送到 Kafka topic 中。
  3. 处理与分发: 一个 NestJS 服务作为 Kafka Consumer,订阅该 topic。它负责解析 Debezium 事件,进行必要的业务逻辑处理,然后通过 WebSocket 将精简后的数据实时推送给前端客户端。
  4. 前端展示: React + Zustand 构建的前端应用。通过一个轻量级的 WebSocket 客户端接收数据,并使用 Zustand 更新状态,驱动 UI 实时渲染变更。
  5. 部署与运维: 整个流程,从 NestJS 应用的构建、测试,到 Debezium Connector 的配置更新,全部通过 GitLab CI/CD 实现自动化。
graph TD
    subgraph "Legacy System"
        SQLServer[(SQL Server DB)] -- CDC Enabled --> CDC_Table([CDC Change Table])
    end

    subgraph "Data Pipeline"
        Debezium[Debezium Connector] -- Reads --> CDC_Table
        Debezium -- Publishes Events --> Kafka(Kafka Topic: inventory.changes)
    end

    subgraph "Real-time Service (NestJS)"
        KafkaConsumer(Kafka Consumer) -- Subscribes --> Kafka
        KafkaConsumer -- Processes --> WebSocketGateway[WebSocket Gateway]
    end

    subgraph "Web Application"
        WebSocketGateway -- Pushes Updates --> Browser[Client Browser]
        Browser -- WebSocket Connection --> ZustandStore{Zustand Store}
        ZustandStore -- Updates --> ReactUI([React UI])
    end

    GitLabCI[GitLab CI/CD] -- Manages Deployment --> Debezium
    GitLabCI -- Deploys --> KafkaConsumer

这里的技术选型考量很务实:

  • Debezium + Kafka: 业界标准的 CDC 解决方案。Debezium 提供了可靠的 Connector,Kafka 提供了削峰填谷和数据持久化的能力,即使下游服务宕机,数据也不会丢失。
  • NestJS: 基于 TypeScript,架构清晰(Modules, Controllers, Providers),内置了对 Microservices 和 WebSockets 的一流支持,非常适合构建这种目标明确的后端服务。
  • Zustand: 极其轻量,API 简洁。对于这种只需订阅数据并更新状态的场景,Redux 或 MobX 都显得过于笨重。Zustand 几乎没有学习成本和模板代码。

步骤一:配置源头 - SQL Server CDC

首先,需要一个用于测试的环境。在真实项目中,这一步需要DBA在目标数据库上操作。这里我们用 Docker Compose 模拟。

假设我们有一张库存表 Inventory

-- create_inventory_table.sql
-- 确保在一个支持CDC的数据库中执行
CREATE TABLE Inventory (
    id INT PRIMARY KEY,
    product_name NVARCHAR(255) NOT NULL,
    quantity INT,
    last_updated DATETIME DEFAULT GETDATE()
);

-- 插入一些初始数据
INSERT INTO Inventory (id, product_name, quantity) VALUES
(101, 'Product A', 100),
(102, 'Product B', 250),
(103, 'Product C', 80);

接下来,启用数据库和表的 CDC。

-- 1. 启用数据库级别的 CDC (如果尚未启用)
-- 必须确保 SQL Server Agent 正在运行
IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = 'YourDB' AND is_cdc_enabled = 1)
BEGIN
    EXEC sys.sp_cdc_enable_db;
END
GO

-- 2. 为 Inventory 表启用 CDC
-- role_name=NULL 表示只有 db_owner 可以访问变更数据
-- @supports_net_changes=1 允许我们更高效地查询净变更
IF NOT EXISTS (SELECT 1 FROM sys.tables WHERE name = 'Inventory' AND is_tracked_by_cdc = 1)
BEGIN
    EXEC sys.sp_cdc_enable_table
        @source_schema = N'dbo',
        @source_name   = N'Inventory',
        @role_name     = NULL,
        @supports_net_changes = 1;
END
GO

执行后,SQL Server 会自动创建一系列 cdc.* 的系统表,其中 cdc.dbo_Inventory_CT 就是我们关心的变更表。

步骤二:搭建数据管道 - Debezium 与 Kafka

本地开发环境的 docker-compose.yml 是关键,它能一键启动所有依赖服务。

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.1
    hostname: connect
    container_name: connect
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      # Debezium SQL Server connector 需要的 JDBC driver
      # 需要自行下载 Microsoft JDBC Driver for SQL Server 并挂载
    volumes:
      - ./debezium/jars:/kafka/connect/debezium-connector-sqlserver

注意:你需要手动下载 SQL Server 的 JDBC 驱动 mssql-jdbc-*.jar 并放置在 ./debezium/jars 目录下,Debezium 连接器才能工作。

服务启动后,通过 Kafka Connect 的 REST API 来注册我们的 Connector。

// inventory-connector.json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "YOUR_SQL_SERVER_HOST", // 在真实项目中指向服务器IP,本地开发可使用 host.docker.internal
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "YourStrong!Password",
    "database.dbname": "YourDB",
    "database.server.name": "sqlserver", // 逻辑服务名,会成为 Kafka topic 的前缀
    "table.include.list": "dbo.Inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory",
    "snapshot.mode": "initial" // 首次启动时进行全量快照
  }
}

使用 curl 提交配置:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d @inventory-connector.json

此时,在 SQL Server 中执行任意DML操作,都应该能在 Kafka 中看到对应的消息。

UPDATE Inventory SET quantity = 90 WHERE id = 101;
DELETE FROM Inventory WHERE id = 103;

步骤三:消费与推送 - NestJS 服务

现在开始构建核心的 NestJS 应用。

1. Kafka 消费模块
我们使用 kafkajs 库。首先创建一个 KafkaModule 来管理消费者。

// src/kafka/kafka.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { EventEmitter2 } from '@nestjs/event-emitter';

@Injectable()
export class KafkaService implements OnModuleInit, OnModuleDestroy {
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private readonly logger = new Logger(KafkaService.name);

  constructor(private eventEmitter: EventEmitter2) {
    this.kafka = new Kafka({
      clientId: 'inventory-consumer',
      brokers: [process.env.KAFKA_BROKER || 'localhost:29092'],
    });
    this.consumer = this.kafka.consumer({ groupId: 'inventory-group' });
  }

  async onModuleInit() {
    await this.connect();
  }

  async onModuleDestroy() {
    await this.disconnect();
  }

  private async connect() {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'sqlserver.dbo.Inventory', fromBeginning: false });
    this.logger.log('Kafka Consumer connected and subscribed.');

    await this.consumer.run({
      eachMessage: async (payload: EachMessagePayload) => {
        this.handleMessage(payload);
      },
    });
  }

  private handleMessage({ topic, partition, message }: EachMessagePayload) {
    if (!message.value) {
        this.logger.warn(`Received empty message from ${topic}:${partition}`);
        return;
    }
    const rawMessage = message.value.toString();
    this.logger.debug(`Received raw message: ${rawMessage}`);

    try {
        const event = JSON.parse(rawMessage);
        // Debezium 消息体结构复杂,我们需要的是 payload 里的 after 或 before
        const payload = event.payload;

        if (payload && payload.op) {
            const data = payload.op === 'd' ? payload.before : payload.after;
            if (!data) return;

            const inventoryUpdate = {
                op: payload.op, // 'c' (create), 'u' (update), 'd' (delete)
                data: {
                    id: data.id,
                    product_name: data.product_name,
                    quantity: data.quantity,
                    last_updated: data.last_updated
                }
            };

            // 使用事件发射器解耦 Kafka 消费和 WebSocket 推送
            this.eventEmitter.emit('inventory.updated', inventoryUpdate);
        }
    } catch (error) {
        this.logger.error('Failed to parse Kafka message', error);
    }
  }

  async disconnect() {
    await this.consumer.disconnect();
    this.logger.log('Kafka Consumer disconnected.');
  }
}

这里的关键是错误处理和解耦。handleMessage 解析 Debezium 的标准消息格式,提取出变更前后的数据,然后通过 NestJS 的 EventEmitterModule 发射一个内部事件。这样做的好处是,Kafka 模块只负责消费和解析,不关心数据如何被分发。

2. WebSocket 网关
接下来,创建一个 Gateway 来监听内部事件并广播给客户端。

// src/events/events.gateway.ts
import {
  WebSocketGateway,
  SubscribeMessage,
  WebSocketServer,
  OnGatewayConnection,
  OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

@WebSocketGateway({
  cors: {
    origin: '*', // 在生产环境中应配置为你的前端域名
  },
})
export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  private readonly logger = new Logger(EventsGateway.name);

  handleConnection(client: Socket, ...args: any[]) {
    this.logger.log(`Client connected: ${client.id}`);
  }

  handleDisconnect(client: Socket) {
    this.logger.log(`Client disconnected: ${client.id}`);
  }
  
  // 监听 KafkaService 发射的事件
  @OnEvent('inventory.updated')
  handleInventoryUpdate(payload: any) {
    this.logger.log(`Broadcasting inventory update: ${JSON.stringify(payload)}`);
    // 向所有连接的客户端广播事件
    this.server.emit('inventory-update', payload);
  }

  @SubscribeMessage('messageToServer')
  handleMessage(client: Socket, text: string): void {
    // 仅用于测试双向通信
    client.emit('messageToClient', `Echo: ${text}`);
  }
}

这个 Gateway 非常纯粹,它监听 inventory.updated 事件,并通过 socket.ioserver.emit 方法将数据广播出去。

步骤四:实时渲染 - React 与 Zustand

前端部分相对简单。

1. 创建 Zustand Store

// src/stores/inventoryStore.ts
import { create } from 'zustand';

interface InventoryItem {
  id: number;
  product_name: string;
  quantity: number;
  last_updated: string;
}

interface InventoryState {
  items: Record<number, InventoryItem>;
  // 在真实项目中,这里会有一个API调用来获取初始库存列表
  setInitialItems: (items: InventoryItem[]) => void;
  processUpdate: (update: { op: 'c' | 'u' | 'd'; data: InventoryItem }) => void;
}

export const useInventoryStore = create<InventoryState>((set) => ({
  items: {},
  setInitialItems: (initialItems) => set(() => ({
    items: initialItems.reduce((acc, item) => {
      acc[item.id] = item;
      return acc;
    }, {}),
  })),
  processUpdate: (update) => set((state) => {
    const newItems = { ...state.items };
    const { op, data } = update;
    const { id } = data;

    switch (op) {
      case 'c': // create
      case 'u': // update
        newItems[id] = data;
        break;
      case 'd': // delete
        delete newItems[id];
        break;
      default:
        break;
    }
    return { items: newItems };
  }),
}));

Store 的设计思路是用一个对象 items 以 id 为键来存储库存,这样增删改查的效率都是 O(1)。processUpdate 方法根据操作类型 (op) 来更新 state。

2. React 组件与 WebSocket 连接

// src/components/InventoryDashboard.tsx
import React, { useEffect } from 'react';
import { io } from 'socket.io-client';
import { useInventoryStore } from '../stores/inventoryStore';

// 连接到 NestJS WebSocket 网关
const socket = io('http://localhost:3001'); // NestJS 服务的地址

export const InventoryDashboard = () => {
  const { items, processUpdate, setInitialItems } = useInventoryStore();

  useEffect(() => {
    // 伪造一个初始数据加载过程
    // 在真实项目中,这里应该是一个 fetch 调用
    const fetchInitialData = () => {
      const initialData = [
        { id: 101, product_name: 'Product A', quantity: 100, last_updated: new Date().toISOString() },
        { id: 102, product_name: 'Product B', quantity: 250, last_updated: new Date().toISOString() },
      ];
      setInitialItems(initialData);
    };

    fetchInitialData();

    // 监听 WebSocket 事件
    socket.on('inventory-update', (update) => {
      console.log('Received update:', update);
      processUpdate(update);
    });

    socket.on('connect', () => {
      console.log('Connected to WebSocket server.');
    });

    // 清理函数
    return () => {
      socket.off('inventory-update');
      socket.disconnect();
    };
  }, [processUpdate, setInitialItems]);

  const inventoryList = Object.values(items);

  return (
    <div>
      <h1>Real-time Inventory</h1>
      <table>
        <thead>
          <tr>
            <th>ID</th>
            <th>Product Name</th>
            <th>Quantity</th>
            <th>Last Updated</th>
          </tr>
        </thead>
        <tbody>
          {inventoryList.map((item) => (
            <tr key={item.id}>
              <td>{item.id}</td>
              <td>{item.product_name}</td>
              <td>{item.quantity}</td>
              <td>{new Date(item.last_updated).toLocaleString()}</td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
};

组件在挂载时获取初始数据,然后开始监听 inventory-update 事件。每当收到新数据,就调用 store 的 processUpdate 方法,UI 会因状态变化而自动重新渲染。

步骤五:自动化运维 - GitLab CI/CD

手动部署和配置 Debezium Connector 是不可靠且难以追踪的。必须将其纳入版本控制和 CI/CD 流程。

.gitlab-ci.yml 配置文件如下:

stages:
  - build
  - deploy

variables:
  DOCKER_IMAGE_NAME: $CI_REGISTRY_IMAGE/inventory-service:$CI_COMMIT_SHA
  KAFKA_CONNECT_URL: "http://kafka-connect.your-namespace.svc.cluster.local:8083" # Kubernetes 集群内部地址

build-service:
  stage: build
  image: docker:20.10.16
  services:
    - docker:20.10.16-dind
  script:
    - echo "Building NestJS service Docker image..."
    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
    - docker build -t $DOCKER_IMAGE_NAME .
    - docker push $DOCKER_IMAGE_NAME
  only:
    - main

deploy-connector:
  stage: deploy
  image: curlimages/curl:7.83.1
  script:
    - echo "Deploying Debezium SQL Server Connector..."
    # 使用 curl 的 --data-binary 来避免 shell 对 @ 符号的特殊解释
    - |
      curl -i -X PUT "${KAFKA_CONNECT_URL}/connectors/inventory-connector/config" \
      -H "Content-Type: application/json" \
      --data-binary "@debezium-config/inventory-connector.json"
  only:
    - main
  when: manual # 手动触发,防止意外更改

deploy-service:
  stage: deploy
  image: anshulbehl/kubectl-curl
  script:
    - echo "Deploying NestJS service to Kubernetes..."
    - envsubst < k8s/deployment.template.yaml > k8s/deployment.yaml
    - kubectl apply -f k8s/deployment.yaml
  needs:
    - build-service
  only:
    - main

这里的核心思想是:

  1. NestJS 服务容器化: build-service 任务负责构建 NestJS 应用的 Docker 镜像并推送到 GitLab 的容器镜像仓库。
  2. Connector 配置即代码: debezium-config/inventory-connector.json 文件与应用代码一同存放在 Git 仓库中。deploy-connector 任务使用 curl 和 Kafka Connect 的 REST API,以声明式的方式创建或更新 Connector 配置。这比手动操作可靠得多。
  3. 服务部署: deploy-service 任务(这里以 Kubernetes 为例)使用 kubectl 将新的 Docker 镜像部署到集群。

这个 CI/CD 流程确保了整个实时数据管道的后端部分是可重复、可追踪、自动化部署的。

局限性与未来展望

这套架构虽然解决了实时性的核心问题,但在生产环境中还需考虑更多。

首先,高可用与扩展性。当前的 NestJS 服务是单点。当 WebSocket 连接数激增时,需要水平扩展。但这会带来新问题:如何确保一个 WebSocket 事件只被广播一次?通常的解决方案是引入 Redis Pub/Sub。所有 NestJS 实例都订阅 Redis 的一个 channel,由 Kafka consumer 将消息发布到 Redis,再由所有实例从 Redis 接收并推送给各自连接的客户端。

其次,初始状态加载。新的客户端连接时,它需要获取当前所有库存的全量数据,而不是只等待增量更新。这意味着 NestJS 服务除了 WebSocket Gateway 外,还需要提供一个传统的 REST API 接口(例如 GET /inventory)供前端在初始化时调用。

最后,容错与消息处理。如果 NestJS 服务在处理某条 Kafka 消息时失败怎么办?需要设计重试和死信队列(Dead-Letter Queue)机制,以防有问题的消息阻塞整个消费流程。同时,对 Debezium 的 op 类型(c, u, d, r - read for snapshot)需要做完备处理,确保前端状态的最终一致性。


  目录