Building a Near-Real-Time Read Model for DDD Aggregates with CDC and OpenSearch


Our project's write model has grown increasingly complex: a single Order aggregate may be associated with order lines, a shipping address, invoice information, promotions, and other entities and value objects. To enforce business rules and transactional consistency, we built a rich domain model with DDD and JPA (the Java ORM specification). This works very well for handling commands, but for queries the nightmare begins.

The operations team needs a back office that can query orders by arbitrary combinations of dimensions, for example "all pending orders from the last 30 days, shipped from the East China warehouse, containing a 'Brand A phone', placed by customers tagged as high-value". On JPA and a relational database (we use PostgreSQL), such a query means complex multi-table JOINs, LIKE filters, and cross-service calls. Performance degraded sharply, database CPU alerts fired constantly, and even the core order-placement flow slowed down.

This pain point is a classic one: a write model designed for transactional consistency and a read model designed for complex queries are inherently at odds. Forcing a single model to serve both needs ends up serving neither well.

Initial Idea: Moving to CQRS

Separating command and query responsibilities (CQRS) is the standard answer to this problem. We decided:

  1. Write model: keep using DDD aggregates and JPA, deployed in the core transaction service against the primary PostgreSQL database. This part stays unchanged; its top priority is data consistency.
  2. Read model: build a separate, denormalized data view stored in OpenSearch. OpenSearch's full-text search and aggregation capabilities are made precisely for complex queries.

The crux of the architecture is the arrow in the middle: how do we propagate data changes from the write model to the read model reliably, with low latency and minimal intrusion?

Technology Choice: Why CDC

We evaluated three common data synchronization approaches:

  1. Dual writes: in application code, write to the database and then call the OpenSearch API. We rejected this immediately. Distributed transactions are too complex, and there is no way to make the two operations atomic. Once a write to OpenSearch fails, the data is inconsistent, and the compensation and reconciliation logic that follows makes the system extremely fragile.

  2. Application events: after the database transaction commits, the application publishes a message to a message queue (e.g. Kafka), and a downstream subscriber consumes it and updates OpenSearch. Better than dual writes, since it decouples the services, but in real projects there are pitfalls:

    • Reliability: what if the message fails to send? Commit the transaction first and the application may crash before the message goes out; send the message first and the transaction may roll back after the message has already been published. You end up needing the Transactional Outbox pattern, which adds complexity.
    • Intrusiveness: the domain layer has to know that "I need to publish an event". This pollutes the purity of the domain model and couples business logic with messaging concerns.
  3. Change Data Capture (CDC): capture all data changes by subscribing to the database's transaction log (PostgreSQL's WAL). Debezium is the leading tool in this space.

    • Non-intrusive: no changes to existing application code; the application keeps talking to the database as usual.
    • Reliable: the database transaction log is the most trustworthy source of change events. As long as a transaction commits, the change is in the log and cannot be lost.
    • Low latency: reading the log directly keeps latency very low, enabling near-real-time propagation.

The decision was clear: go with Debezium-based CDC. It pushes the responsibility for data synchronization down from the application layer to the infrastructure layer, which is both more robust and more elegant.

Step-by-Step Implementation: Building the PostgreSQL-to-OpenSearch Pipeline

Our goal is a complete data flow: PostgreSQL -> Debezium -> Kafka -> Projection Service -> OpenSearch

1. Defining the Write Model: DDD Aggregate and JPA Entities

First, here is our simplified Order aggregate. It is annotated with JPA and persisted by the ORM. Note that it is designed entirely around encapsulating business rules and preserving consistency; query performance is not its primary concern.

// File: com/mycorp/order/domain/Order.java

import javax.persistence.*;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

@Entity
@Table(name = "orders")
public class Order {

    @Id
    @Column(name = "id", nullable = false)
    private UUID id;

    @Column(name = "customer_id", nullable = false)
    private Long customerId;

    @Enumerated(EnumType.STRING)
    @Column(name = "status", nullable = false)
    private OrderStatus status;

    @OneToMany(mappedBy = "order", cascade = CascadeType.ALL, orphanRemoval = true, fetch = FetchType.EAGER)
    private Set<OrderLine> orderLines = new HashSet<>();

    @Embedded
    private ShippingAddress shippingAddress;

    @Column(name = "total_price", nullable = false)
    private BigDecimal totalPrice;

    @Column(name = "created_at", nullable = false)
    private Instant createdAt;

    @Version
    private long version;

    // Constructors and business methods (addOrderLine, ship, cancel, ...) omitted
    // Getters and setters omitted

    public void calculateTotalPrice() {
        this.totalPrice = orderLines.stream()
            .map(OrderLine::getSubtotal)
            .reduce(BigDecimal.ZERO, BigDecimal::add);
    }
}

@Entity
@Table(name = "order_lines")
public class OrderLine {
    @Id
    @Column(name = "id", nullable = false)
    private UUID id;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "order_id", nullable = false)
    private Order order;

    @Column(name = "product_id", nullable = false)
    private String productId;
    
    @Column(name = "product_name", nullable = false)
    private String productName;

    @Column(name = "quantity", nullable = false)
    private int quantity;

    @Column(name = "unit_price", nullable = false)
    private BigDecimal unitPrice;
    
    // Getters and Setters and other methods...
    
    public BigDecimal getSubtotal() {
        return unitPrice.multiply(new BigDecimal(quantity));
    }
}

@Embeddable
public class ShippingAddress {
    @Column(name = "shipping_province")
    private String province;
    @Column(name = "shipping_city")
    private String city;
    @Column(name = "shipping_detail")
    private String detail;
}

public enum OrderStatus {
    PENDING, PAID, SHIPPED, DELIVERED, CANCELLED
}

2. Infrastructure: Docker Compose

In a real project these components would be managed by the ops team, but for development a single docker-compose.yml is enough.

# file: docker-compose.yml
version: '3.8'

services:
  postgres:
    image: debezium/postgres:14
    container_name: postgres-db
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=orderdb
    volumes:
      - ./pg_data:/var/lib/postgresql/data
    command: >
      postgres 
      -c wal_level=logical

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  kafka-connect:
    image: debezium/connect:2.1
    container_name: kafka-connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: 'kafka:29092'
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses

  opensearch:
    image: opensearchproject/opensearch:2.5.0
    container_name: opensearch
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node1
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    ports:
      - "9200:9200"
      - "9600:9600" # required for Performance Analyzer

One key setting is PostgreSQL's wal_level=logical, which allows Debezium to decode the WAL into change events.

3. Configuring the Debezium PostgreSQL Connector

Once the containers are up, we register and start the Debezium connector through the Kafka Connect REST API.

# shell command to register the connector
curl -X POST -H "Content-Type: application/json" --data @debezium-pg-config.json http://localhost:8083/connectors

The contents of debezium-pg-config.json are the heart of the configuration:

// file: debezium-pg-config.json
{
  "name": "order-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres-db",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname": "orderdb",
    "topic.prefix": "pgserver1",
    "table.include.list": "public.orders,public.order_lines",
    "plugin.name": "pgoutput",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "string"
  }
}

The key settings here:

  • table.include.list: the tables to monitor; we care about orders and order_lines.
  • topic.prefix: becomes the prefix of the Kafka topics, e.g. pgserver1.public.orders (in Debezium 1.x this setting was called database.server.name).
  • value.converter.schemas.enable: set to false for leaner JSON messages; otherwise Debezium attaches the full schema to every message and the payload becomes extremely bloated.
  • decimal.handling.mode: string mode avoids precision problems with BigDecimal values (see the sketch below).
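
With string mode, NUMERIC columns such as total_price and unit_price arrive in the CDC event as JSON strings rather than numbers. Our projection service rebuilds documents by re-reading the master database, so it rarely needs this, but if a field ever has to be read straight from the event, a minimal sketch could look like this (the helper class is ours, not from the original code):

// Illustrative helper: reading a string-mode decimal from a Debezium row node
import com.fasterxml.jackson.databind.JsonNode;
import java.math.BigDecimal;

public final class CdcDecimals {

    private CdcDecimals() {
    }

    // With decimal.handling.mode=string, NUMERIC/DECIMAL columns are serialized as strings
    public static BigDecimal readDecimal(JsonNode row, String column) {
        return row.hasNonNull(column)
                ? new BigDecimal(row.get(column).asText())
                : BigDecimal.ZERO;
    }
}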

4. The Projection Service: From Events to Documents

This is the brain of the pipeline. We build a standalone microservice with Spring for Apache Kafka that consumes the CDC events produced by Debezium and "projects" them into read-model documents in OpenSearch.

Step 1: Define the OpenSearch read-model document

The document is denormalized and contains everything the queries need.

// file: com/mycorp/projection/OrderDocument.java
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
// ... more imports

@Document(indexName = "orders_read_model")
public class OrderDocument {

    @Id
    private String id; // UUID as String

    @Field(type = FieldType.Long)
    private Long customerId;

    @Field(type = FieldType.Keyword)
    private String status;

    @Field(type = FieldType.Nested) // Key point: use the nested type for arrays of objects
    private List<OrderLineDoc> orderLines;

    @Field(type = FieldType.Object)
    private ShippingAddressDoc shippingAddress;
    
    // Denormalized fields for search and aggregation
    @Field(type = FieldType.Text, analyzer = "standard")
    private String productNameSearch; // all product names concatenated, for full-text search

    @Field(type = FieldType.Keyword)
    private List<String> productIds;

    @Field(type = FieldType.Double)
    private BigDecimal totalPrice;

    @Field(type = FieldType.Date, format = DateFormat.date_optional_time)
    private Instant createdAt;

    // ... Getters and Setters
}

// Nested document structures
public class OrderLineDoc {
    private String productId;
    private String productName;
    private int quantity;
    private BigDecimal unitPrice;
    // ... Getters and Setters
}

public class ShippingAddressDoc {
    private String province;
    private String city;
    // ... Getters and Setters
}
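
The projection handler shown later persists these documents through an OrderDocumentRepository that the original listing does not include; a minimal sketch, assuming Spring Data Elasticsearch repositories are configured against the OpenSearch cluster:

// file: com/mycorp/projection/OrderDocumentRepository.java (sketch)
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

// CRUD operations against the orders_read_model index declared on OrderDocument
public interface OrderDocumentRepository extends ElasticsearchRepository<OrderDocument, String> {
}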

Step 2: Consume and process the CDC events

This is the most intricate part. Debezium events have a fixed structure that we need to parse (with schemas disabled, the message value is just this envelope).
A typical update event for the orders table looks like this:

{
  "before": { /* full row state before the change */ },
  "after": { /* full row state after the change */ },
  "source": { /* metadata: connector, db, schema, table, lsn, ... */ },
  "op": "u", // 'c' for create, 'u' for update, 'd' for delete
  "ts_ms": 1672531200000
}

Our Kafka listener handles these events:

// file: com/mycorp/projection/CdcEventConsumer.java

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.io.IOException;

@Service
public class CdcEventConsumer {

    private static final Logger log = LoggerFactory.getLogger(CdcEventConsumer.class);
    private final ObjectMapper objectMapper;
    private final OrderProjectionHandler projectionHandler;

    public CdcEventConsumer(ObjectMapper objectMapper, OrderProjectionHandler projectionHandler) {
        this.objectMapper = objectMapper;
        this.projectionHandler = projectionHandler;
    }

    @KafkaListener(topics = {"pgserver1.public.orders", "pgserver1.public.order_lines"})
    public void handleCdcEvent(@Payload(required = false) String payload,
                               @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // Debezium emits a tombstone (null value) after a delete; it only matters for log compaction
        if (payload == null) {
            log.info("Received tombstone message on topic {}. Ignoring.", topic);
            return;
        }
        try {
            // With value.converter.schemas.enable=false the message body IS the Debezium envelope:
            // { "before": ..., "after": ..., "source": ..., "op": "c|u|d", "ts_ms": ... }
            JsonNode event = objectMapper.readTree(payload);
            char op = event.get("op").asText().charAt(0);

            // For deletes only "before" is populated; for creates/updates we want "after"
            JsonNode data = (op == 'd') ? event.get("before") : event.get("after");
            if (data == null || data.isNull()) {
                log.error("Data node is null for op '{}' in event: {}", op, payload);
                return;
            }

            if (topic.endsWith("orders")) {
                projectionHandler.handleOrderEvent(op, data);
            } else if (topic.endsWith("order_lines")) {
                projectionHandler.handleOrderLineEvent(op, data);
            }

        } catch (IOException e) {
            // In a real project this needs retries and a dead-letter queue (DLQ); see the sketch below
            log.error("Failed to process CDC event: {}", payload, e);
        }
    }
}
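
As noted in the catch block above, a production projection service needs retries and a dead-letter topic. A minimal sketch using Spring Kafka's DefaultErrorHandler; the bean name and back-off values are illustrative, and this assumes Spring Boot's Kafka auto-configuration is in use:

// file: com/mycorp/projection/KafkaErrorHandlingConfig.java (sketch)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    // Retries a failed record 3 times with a 1-second pause, then publishes it to a
    // dead-letter topic named "<original-topic>.DLT" (the recoverer's default naming).
    // With Spring Boot, a single CommonErrorHandler bean like this one is applied to
    // the auto-configured listener container factory.
    @Bean
    public DefaultErrorHandler cdcErrorHandler(KafkaTemplate<Object, Object> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
    }
}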

Step 3: Implement the core projection logic (ProjectionHandler)

The pitfall here is that any change to the order_lines table (insert, update, or delete) must trigger a rebuild and re-index of the entire OrderDocument, because the OrderDocument represents the aggregate as a whole, keyed by its root.

// file: com/mycorp/projection/OrderProjectionHandler.java
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.UUID;

@Component
public class OrderProjectionHandler {

    private static final Logger log = LoggerFactory.getLogger(OrderProjectionHandler.class);
    private final OrderDocumentRepository orderDocumentRepository; // Spring Data repository for OpenSearch
    private final OrderReadModelBuilder readModelBuilder; // builds the complete OrderDocument

    // ... constructor

    public void handleOrderEvent(char op, JsonNode orderData) {
        String orderId = orderData.get("id").asText();
        
        if (op == 'd') {
            orderDocumentRepository.deleteById(orderId);
            log.info("Deleted order document: {}", orderId);
            return;
        }

        // For 'c' or 'u' on orders table, we rebuild the whole document
        OrderDocument doc = readModelBuilder.buildFromOrderId(UUID.fromString(orderId));
        if (doc != null) {
            orderDocumentRepository.save(doc);
            log.info("Indexed order document: {}", orderId);
        } else {
            // Related data may not be visible yet due to pipeline lag; this case must be handled
            log.warn("Could not build document for order {}, maybe related data is not available yet.", orderId);
        }
    }

    public void handleOrderLineEvent(char op, JsonNode orderLineData) {
        // Key point: whatever changes in order_lines, we rebuild at the granularity of the order
        String orderId = orderLineData.get("order_id").asText();
        
        // Even for 'd', we just re-index the parent order.
        // The builder will fetch current state and build the document.
        OrderDocument doc = readModelBuilder.buildFromOrderId(UUID.fromString(orderId));
        if (doc != null) {
            orderDocumentRepository.save(doc);
            log.info("Re-indexed order document {} due to order_line change.", orderId);
        } else {
            // It's possible the parent order was deleted.
            // Check if the document exists and delete if necessary.
            if (!orderDocumentRepository.existsById(orderId)) {
                 log.info("Parent order {} for order_line change does not exist. No action needed.", orderId);
            } else {
                 orderDocumentRepository.deleteById(orderId);
                 log.warn("Parent order {} document was found but could not be rebuilt. Deleting for safety.", orderId);
            }
        }
    }
}

OrderReadModelBuilder queries the master database (PostgreSQL) directly to get the complete, current state of an Order aggregate and converts it into an OrderDocument. This is a common pattern and guarantees eventual consistency of the read model. It does add some extra read load on the master database, but the query is a primary-key lookup, which is very fast, and it completely avoids having to maintain complex state inside the projection service.

// file: com/mycorp/projection/OrderReadModelBuilder.java
import com.mycorp.order.domain.Order;
import com.mycorp.order.domain.OrderLine;
import com.mycorp.order.domain.OrderRepository;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

@Service
public class OrderReadModelBuilder {

    // JPA repository used to read aggregates from the master DB
    private final OrderRepository orderRepository;

    public OrderReadModelBuilder(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
    }

    public OrderDocument buildFromOrderId(UUID orderId) {
        // Use an Optional to handle case where order might have been deleted
        Optional<Order> orderOpt = orderRepository.findById(orderId);
        
        if (orderOpt.isEmpty()) {
            return null;
        }
        
        Order order = orderOpt.get();
        return mapToDocument(order);
    }
    
    // Mapper logic from JPA entity to OpenSearch document
    private OrderDocument mapToDocument(Order order) {
        OrderDocument doc = new OrderDocument();
        doc.setId(order.getId().toString());
        doc.setCustomerId(order.getCustomerId());
        // ... mapping all other fields
        
        List<OrderLineDoc> lineDocs = order.getOrderLines().stream()
                .map(this::mapOrderLine)
                .collect(Collectors.toList());
        doc.setOrderLines(lineDocs);

        // Build denormalized search fields
        doc.setProductIds(order.getOrderLines().stream().map(OrderLine::getProductId).collect(Collectors.toList()));
        doc.setProductNameSearch(order.getOrderLines().stream().map(OrderLine::getProductName).collect(Collectors.joining(" ")));
        
        return doc;
    }

    private OrderLineDoc mapOrderLine(OrderLine line) {
        // ... mapping logic
    }
}
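
The OrderRepository injected above is the write model's JPA repository, which was not shown earlier; a minimal sketch, assuming the domain entities are shared with the projection service (e.g. via a common module) and the projection service has read access to the master database (the package path is our assumption):

// file: com/mycorp/order/domain/OrderRepository.java (sketch)
import java.util.UUID;
import org.springframework.data.jpa.repository.JpaRepository;

// Primary-key lookups only; orderLines are fetched eagerly as declared on the entity
public interface OrderRepository extends JpaRepository<Order, UUID> {
}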

5. OpenSearch Index Configuration

For fields like productNameSearch and orderLines to behave as expected, we need a custom index mapping.

// opensearch_order_mapping.json
PUT /orders_read_model
{
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "customerId": { "type": "long" },
      "status": { "type": "keyword" },
      "totalPrice": { "type": "double" },
      "createdAt": { "type": "date" },
      "shippingAddress": {
        "type": "object",
        "properties": {
          "province": { "type": "keyword" },
          "city": { "type": "keyword" }
        }
      },
      "productIds": { "type": "keyword" },
      "productNameSearch": { "type": "text", "analyzer": "standard" },
      "orderLines": {
        "type": "nested",
        "properties": {
          "productId": { "type": "keyword" },
          "productName": { "type": "text" },
          "quantity": { "type": "integer" },
          "unitPrice": { "type": "double" }
        }
      }
    }
  }
}

Using the nested type rather than object for orderLines is critical: with object, the fields of all order lines are flattened into parallel arrays, so a query like "productName matches 'Brand A phone' AND quantity > 2" could match values taken from two different lines. The nested type keeps each order line as its own sub-document, so queries across multiple fields of a line behave independently and correctly.

The Result: A Decoupled, Efficient Read/Write Architecture

At this point we have a complete, fully automated data pipeline.

graph TD
    subgraph "Write Path"
        A[Client/API] -- Command --> B(Order Service);
        B -- Uses --> C{DDD Aggregate Root};
        C -- Persisted by ORM --> D[(PostgreSQL)];
    end

    subgraph "CDC Pipeline"
        D -- WAL --> E[Debezium Connector];
        E -- CDC Events --> F[(Kafka)];
    end

    subgraph "Read Model Projection"
        G[Projection Service] -- Consumes --> F;
        G -- Rebuilds Model by Reading --> D;
        G -- Indexes Document --> H[(OpenSearch)];
    end

    subgraph "Read Path"
        I[Client/API] -- Complex Query --> J(Query Service);
        J -- Searches --> H;
    end

Now the operations team's complex queries hit a separate query service that talks directly to OpenSearch. The complicated query from the beginning, for example, becomes a straightforward bool query:

POST /orders_read_model/_search
{
  "query": {
    "bool": {
      "must": [
        { "term": { "status": "PENDING" } },
        { "term": { "shippingAddress.province": "江苏省" } }, // Jiangsu, as a stand-in for the "East China warehouse" condition
        { "nested": {
            "path": "orderLines",
            "query": {
              "match": { "orderLines.productName": "A品牌手机" }
            }
        }}
      ],
      "filter": [
        { "range": { "createdAt": { "gte": "now-30d/d" } } }
        // Customer tag query would likely involve another lookup or denormalization
      ]
    }
  }
}

Compared with the JOIN-heavy query on PostgreSQL, this performs orders of magnitude better, and it puts zero load on the core write path.
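
On the Java side, the query service can submit this DSL through Spring Data Elasticsearch. A minimal sketch, assuming an ElasticsearchOperations bean is configured against the OpenSearch cluster; the class, method, and package names here are illustrative, and real code should build the query programmatically instead of interpolating user input into a JSON string:

// file: com/mycorp/query/OrderQueryService.java (sketch)
import com.mycorp.projection.OrderDocument;
import java.util.List;
import java.util.stream.Collectors;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.stereotype.Service;

@Service
public class OrderQueryService {

    private final ElasticsearchOperations operations;

    public OrderQueryService(ElasticsearchOperations operations) {
        this.operations = operations;
    }

    // Runs the bool query shown above against the orders_read_model index
    public List<OrderDocument> searchPendingOrders(String province, String productKeyword) {
        String dsl = """
            { "bool": { "must": [
                { "term": { "status": "PENDING" } },
                { "term": { "shippingAddress.province": "%s" } },
                { "nested": { "path": "orderLines",
                              "query": { "match": { "orderLines.productName": "%s" } } } }
              ],
              "filter": [ { "range": { "createdAt": { "gte": "now-30d/d" } } } ] } }
            """.formatted(province, productKeyword);

        return operations.search(new StringQuery(dsl), OrderDocument.class)
                .getSearchHits().stream()
                .map(SearchHit::getContent)
                .collect(Collectors.toList());
    }
}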

Limitations and Future Iterations

This architecture is no silver bullet; it introduces complexity of its own, and several issues still need attention in a real project:

  1. Eventual-consistency lag: there is a delay of milliseconds to seconds between a write landing in PostgreSQL and the data becoming searchable in OpenSearch. For the vast majority of back-office queries this is perfectly acceptable, but for scenarios that need read-your-own-writes consistency (e.g. a customer viewing order details right after placing the order), the best practice is to read from the master database directly.

  2. Schema evolution: if the structure of the orders table changes (say, a new column is added), a coordinated rollout is needed: first update and deploy the projection service so it can handle the new field, and only then run the ALTER TABLE on the database. Otherwise the CDC events may break the projection service's parsing.

  3. Re-snapshotting: if the projection logic changes substantially, or inconsistent data needs to be repaired, all historical data may have to be re-synchronized. Debezium can trigger a one-off snapshot, but that puts extra load on the production database. A better option is an offline backfill job that bypasses Debezium and bulk-reads from the database into OpenSearch (a minimal sketch follows after this list).

  4. Projection-service robustness: error handling in the current implementation is rudimentary. A production-grade projection service needs solid retries, a dead-letter queue (DLQ), and idempotent message handling, so that the data in OpenSearch stays correct even when Kafka redelivers a message. The design above helps here: every event triggers a full rebuild from the master database keyed by the order id, so reprocessing the same event simply re-indexes the same document.
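
For the re-snapshotting case above, here is a minimal sketch of such an offline backfill job, assuming it can reuse the projection service's OrderRepository, OrderReadModelBuilder, and OrderDocumentRepository; the class name and page size are illustrative:

// file: com/mycorp/projection/ReadModelBackfillJob.java (sketch)
import com.mycorp.order.domain.Order;
import com.mycorp.order.domain.OrderRepository;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Component;

@Component
public class ReadModelBackfillJob {

    private final OrderRepository orderRepository;
    private final OrderReadModelBuilder builder;
    private final OrderDocumentRepository documentRepository;

    public ReadModelBackfillJob(OrderRepository orderRepository,
                                OrderReadModelBuilder builder,
                                OrderDocumentRepository documentRepository) {
        this.orderRepository = orderRepository;
        this.builder = builder;
        this.documentRepository = documentRepository;
    }

    // Pages through the master DB and bulk-indexes documents, bypassing Debezium entirely.
    // Live CDC traffic can keep flowing; later events for the same order simply overwrite the document.
    public void backfillAll() {
        int page = 0;
        Page<Order> batch;
        do {
            batch = orderRepository.findAll(PageRequest.of(page++, 500));
            List<OrderDocument> docs = batch.getContent().stream()
                    .map(order -> builder.buildFromOrderId(order.getId()))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
            documentRepository.saveAll(docs);
        } while (batch.hasNext());
    }
}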

