构建从 MyBatis 到 Qwik 的实时 AI 特征管道以实现即时个性化


我们面临一个典型的存量系统现代化改造难题。核心业务数据稳固地运行在 PostgreSQL 之上,数据访问层由 MyBatis 全权接管,这套组合在多年的事务处理中表现得稳定可靠。但新的需求来了:业务方希望在用户访问前端页面时,能根据用户的实时行为(例如,最近30秒内的点击、浏览)动态推荐内容。传统的批处理 ETL 每天更新一次用户画像,其延迟是小时乃至天级别的,完全无法满足“实时”这个核心要求。前端即便使用了服务端渲染(SSR),在呈现动态内容时, hydration 带来的交互延迟也让个性化体验大打折扣。

目标很明确:构建一个从数据产生到前端呈现,端到端延迟在百毫秒级别的实时特征管道。这意味着需要一条从后端数据库变更,到中间层特征计算,再到前端无缝渲染的完整、低延迟链路。这是一个整合了数据工程、后端服务和前沿前端技术的综合性挑战。

架构构想与技术选型决策

直接轮询数据库是首先被否决的方案,它会对主库造成巨大压力且无法做到真正的实时。我们的起点必须是数据库的事务日志。

  1. 数据源端: 现有系统使用 MyBatis 操作 PostgreSQL。这是我们必须接受的既定事实,也是改造的起点。我们将利用 PostgreSQL 的逻辑解码(Logical Decoding)功能,这是实现变更数据捕获(CDC)的基础。

  2. 变更数据捕获 (CDC): Debezium 是这个领域的标准答案。通过部署一个 Debezium 的 PostgreSQL 连接器到 Kafka Connect 集群,我们可以将数据库 user_activity 表的 INSERT, UPDATE, DELETE 操作以结构化事件的形式实时推送到 Kafka topic 中。这是一种对源数据库非侵入式的方案。

  3. 实时特征计算层: 消费 Kafka 事件并计算特征的服务是整个管道的核心。考虑到团队对 JVM 技术栈的熟悉度,我们选择使用 Java 和 Spring Boot 构建此服务。特征的计算逻辑可能很简单(如记录用户最后点击的商品类目),也可能很复杂(如计算用户近期行为的向量嵌入)。计算出的特征需要一个低延迟的存储。

  4. 特征存储: 特征数据要求极高的读取性能。我们采用两级缓存策略:

    • L1 缓存: 使用 Caffeine 作为进程内缓存,存储最热用户的特征。这能提供微秒级的访问速度。
    • L2 缓存: 使用 Redis 作为分布式缓存,为 L1 缓存提供穿透保护,并保证特征计算服务的多个实例之间数据的一致性。
  5. 特征服务API: 一个简单的、基于 Spring WebFlux 的非阻塞 RESTful API,用于向前端提供用户特征。例如,GET /api/v1/features/user/{userId}。非阻塞 I/O 对维持低延迟至关重要。

  6. 前端框架: 这是消除最后一道延迟鸿沟的关键。传统的 React/Vue/Angular 应用,即使经过 SSR,也需要在客户端执行 hydration 过程,这个过程会阻塞主线程,延迟用户可交互时间(TTI)。对于需要即时响应的个性化内容,这种延迟是不可接受的。Qwik 的 Resumability(可恢复性)从根本上解决了这个问题。它在服务端生成 HTML 的同时,将应用状态和事件监听器信息序列化并内联到 HTML 中。客户端无需重新执行任何 JavaScript 代码即可响应用户交互,实现了真正的零 hydration 和即时交互。

整个架构的数据流图如下:

graph TD
    subgraph "Legacy System"
        A[User Action] --> B{Application Logic};
        B --> C[MyBatis Mapper];
        C --> D[(PostgreSQL)];
    end

    subgraph "Real-time Feature Pipeline"
        D -- WAL --> E[Debezium Connector];
        E -- CDC Events --> F[(Kafka Topic)];
        F --> G[Feature Engineering Service];
        G -- Computes & Stores --> H{Cache};
        H -- L1 --> I[Caffeine];
        H -- L2 --> J[Redis];
    end

    subgraph "Serving & Frontend"
        K[Qwik Frontend] -- HTTP Request --> L[Feature Serving API];
        L -- Reads from --> H;
        L -- Feature Response --> K;
    end

    style D fill:#d3e5ef,stroke:#333,stroke-width:2px
    style F fill:#fce8b2,stroke:#333,stroke-width:2px
    style J fill:#ffcdd2,stroke:#333,stroke-width:2px

实现:从数据库到 Kafka 的实时数据流

1. 数据库与 MyBatis 配置

首先,确保 PostgreSQL 开启了逻辑复制。在 postgresql.conf 中进行配置:

# postgresql.conf
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10

假设我们有一个记录用户活动的数据表:

CREATE TABLE user_activity (
    id UUID PRIMARY KEY,
    user_id VARCHAR(255) NOT NULL,
    activity_type VARCHAR(50) NOT NULL,
    product_id VARCHAR(255),
    activity_timestamp TIMESTAMPTZ NOT NULL
);

-- 为逻辑复制创建发布
CREATE PUBLICATION dbz_publication FOR TABLE user_activity;

在遗留系统中,通过 MyBatis 插入数据是常规操作。

<!-- UserActivityMapper.xml -->
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
  PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
  "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.legacy.mapper.UserActivityMapper">
  <insert id="insertActivity" parameterType="com.example.legacy.model.UserActivity">
    INSERT INTO user_activity (id, user_id, activity_type, product_id, activity_timestamp)
    VALUES (#{id}, #{userId}, #{activityType}, #{productId}, #{activityTimestamp})
  </insert>
</mapper>

2. 配置 Debezium 连接器

我们使用 Kafka Connect 来运行 Debezium。以下是向 Kafka Connect REST API 提交的连接器配置 pg-connector.json。在真实项目中,这个配置应该通过 IaC 工具(如 Terraform)进行管理。

{
    "name": "user-activity-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "postgres-host",
        "database.port": "5432",
        "database.user": "debezium_user",
        "database.password": "debezium_password",
        "database.dbname": "app_db",
        "database.server.name": "app_server_1",
        "table.include.list": "public.user_activity",
        "publication.name": "dbz_publication",
        "publication.autocreate.mode": "disabled",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "topic.prefix": "cdc",
        "slot.name": "debezium_user_activity_slot",
        "slot.drop.on.stop": "false"
    }
}

关键配置解析:

  • plugin.name: pgoutput 是 PostgreSQL 10+ 内置的逻辑解码插件,性能优异。
  • table.include.list: 明确指定我们只关心 user_activity 这张表的数据变更。
  • publication.name: 必须与数据库中创建的 PUBLICATION 名称一致。
  • value.converter.schemas.enable: 设置为 false 可以得到纯净的 JSON 数据,而不是带有 schema 信息的复杂结构,简化下游消费。
  • slot.drop.on.stop: 设置为 false至关重要。这意味着即使连接器停止,复制槽也会保留,数据库会为该槽保留 WAL 日志,确保连接器重启后不会丢失任何数据。

当 MyBatis 插入一条新记录时,几毫秒内,我们就能在 Kafka 的 cdc.app_server_1.public.user_activity topic 中看到一条类似这样的 JSON 消息:

{
  "payload": {
    "before": null,
    "after": {
      "id": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
      "user_id": "user-123",
      "activity_type": "VIEW_PRODUCT",
      "product_id": "prod-456",
      "activity_timestamp": 1678886400000
    },
    "source": { ... },
    "op": "c", // c for create/insert
    "ts_ms": 1678886400123
  }
}

实现:特征计算服务与低延迟存储

这个 Spring Boot 服务是管道的中枢。

1. 项目依赖 (pom.xml)

<dependencies>
    <!-- Spring Boot WebFlux for non-blocking API -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <!-- Kafka Consumer -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Redis with Lettuce (reactive driver) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>
    <!-- Caffeine Cache -->
    <dependency>
        <groupId>com.github.ben-manes.caffeine</groupId>
        <artifactId>caffeine</artifactId>
    </dependency>
    <!-- Jackson for JSON processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
    </dependency>
</dependencies>

2. Kafka 消费者与特征处理

我们定义一个服务来监听 Kafka topic 并更新特征缓存。

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class UserActivityConsumer {

    private static final Logger log = LoggerFactory.getLogger(UserActivityConsumer.class);
    private final ObjectMapper objectMapper;
    private final UserFeatureService userFeatureService;

    public UserActivityConsumer(ObjectMapper objectMapper, UserFeatureService userFeatureService) {
        this.objectMapper = objectMapper;
        this.userFeatureService = userFeatureService;
    }

    @KafkaListener(topics = "cdc.app_server_1.public.user_activity", groupId = "feature-engineering-group")
    public void consume(String message) {
        try {
            JsonNode root = objectMapper.readTree(message);
            // Debezium event can be null for deletions or heartbeat messages
            JsonNode payload = root.get("payload");
            if (payload == null || payload.isNull()) {
                log.info("Received message with null payload, possibly a tombstone or heartbeat. Skipping.");
                return;
            }
            
            // We only care about create ('c') and update ('u') operations
            String op = payload.get("op").asText();
            if (!"c".equals(op) && !"u".equals(op)) {
                return;
            }
            
            JsonNode after = payload.get("after");
            if (after == null || after.isNull()) {
                return;
            }

            String userId = after.get("user_id").asText();
            String activityType = after.get("activity_type").asText();
            String productId = after.has("product_id") ? after.get("product_id").asText(null) : null;

            // This is where feature engineering happens.
            // For this example, we'll just update the user's last seen product.
            if ("VIEW_PRODUCT".equals(activityType) && productId != null) {
                userFeatureService.updateLastViewedProduct(userId, productId).subscribe();
            }

        } catch (IOException e) {
            log.error("Failed to parse Debezium CDC event: {}", message, e);
            // In a production system, push this to a dead-letter queue (DLQ)
        }
    }
}

错误处理: try-catch 块是生产级代码的最低要求。任何解析失败的消息都应被记录并发送到DLQ,以防消费者循环崩溃。同时,需要处理 Debezium 的心跳消息或删除操作产生的 tombstone 消息,它们的 payload 可能为 null

3. 特征服务与缓存层

UserFeatureService 负责与 Caffeine 和 Redis 交互。

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import java.time.Duration;

@Service
public class UserFeatureService {

    private final ReactiveStringRedisTemplate redisTemplate;
    // L1 Cache: In-memory cache for ultra-fast access
    private final AsyncLoadingCache<String, String> lastViewedProductCache;

    public UserFeatureService(ReactiveStringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        
        // Build the Caffeine cache
        this.lastViewedProductCache = Caffeine.newBuilder()
                .maximumSize(100_000) // Max 100k users in memory
                .expireAfterWrite(Duration.ofMinutes(5)) // Evict after 5 minutes of no writes
                .buildAsync(this::loadFromRedis); // Cache loader for cache misses
    }

    // This method is called by the Kafka consumer
    public Mono<Void> updateLastViewedProduct(String userId, String productId) {
        // Update both L1 and L2 caches
        lastViewedProductCache.put(userId, Mono.just(productId).toFuture());
        String redisKey = "user_feature:last_viewed:" + userId;
        return redisTemplate.opsForValue()
                .set(redisKey, productId, Duration.ofHours(1)) // Set Redis TTL
                .then();
    }

    public Mono<String> getLastViewedProduct(String userId) {
        // This will first check Caffeine, if not found, it will call loadFromRedis
        return Mono.fromFuture(lastViewedProductCache.get(userId));
    }

    // Cache loader function, called by Caffeine on a cache miss
    private Mono<String> loadFromRedis(String userId) {
        String redisKey = "user_feature:last_viewed:" + userId;
        // fallbackTo() is used to return a default/empty value if not in Redis either
        return redisTemplate.opsForValue().get(redisKey).defaultIfEmpty("N/A");
    }
}

设计考量:

  • AsyncLoadingCache 是 Caffeine 的异步版本,与 WebFlux 的响应式编程模型完美契合。
  • getLastViewedProduct 被调用时,如果 L1 缓存命中,则立即返回。如果未命中,Caffeine 会自动调用 loadFromRedis 方法从 L2 缓存加载数据,填充 L1 后再返回。
  • 更新操作 updateLastViewedProduct 会同时写入 L1 和 L2,以保证数据一致性。这是一个简化的 write-through 策略。

4. 特征服务 API

最后,一个简单的 WebFlux Controller 暴露这些特征。

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.Map;

@RestController
@RequestMapping("/api/v1/features")
public class FeatureController {

    private final UserFeatureService featureService;

    public FeatureController(UserFeatureService featureService) {
        this.featureService = featureService;
    }

    @GetMapping("/user/{userId}")
    public Mono<Map<String, String>> getUserFeatures(@PathVariable String userId) {
        // In a real app, you'd fetch multiple features concurrently
        return featureService.getLastViewedProduct(userId)
                .map(lastViewed -> Map.of("lastViewedProduct", lastViewed));
    }
}

实现:Qwik 前端即时消费

现在来看前端如何消费这些实时特征并做到即时交互。

1. Qwik 项目设置与数据获取

在 Qwik 中,数据获取的首选方式是使用 routeLoader$。它在服务端执行,并将数据序列化传递给客户端,供组件使用。

// src/routes/index.tsx
import { component$ } from '@builder.io/qwik';
import { routeLoader$ } from '@builder.io/qwik-city';

// Define the type for our feature data
export interface UserFeatures {
  lastViewedProduct: string;
}

// routeLoader$ runs on the server during SSR
export const useUserFeatures = routeLoader$<UserFeatures>(async (requestEv) => {
  // In a real app, userId would come from a cookie or session
  const userId = 'user-123'; 
  const featureApiUrl = `http://feature-service:8080/api/v1/features/user/${userId}`;

  try {
    const response = await fetch(featureApiUrl);
    if (!response.ok) {
      // Proper error handling is crucial.
      // Log the error on the server and return a default state.
      console.error(`Failed to fetch features: ${response.statusText}`);
      return { lastViewedProduct: 'default-recommendation' };
    }
    const data = await response.json();
    return data as UserFeatures;
  } catch (error) {
    console.error(`Network error fetching features:`, error);
    return { lastViewedProduct: 'default-recommendation' };
  }
});

export default component$(() => {
  const features = useUserFeatures();

  return (
    <div>
      <h1>Welcome, user-123!</h1>
      <section>
        <h2>Based on your recent activity, you might like:</h2>
        <div class="recommendation-card">
          {/* 
            The value is available immediately on render.
            No client-side fetch, no waiting, no hydration.
          */}
          <p>Product ID: {features.value.lastViewedProduct}</p>
        </div>
      </section>
    </div>
  );
});

关键优势:

  • fetch 调用在服务端完成。客户端收到的 HTML 已经包含了 lastViewedProduct 的值。
  • 浏览器渲染此页面时,recommendation-card 会立刻显示出个性化内容。用户看到页面的第一帧就是最终状态。没有内容跳变,也没有因数据加载而出现的骨架屏。

2. 客户端的交互式更新

如果用户在页面上执行了某个操作,需要刷新推荐内容,我们可以使用 useResource$

// src/routes/index.tsx (continued)
import { component$, useResource$, Resource } from '@builder.io/qwik';
// ... (imports and routeLoader$ from before) ...

// A function to re-fetch features from the client-side
async function fetchFeaturesClientSide(userId: string): Promise<UserFeatures> {
    const response = await fetch(`/api/v1/features/user/${userId}`); // Using a proxy
    return response.json();
}


export default component$(() => {
  const initialFeatures = useUserFeatures();
  
  // useResource$ is for handling async operations that can be re-triggered
  const featuresResource = useResource$<UserFeatures>(({ track, cleanup }) => {
    // track tells Qwik to re-run this function when this signal changes.
    // Here we use initialFeatures, so it runs once initially.
    track(() => initialFeatures.value);
    
    const controller = new AbortController();
    cleanup(() => controller.abort());
    
    // For the initial load, we use the value from the server loader.
    // For subsequent client-side triggers, we would re-fetch.
    // This example focuses on the initial load's speed.
    // To make it dynamic, we'd need a signal to 'track' that changes on button click.
    return initialFeatures.value;
  });

  return (
    <div>
      <h1>Welcome, user-123!</h1>
      <section>
        <h2>Based on your recent activity, you might like:</h2>
        <Resource
          value={featuresResource}
          onPending={() => <div class="recommendation-card">Loading...</div>}
          onRejected={(error) => <div class="recommendation-card">Error: {error.message}</div>}
          onResolved={(features) => (
            <div class="recommendation-card">
              <p>Product ID: {features.lastViewedProduct}</p>
            </div>
          )}
        />
        <button onClick$={() => {
            // In a real scenario, this would trigger a re-fetch.
            // For example by updating a signal tracked by useResource$.
            console.log("Button clicked. A re-fetch mechanism would be implemented here.");
        }}>Refresh Recommendation</button>
      </section>
    </div>
  );
});

onClick$ 中的 $ 符号是 Qwik 的魔法所在。Qwik 的优化编译器(Optimizer)会分析这段代码,将其提取为一个可懒加载的微小 JavaScript chunk。只有当用户 第一次 点击这个按钮时,这个 chunk 才会被下载和执行。这就是 Qwik 的可恢复性(Resumability)的核心:事件监听器早已通过序列化信息“连接”好了,但实现它的代码只有在需要时才加载。这与 hydration 模型完全相反,后者需要在交互前预先执行大量代码。

方案的局限性与未来迭代路径

这套架构成功地将一个由 MyBatis 驱动的传统系统接入了实时数据处理和现代前端的快车道,但它并非银弹。

首先,特征工程逻辑被硬编码在 Java 服务中。对于数据科学家来说,这不够灵活。一个更成熟的方案会将特征计算逻辑下沉到像 Apache Flink 这样的流处理引擎中,允许使用 SQL 或 Python 进行更复杂的有状态计算和窗口操作。

其次,特征存储相对简单。对于需要存储大量历史特征或支持“时间旅行”查询的复杂场景,一个专业的特征存储系统如 Feast 或 Tecton 会是更好的选择。我们的 Caffeine + Redis 组合更适合对延迟极度敏感、特征维度较少的场景。

最后,API 通信目前是基于 HTTP 请求-响应模式。虽然 Qwik 的 routeLoader$ 优化了首次加载,但如果需要服务器主动向客户端推送更新(例如,当用户正在浏览页面时,他们的特征发生了变化),应考虑使用 WebSockets 或 Server-Sent Events (SSE) 建立持久连接,将延迟进一步压缩。

这个方案的真正价值在于它提供了一个务实且可落地的演进路径,它没有要求推翻整个遗留系统,而是通过 CDC 和现代前端技术,像外科手术一样精确地为老系统注入了实时 AI 的能力。


  目录