我们面临一个典型的存量系统现代化改造难题。核心业务数据稳固地运行在 PostgreSQL 之上,数据访问层由 MyBatis 全权接管,这套组合在多年的事务处理中表现得稳定可靠。但新的需求来了:业务方希望在用户访问前端页面时,能根据用户的实时行为(例如,最近30秒内的点击、浏览)动态推荐内容。传统的批处理 ETL 每天更新一次用户画像,其延迟是小时乃至天级别的,完全无法满足“实时”这个核心要求。前端即便使用了服务端渲染(SSR),在呈现动态内容时, hydration 带来的交互延迟也让个性化体验大打折扣。
目标很明确:构建一个从数据产生到前端呈现,端到端延迟在百毫秒级别的实时特征管道。这意味着需要一条从后端数据库变更,到中间层特征计算,再到前端无缝渲染的完整、低延迟链路。这是一个整合了数据工程、后端服务和前沿前端技术的综合性挑战。
架构构想与技术选型决策
直接轮询数据库是首先被否决的方案,它会对主库造成巨大压力且无法做到真正的实时。我们的起点必须是数据库的事务日志。
数据源端: 现有系统使用 MyBatis 操作 PostgreSQL。这是我们必须接受的既定事实,也是改造的起点。我们将利用 PostgreSQL 的逻辑解码(Logical Decoding)功能,这是实现变更数据捕获(CDC)的基础。
变更数据捕获 (CDC): Debezium 是这个领域的标准答案。通过部署一个 Debezium 的 PostgreSQL 连接器到 Kafka Connect 集群,我们可以将数据库
user_activity
表的INSERT
,UPDATE
,DELETE
操作以结构化事件的形式实时推送到 Kafka topic 中。这是一种对源数据库非侵入式的方案。实时特征计算层: 消费 Kafka 事件并计算特征的服务是整个管道的核心。考虑到团队对 JVM 技术栈的熟悉度,我们选择使用 Java 和 Spring Boot 构建此服务。特征的计算逻辑可能很简单(如记录用户最后点击的商品类目),也可能很复杂(如计算用户近期行为的向量嵌入)。计算出的特征需要一个低延迟的存储。
特征存储: 特征数据要求极高的读取性能。我们采用两级缓存策略:
- L1 缓存: 使用 Caffeine 作为进程内缓存,存储最热用户的特征。这能提供微秒级的访问速度。
- L2 缓存: 使用 Redis 作为分布式缓存,为 L1 缓存提供穿透保护,并保证特征计算服务的多个实例之间数据的一致性。
特征服务API: 一个简单的、基于 Spring WebFlux 的非阻塞 RESTful API,用于向前端提供用户特征。例如,
GET /api/v1/features/user/{userId}
。非阻塞 I/O 对维持低延迟至关重要。前端框架: 这是消除最后一道延迟鸿沟的关键。传统的 React/Vue/Angular 应用,即使经过 SSR,也需要在客户端执行 hydration 过程,这个过程会阻塞主线程,延迟用户可交互时间(TTI)。对于需要即时响应的个性化内容,这种延迟是不可接受的。Qwik 的 Resumability(可恢复性)从根本上解决了这个问题。它在服务端生成 HTML 的同时,将应用状态和事件监听器信息序列化并内联到 HTML 中。客户端无需重新执行任何 JavaScript 代码即可响应用户交互,实现了真正的零 hydration 和即时交互。
整个架构的数据流图如下:
graph TD subgraph "Legacy System" A[User Action] --> B{Application Logic}; B --> C[MyBatis Mapper]; C --> D[(PostgreSQL)]; end subgraph "Real-time Feature Pipeline" D -- WAL --> E[Debezium Connector]; E -- CDC Events --> F[(Kafka Topic)]; F --> G[Feature Engineering Service]; G -- Computes & Stores --> H{Cache}; H -- L1 --> I[Caffeine]; H -- L2 --> J[Redis]; end subgraph "Serving & Frontend" K[Qwik Frontend] -- HTTP Request --> L[Feature Serving API]; L -- Reads from --> H; L -- Feature Response --> K; end style D fill:#d3e5ef,stroke:#333,stroke-width:2px style F fill:#fce8b2,stroke:#333,stroke-width:2px style J fill:#ffcdd2,stroke:#333,stroke-width:2px
实现:从数据库到 Kafka 的实时数据流
1. 数据库与 MyBatis 配置
首先,确保 PostgreSQL 开启了逻辑复制。在 postgresql.conf
中进行配置:
# postgresql.conf
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
假设我们有一个记录用户活动的数据表:
CREATE TABLE user_activity (
id UUID PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
activity_type VARCHAR(50) NOT NULL,
product_id VARCHAR(255),
activity_timestamp TIMESTAMPTZ NOT NULL
);
-- 为逻辑复制创建发布
CREATE PUBLICATION dbz_publication FOR TABLE user_activity;
在遗留系统中,通过 MyBatis 插入数据是常规操作。
<!-- UserActivityMapper.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.legacy.mapper.UserActivityMapper">
<insert id="insertActivity" parameterType="com.example.legacy.model.UserActivity">
INSERT INTO user_activity (id, user_id, activity_type, product_id, activity_timestamp)
VALUES (#{id}, #{userId}, #{activityType}, #{productId}, #{activityTimestamp})
</insert>
</mapper>
2. 配置 Debezium 连接器
我们使用 Kafka Connect 来运行 Debezium。以下是向 Kafka Connect REST API 提交的连接器配置 pg-connector.json
。在真实项目中,这个配置应该通过 IaC 工具(如 Terraform)进行管理。
{
"name": "user-activity-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "postgres-host",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "debezium_password",
"database.dbname": "app_db",
"database.server.name": "app_server_1",
"table.include.list": "public.user_activity",
"publication.name": "dbz_publication",
"publication.autocreate.mode": "disabled",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"topic.prefix": "cdc",
"slot.name": "debezium_user_activity_slot",
"slot.drop.on.stop": "false"
}
}
关键配置解析:
-
plugin.name
:pgoutput
是 PostgreSQL 10+ 内置的逻辑解码插件,性能优异。 -
table.include.list
: 明确指定我们只关心user_activity
这张表的数据变更。 -
publication.name
: 必须与数据库中创建的PUBLICATION
名称一致。 -
value.converter.schemas.enable
: 设置为false
可以得到纯净的 JSON 数据,而不是带有 schema 信息的复杂结构,简化下游消费。 -
slot.drop.on.stop
: 设置为false
至关重要。这意味着即使连接器停止,复制槽也会保留,数据库会为该槽保留 WAL 日志,确保连接器重启后不会丢失任何数据。
当 MyBatis 插入一条新记录时,几毫秒内,我们就能在 Kafka 的 cdc.app_server_1.public.user_activity
topic 中看到一条类似这样的 JSON 消息:
{
"payload": {
"before": null,
"after": {
"id": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
"user_id": "user-123",
"activity_type": "VIEW_PRODUCT",
"product_id": "prod-456",
"activity_timestamp": 1678886400000
},
"source": { ... },
"op": "c", // c for create/insert
"ts_ms": 1678886400123
}
}
实现:特征计算服务与低延迟存储
这个 Spring Boot 服务是管道的中枢。
1. 项目依赖 (pom.xml)
<dependencies>
<!-- Spring Boot WebFlux for non-blocking API -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Kafka Consumer -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Redis with Lettuce (reactive driver) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<!-- Caffeine Cache -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Logging -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
2. Kafka 消费者与特征处理
我们定义一个服务来监听 Kafka topic 并更新特征缓存。
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class UserActivityConsumer {
private static final Logger log = LoggerFactory.getLogger(UserActivityConsumer.class);
private final ObjectMapper objectMapper;
private final UserFeatureService userFeatureService;
public UserActivityConsumer(ObjectMapper objectMapper, UserFeatureService userFeatureService) {
this.objectMapper = objectMapper;
this.userFeatureService = userFeatureService;
}
@KafkaListener(topics = "cdc.app_server_1.public.user_activity", groupId = "feature-engineering-group")
public void consume(String message) {
try {
JsonNode root = objectMapper.readTree(message);
// Debezium event can be null for deletions or heartbeat messages
JsonNode payload = root.get("payload");
if (payload == null || payload.isNull()) {
log.info("Received message with null payload, possibly a tombstone or heartbeat. Skipping.");
return;
}
// We only care about create ('c') and update ('u') operations
String op = payload.get("op").asText();
if (!"c".equals(op) && !"u".equals(op)) {
return;
}
JsonNode after = payload.get("after");
if (after == null || after.isNull()) {
return;
}
String userId = after.get("user_id").asText();
String activityType = after.get("activity_type").asText();
String productId = after.has("product_id") ? after.get("product_id").asText(null) : null;
// This is where feature engineering happens.
// For this example, we'll just update the user's last seen product.
if ("VIEW_PRODUCT".equals(activityType) && productId != null) {
userFeatureService.updateLastViewedProduct(userId, productId).subscribe();
}
} catch (IOException e) {
log.error("Failed to parse Debezium CDC event: {}", message, e);
// In a production system, push this to a dead-letter queue (DLQ)
}
}
}
错误处理: try-catch
块是生产级代码的最低要求。任何解析失败的消息都应被记录并发送到DLQ,以防消费者循环崩溃。同时,需要处理 Debezium 的心跳消息或删除操作产生的 tombstone 消息,它们的 payload
可能为 null
。
3. 特征服务与缓存层
UserFeatureService
负责与 Caffeine 和 Redis 交互。
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.time.Duration;
@Service
public class UserFeatureService {
private final ReactiveStringRedisTemplate redisTemplate;
// L1 Cache: In-memory cache for ultra-fast access
private final AsyncLoadingCache<String, String> lastViewedProductCache;
public UserFeatureService(ReactiveStringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// Build the Caffeine cache
this.lastViewedProductCache = Caffeine.newBuilder()
.maximumSize(100_000) // Max 100k users in memory
.expireAfterWrite(Duration.ofMinutes(5)) // Evict after 5 minutes of no writes
.buildAsync(this::loadFromRedis); // Cache loader for cache misses
}
// This method is called by the Kafka consumer
public Mono<Void> updateLastViewedProduct(String userId, String productId) {
// Update both L1 and L2 caches
lastViewedProductCache.put(userId, Mono.just(productId).toFuture());
String redisKey = "user_feature:last_viewed:" + userId;
return redisTemplate.opsForValue()
.set(redisKey, productId, Duration.ofHours(1)) // Set Redis TTL
.then();
}
public Mono<String> getLastViewedProduct(String userId) {
// This will first check Caffeine, if not found, it will call loadFromRedis
return Mono.fromFuture(lastViewedProductCache.get(userId));
}
// Cache loader function, called by Caffeine on a cache miss
private Mono<String> loadFromRedis(String userId) {
String redisKey = "user_feature:last_viewed:" + userId;
// fallbackTo() is used to return a default/empty value if not in Redis either
return redisTemplate.opsForValue().get(redisKey).defaultIfEmpty("N/A");
}
}
设计考量:
-
AsyncLoadingCache
是 Caffeine 的异步版本,与 WebFlux 的响应式编程模型完美契合。 - 当
getLastViewedProduct
被调用时,如果 L1 缓存命中,则立即返回。如果未命中,Caffeine 会自动调用loadFromRedis
方法从 L2 缓存加载数据,填充 L1 后再返回。 - 更新操作
updateLastViewedProduct
会同时写入 L1 和 L2,以保证数据一致性。这是一个简化的write-through
策略。
4. 特征服务 API
最后,一个简单的 WebFlux Controller 暴露这些特征。
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.Map;
@RestController
@RequestMapping("/api/v1/features")
public class FeatureController {
private final UserFeatureService featureService;
public FeatureController(UserFeatureService featureService) {
this.featureService = featureService;
}
@GetMapping("/user/{userId}")
public Mono<Map<String, String>> getUserFeatures(@PathVariable String userId) {
// In a real app, you'd fetch multiple features concurrently
return featureService.getLastViewedProduct(userId)
.map(lastViewed -> Map.of("lastViewedProduct", lastViewed));
}
}
实现:Qwik 前端即时消费
现在来看前端如何消费这些实时特征并做到即时交互。
1. Qwik 项目设置与数据获取
在 Qwik 中,数据获取的首选方式是使用 routeLoader$
。它在服务端执行,并将数据序列化传递给客户端,供组件使用。
// src/routes/index.tsx
import { component$ } from '@builder.io/qwik';
import { routeLoader$ } from '@builder.io/qwik-city';
// Define the type for our feature data
export interface UserFeatures {
lastViewedProduct: string;
}
// routeLoader$ runs on the server during SSR
export const useUserFeatures = routeLoader$<UserFeatures>(async (requestEv) => {
// In a real app, userId would come from a cookie or session
const userId = 'user-123';
const featureApiUrl = `http://feature-service:8080/api/v1/features/user/${userId}`;
try {
const response = await fetch(featureApiUrl);
if (!response.ok) {
// Proper error handling is crucial.
// Log the error on the server and return a default state.
console.error(`Failed to fetch features: ${response.statusText}`);
return { lastViewedProduct: 'default-recommendation' };
}
const data = await response.json();
return data as UserFeatures;
} catch (error) {
console.error(`Network error fetching features:`, error);
return { lastViewedProduct: 'default-recommendation' };
}
});
export default component$(() => {
const features = useUserFeatures();
return (
<div>
<h1>Welcome, user-123!</h1>
<section>
<h2>Based on your recent activity, you might like:</h2>
<div class="recommendation-card">
{/*
The value is available immediately on render.
No client-side fetch, no waiting, no hydration.
*/}
<p>Product ID: {features.value.lastViewedProduct}</p>
</div>
</section>
</div>
);
});
关键优势:
-
fetch
调用在服务端完成。客户端收到的 HTML 已经包含了lastViewedProduct
的值。 - 浏览器渲染此页面时,
recommendation-card
会立刻显示出个性化内容。用户看到页面的第一帧就是最终状态。没有内容跳变,也没有因数据加载而出现的骨架屏。
2. 客户端的交互式更新
如果用户在页面上执行了某个操作,需要刷新推荐内容,我们可以使用 useResource$
。
// src/routes/index.tsx (continued)
import { component$, useResource$, Resource } from '@builder.io/qwik';
// ... (imports and routeLoader$ from before) ...
// A function to re-fetch features from the client-side
async function fetchFeaturesClientSide(userId: string): Promise<UserFeatures> {
const response = await fetch(`/api/v1/features/user/${userId}`); // Using a proxy
return response.json();
}
export default component$(() => {
const initialFeatures = useUserFeatures();
// useResource$ is for handling async operations that can be re-triggered
const featuresResource = useResource$<UserFeatures>(({ track, cleanup }) => {
// track tells Qwik to re-run this function when this signal changes.
// Here we use initialFeatures, so it runs once initially.
track(() => initialFeatures.value);
const controller = new AbortController();
cleanup(() => controller.abort());
// For the initial load, we use the value from the server loader.
// For subsequent client-side triggers, we would re-fetch.
// This example focuses on the initial load's speed.
// To make it dynamic, we'd need a signal to 'track' that changes on button click.
return initialFeatures.value;
});
return (
<div>
<h1>Welcome, user-123!</h1>
<section>
<h2>Based on your recent activity, you might like:</h2>
<Resource
value={featuresResource}
onPending={() => <div class="recommendation-card">Loading...</div>}
onRejected={(error) => <div class="recommendation-card">Error: {error.message}</div>}
onResolved={(features) => (
<div class="recommendation-card">
<p>Product ID: {features.lastViewedProduct}</p>
</div>
)}
/>
<button onClick$={() => {
// In a real scenario, this would trigger a re-fetch.
// For example by updating a signal tracked by useResource$.
console.log("Button clicked. A re-fetch mechanism would be implemented here.");
}}>Refresh Recommendation</button>
</section>
</div>
);
});
onClick$
中的 $
符号是 Qwik 的魔法所在。Qwik 的优化编译器(Optimizer)会分析这段代码,将其提取为一个可懒加载的微小 JavaScript chunk。只有当用户 第一次 点击这个按钮时,这个 chunk 才会被下载和执行。这就是 Qwik 的可恢复性(Resumability)的核心:事件监听器早已通过序列化信息“连接”好了,但实现它的代码只有在需要时才加载。这与 hydration 模型完全相反,后者需要在交互前预先执行大量代码。
方案的局限性与未来迭代路径
这套架构成功地将一个由 MyBatis 驱动的传统系统接入了实时数据处理和现代前端的快车道,但它并非银弹。
首先,特征工程逻辑被硬编码在 Java 服务中。对于数据科学家来说,这不够灵活。一个更成熟的方案会将特征计算逻辑下沉到像 Apache Flink 这样的流处理引擎中,允许使用 SQL 或 Python 进行更复杂的有状态计算和窗口操作。
其次,特征存储相对简单。对于需要存储大量历史特征或支持“时间旅行”查询的复杂场景,一个专业的特征存储系统如 Feast 或 Tecton 会是更好的选择。我们的 Caffeine + Redis 组合更适合对延迟极度敏感、特征维度较少的场景。
最后,API 通信目前是基于 HTTP 请求-响应模式。虽然 Qwik 的 routeLoader$
优化了首次加载,但如果需要服务器主动向客户端推送更新(例如,当用户正在浏览页面时,他们的特征发生了变化),应考虑使用 WebSockets 或 Server-Sent Events (SSE) 建立持久连接,将延迟进一步压缩。
这个方案的真正价值在于它提供了一个务实且可落地的演进路径,它没有要求推翻整个遗留系统,而是通过 CDC 和现代前端技术,像外科手术一样精确地为老系统注入了实时 AI 的能力。