接手一个项目,核心系统是跑在 SQL Server 上的老旧ERP。新的需求是在一个Web仪表盘上实时展示库存变更。最直接的想法是轮询,但每秒一次的轮询对数据库和网络都是巨大的浪费,而且无法真正做到“实时”。更麻烦的是,ERP是供应商的黑盒,我们无权修改其代码来主动推送消息。唯一的突破口就是数据库本身。
这个问题的本质,是如何在不侵入应用的前提下,捕获数据库底层的变更。方案很快锁定在变更数据捕获(Change Data Capture, CDC)上。SQL Server 从 2008 版开始就原生支持 CDC,这为我们打开了一扇门。目标是构建一个从 SQL Server 事务日志到前端 UI 的、健壮、低延迟的数据管道。
架构构想与技术选型
整个数据流的设计如下:
- 数据源: SQL Server 数据库。当
[dbo].[Inventory]
表发生INSERT
,UPDATE
,DELETE
操作时,CDC 机制会记录这些变更。 - 捕获与传输: Debezium 的 SQL Server Connector 负责监控 CDC 表,将变更事件解析为结构化 JSON,并推送到 Kafka topic 中。
- 处理与分发: 一个 NestJS 服务作为 Kafka Consumer,订阅该 topic。它负责解析 Debezium 事件,进行必要的业务逻辑处理,然后通过 WebSocket 将精简后的数据实时推送给前端客户端。
- 前端展示: React + Zustand 构建的前端应用。通过一个轻量级的 WebSocket 客户端接收数据,并使用 Zustand 更新状态,驱动 UI 实时渲染变更。
- 部署与运维: 整个流程,从 NestJS 应用的构建、测试,到 Debezium Connector 的配置更新,全部通过 GitLab CI/CD 实现自动化。
graph TD subgraph "Legacy System" SQLServer[(SQL Server DB)] -- CDC Enabled --> CDC_Table([CDC Change Table]) end subgraph "Data Pipeline" Debezium[Debezium Connector] -- Reads --> CDC_Table Debezium -- Publishes Events --> Kafka(Kafka Topic: inventory.changes) end subgraph "Real-time Service (NestJS)" KafkaConsumer(Kafka Consumer) -- Subscribes --> Kafka KafkaConsumer -- Processes --> WebSocketGateway[WebSocket Gateway] end subgraph "Web Application" WebSocketGateway -- Pushes Updates --> Browser[Client Browser] Browser -- WebSocket Connection --> ZustandStore{Zustand Store} ZustandStore -- Updates --> ReactUI([React UI]) end GitLabCI[GitLab CI/CD] -- Manages Deployment --> Debezium GitLabCI -- Deploys --> KafkaConsumer
这里的技术选型考量很务实:
- Debezium + Kafka: 业界标准的 CDC 解决方案。Debezium 提供了可靠的 Connector,Kafka 提供了削峰填谷和数据持久化的能力,即使下游服务宕机,数据也不会丢失。
- NestJS: 基于 TypeScript,架构清晰(Modules, Controllers, Providers),内置了对 Microservices 和 WebSockets 的一流支持,非常适合构建这种目标明确的后端服务。
- Zustand: 极其轻量,API 简洁。对于这种只需订阅数据并更新状态的场景,Redux 或 MobX 都显得过于笨重。Zustand 几乎没有学习成本和模板代码。
步骤一:配置源头 - SQL Server CDC
首先,需要一个用于测试的环境。在真实项目中,这一步需要DBA在目标数据库上操作。这里我们用 Docker Compose 模拟。
假设我们有一张库存表 Inventory
。
-- create_inventory_table.sql
-- 确保在一个支持CDC的数据库中执行
CREATE TABLE Inventory (
id INT PRIMARY KEY,
product_name NVARCHAR(255) NOT NULL,
quantity INT,
last_updated DATETIME DEFAULT GETDATE()
);
-- 插入一些初始数据
INSERT INTO Inventory (id, product_name, quantity) VALUES
(101, 'Product A', 100),
(102, 'Product B', 250),
(103, 'Product C', 80);
接下来,启用数据库和表的 CDC。
-- 1. 启用数据库级别的 CDC (如果尚未启用)
-- 必须确保 SQL Server Agent 正在运行
IF NOT EXISTS (SELECT 1 FROM sys.databases WHERE name = 'YourDB' AND is_cdc_enabled = 1)
BEGIN
EXEC sys.sp_cdc_enable_db;
END
GO
-- 2. 为 Inventory 表启用 CDC
-- role_name=NULL 表示只有 db_owner 可以访问变更数据
-- @supports_net_changes=1 允许我们更高效地查询净变更
IF NOT EXISTS (SELECT 1 FROM sys.tables WHERE name = 'Inventory' AND is_tracked_by_cdc = 1)
BEGIN
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'Inventory',
@role_name = NULL,
@supports_net_changes = 1;
END
GO
执行后,SQL Server 会自动创建一系列 cdc.*
的系统表,其中 cdc.dbo_Inventory_CT
就是我们关心的变更表。
步骤二:搭建数据管道 - Debezium 与 Kafka
本地开发环境的 docker-compose.yml
是关键,它能一键启动所有依赖服务。
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
connect:
image: debezium/connect:2.1
hostname: connect
container_name: connect
depends_on:
- kafka
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:9092
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
# Debezium SQL Server connector 需要的 JDBC driver
# 需要自行下载 Microsoft JDBC Driver for SQL Server 并挂载
volumes:
- ./debezium/jars:/kafka/connect/debezium-connector-sqlserver
注意:你需要手动下载 SQL Server 的 JDBC 驱动 mssql-jdbc-*.jar
并放置在 ./debezium/jars
目录下,Debezium 连接器才能工作。
服务启动后,通过 Kafka Connect 的 REST API 来注册我们的 Connector。
// inventory-connector.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "YOUR_SQL_SERVER_HOST", // 在真实项目中指向服务器IP,本地开发可使用 host.docker.internal
"database.port": "1433",
"database.user": "sa",
"database.password": "YourStrong!Password",
"database.dbname": "YourDB",
"database.server.name": "sqlserver", // 逻辑服务名,会成为 Kafka topic 的前缀
"table.include.list": "dbo.Inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"snapshot.mode": "initial" // 首次启动时进行全量快照
}
}
使用 curl
提交配置:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d @inventory-connector.json
此时,在 SQL Server 中执行任意DML操作,都应该能在 Kafka 中看到对应的消息。
UPDATE Inventory SET quantity = 90 WHERE id = 101;
DELETE FROM Inventory WHERE id = 103;
步骤三:消费与推送 - NestJS 服务
现在开始构建核心的 NestJS 应用。
1. Kafka 消费模块
我们使用 kafkajs
库。首先创建一个 KafkaModule
来管理消费者。
// src/kafka/kafka.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { EventEmitter2 } from '@nestjs/event-emitter';
@Injectable()
export class KafkaService implements OnModuleInit, OnModuleDestroy {
private readonly kafka: Kafka;
private readonly consumer: Consumer;
private readonly logger = new Logger(KafkaService.name);
constructor(private eventEmitter: EventEmitter2) {
this.kafka = new Kafka({
clientId: 'inventory-consumer',
brokers: [process.env.KAFKA_BROKER || 'localhost:29092'],
});
this.consumer = this.kafka.consumer({ groupId: 'inventory-group' });
}
async onModuleInit() {
await this.connect();
}
async onModuleDestroy() {
await this.disconnect();
}
private async connect() {
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'sqlserver.dbo.Inventory', fromBeginning: false });
this.logger.log('Kafka Consumer connected and subscribed.');
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
this.handleMessage(payload);
},
});
}
private handleMessage({ topic, partition, message }: EachMessagePayload) {
if (!message.value) {
this.logger.warn(`Received empty message from ${topic}:${partition}`);
return;
}
const rawMessage = message.value.toString();
this.logger.debug(`Received raw message: ${rawMessage}`);
try {
const event = JSON.parse(rawMessage);
// Debezium 消息体结构复杂,我们需要的是 payload 里的 after 或 before
const payload = event.payload;
if (payload && payload.op) {
const data = payload.op === 'd' ? payload.before : payload.after;
if (!data) return;
const inventoryUpdate = {
op: payload.op, // 'c' (create), 'u' (update), 'd' (delete)
data: {
id: data.id,
product_name: data.product_name,
quantity: data.quantity,
last_updated: data.last_updated
}
};
// 使用事件发射器解耦 Kafka 消费和 WebSocket 推送
this.eventEmitter.emit('inventory.updated', inventoryUpdate);
}
} catch (error) {
this.logger.error('Failed to parse Kafka message', error);
}
}
async disconnect() {
await this.consumer.disconnect();
this.logger.log('Kafka Consumer disconnected.');
}
}
这里的关键是错误处理和解耦。handleMessage
解析 Debezium 的标准消息格式,提取出变更前后的数据,然后通过 NestJS 的 EventEmitterModule
发射一个内部事件。这样做的好处是,Kafka 模块只负责消费和解析,不关心数据如何被分发。
2. WebSocket 网关
接下来,创建一个 Gateway 来监听内部事件并广播给客户端。
// src/events/events.gateway.ts
import {
WebSocketGateway,
SubscribeMessage,
WebSocketServer,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
@WebSocketGateway({
cors: {
origin: '*', // 在生产环境中应配置为你的前端域名
},
})
export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
private readonly logger = new Logger(EventsGateway.name);
handleConnection(client: Socket, ...args: any[]) {
this.logger.log(`Client connected: ${client.id}`);
}
handleDisconnect(client: Socket) {
this.logger.log(`Client disconnected: ${client.id}`);
}
// 监听 KafkaService 发射的事件
@OnEvent('inventory.updated')
handleInventoryUpdate(payload: any) {
this.logger.log(`Broadcasting inventory update: ${JSON.stringify(payload)}`);
// 向所有连接的客户端广播事件
this.server.emit('inventory-update', payload);
}
@SubscribeMessage('messageToServer')
handleMessage(client: Socket, text: string): void {
// 仅用于测试双向通信
client.emit('messageToClient', `Echo: ${text}`);
}
}
这个 Gateway 非常纯粹,它监听 inventory.updated
事件,并通过 socket.io
的 server.emit
方法将数据广播出去。
步骤四:实时渲染 - React 与 Zustand
前端部分相对简单。
1. 创建 Zustand Store
// src/stores/inventoryStore.ts
import { create } from 'zustand';
interface InventoryItem {
id: number;
product_name: string;
quantity: number;
last_updated: string;
}
interface InventoryState {
items: Record<number, InventoryItem>;
// 在真实项目中,这里会有一个API调用来获取初始库存列表
setInitialItems: (items: InventoryItem[]) => void;
processUpdate: (update: { op: 'c' | 'u' | 'd'; data: InventoryItem }) => void;
}
export const useInventoryStore = create<InventoryState>((set) => ({
items: {},
setInitialItems: (initialItems) => set(() => ({
items: initialItems.reduce((acc, item) => {
acc[item.id] = item;
return acc;
}, {}),
})),
processUpdate: (update) => set((state) => {
const newItems = { ...state.items };
const { op, data } = update;
const { id } = data;
switch (op) {
case 'c': // create
case 'u': // update
newItems[id] = data;
break;
case 'd': // delete
delete newItems[id];
break;
default:
break;
}
return { items: newItems };
}),
}));
Store 的设计思路是用一个对象 items
以 id 为键来存储库存,这样增删改查的效率都是 O(1)。processUpdate
方法根据操作类型 (op
) 来更新 state。
2. React 组件与 WebSocket 连接
// src/components/InventoryDashboard.tsx
import React, { useEffect } from 'react';
import { io } from 'socket.io-client';
import { useInventoryStore } from '../stores/inventoryStore';
// 连接到 NestJS WebSocket 网关
const socket = io('http://localhost:3001'); // NestJS 服务的地址
export const InventoryDashboard = () => {
const { items, processUpdate, setInitialItems } = useInventoryStore();
useEffect(() => {
// 伪造一个初始数据加载过程
// 在真实项目中,这里应该是一个 fetch 调用
const fetchInitialData = () => {
const initialData = [
{ id: 101, product_name: 'Product A', quantity: 100, last_updated: new Date().toISOString() },
{ id: 102, product_name: 'Product B', quantity: 250, last_updated: new Date().toISOString() },
];
setInitialItems(initialData);
};
fetchInitialData();
// 监听 WebSocket 事件
socket.on('inventory-update', (update) => {
console.log('Received update:', update);
processUpdate(update);
});
socket.on('connect', () => {
console.log('Connected to WebSocket server.');
});
// 清理函数
return () => {
socket.off('inventory-update');
socket.disconnect();
};
}, [processUpdate, setInitialItems]);
const inventoryList = Object.values(items);
return (
<div>
<h1>Real-time Inventory</h1>
<table>
<thead>
<tr>
<th>ID</th>
<th>Product Name</th>
<th>Quantity</th>
<th>Last Updated</th>
</tr>
</thead>
<tbody>
{inventoryList.map((item) => (
<tr key={item.id}>
<td>{item.id}</td>
<td>{item.product_name}</td>
<td>{item.quantity}</td>
<td>{new Date(item.last_updated).toLocaleString()}</td>
</tr>
))}
</tbody>
</table>
</div>
);
};
组件在挂载时获取初始数据,然后开始监听 inventory-update
事件。每当收到新数据,就调用 store 的 processUpdate
方法,UI 会因状态变化而自动重新渲染。
步骤五:自动化运维 - GitLab CI/CD
手动部署和配置 Debezium Connector 是不可靠且难以追踪的。必须将其纳入版本控制和 CI/CD 流程。
.gitlab-ci.yml
配置文件如下:
stages:
- build
- deploy
variables:
DOCKER_IMAGE_NAME: $CI_REGISTRY_IMAGE/inventory-service:$CI_COMMIT_SHA
KAFKA_CONNECT_URL: "http://kafka-connect.your-namespace.svc.cluster.local:8083" # Kubernetes 集群内部地址
build-service:
stage: build
image: docker:20.10.16
services:
- docker:20.10.16-dind
script:
- echo "Building NestJS service Docker image..."
- docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
- docker build -t $DOCKER_IMAGE_NAME .
- docker push $DOCKER_IMAGE_NAME
only:
- main
deploy-connector:
stage: deploy
image: curlimages/curl:7.83.1
script:
- echo "Deploying Debezium SQL Server Connector..."
# 使用 curl 的 --data-binary 来避免 shell 对 @ 符号的特殊解释
- |
curl -i -X PUT "${KAFKA_CONNECT_URL}/connectors/inventory-connector/config" \
-H "Content-Type: application/json" \
--data-binary "@debezium-config/inventory-connector.json"
only:
- main
when: manual # 手动触发,防止意外更改
deploy-service:
stage: deploy
image: anshulbehl/kubectl-curl
script:
- echo "Deploying NestJS service to Kubernetes..."
- envsubst < k8s/deployment.template.yaml > k8s/deployment.yaml
- kubectl apply -f k8s/deployment.yaml
needs:
- build-service
only:
- main
这里的核心思想是:
- NestJS 服务容器化:
build-service
任务负责构建 NestJS 应用的 Docker 镜像并推送到 GitLab 的容器镜像仓库。 - Connector 配置即代码:
debezium-config/inventory-connector.json
文件与应用代码一同存放在 Git 仓库中。deploy-connector
任务使用curl
和 Kafka Connect 的 REST API,以声明式的方式创建或更新 Connector 配置。这比手动操作可靠得多。 - 服务部署:
deploy-service
任务(这里以 Kubernetes 为例)使用kubectl
将新的 Docker 镜像部署到集群。
这个 CI/CD 流程确保了整个实时数据管道的后端部分是可重复、可追踪、自动化部署的。
局限性与未来展望
这套架构虽然解决了实时性的核心问题,但在生产环境中还需考虑更多。
首先,高可用与扩展性。当前的 NestJS 服务是单点。当 WebSocket 连接数激增时,需要水平扩展。但这会带来新问题:如何确保一个 WebSocket 事件只被广播一次?通常的解决方案是引入 Redis Pub/Sub。所有 NestJS 实例都订阅 Redis 的一个 channel,由 Kafka consumer 将消息发布到 Redis,再由所有实例从 Redis 接收并推送给各自连接的客户端。
其次,初始状态加载。新的客户端连接时,它需要获取当前所有库存的全量数据,而不是只等待增量更新。这意味着 NestJS 服务除了 WebSocket Gateway 外,还需要提供一个传统的 REST API 接口(例如 GET /inventory
)供前端在初始化时调用。
最后,容错与消息处理。如果 NestJS 服务在处理某条 Kafka 消息时失败怎么办?需要设计重试和死信队列(Dead-Letter Queue)机制,以防有问题的消息阻塞整个消费流程。同时,对 Debezium 的 op
类型(c
, u
, d
, r
- read for snapshot)需要做完备处理,确保前端状态的最终一致性。