Our project's write model kept growing more complex: a single Order aggregate root may reference order lines, a shipping address, invoice information, promotions, and several other entities and value objects. To enforce business rules and transactional consistency, we built a rich domain model with DDD and JPA (an ORM implementation). This works beautifully for handling commands, but for queries the nightmare begins.
The operations team needs a back office that can query orders across arbitrary combinations of dimensions, e.g. "all pending orders from the last 30 days, shipped from the East China warehouse, containing a 'Brand A phone', placed by customers tagged as 'high value'". On JPA and a relational database (PostgreSQL in our case), such a query means complex multi-table JOINs, LIKE clauses, and cross-service calls. Performance degraded sharply, database CPU alerts fired constantly, and the core order-placement flow itself started to slow down.
This pain point is a classic one: a write model designed for transactional consistency and a read model designed for complex queries are inherently at odds. Forcing a single model to serve both needs ends up serving neither well.
Initial Idea: Moving Toward CQRS
Separating command and query responsibilities (CQRS) is the textbook answer to this problem. We decided:
- Write Model: keep using DDD aggregate roots and JPA, deployed in the core transaction service and operating on the primary PostgreSQL database. This part stays as-is; its first priority is data consistency.
- Read Model: build a separate, denormalized data view stored in OpenSearch, whose full-text search and aggregation capabilities are made for exactly this kind of complex querying.
The crux of the architecture is the arrow in the middle: how do we propagate data changes from the write model to the read model reliably, with low latency and minimal intrusion into the application?
Technology Decision: Why CDC
We evaluated three common data synchronization approaches:
Dual writes: in application code, after the database operation completes, call the OpenSearch API to write the data as well. This was rejected immediately. Distributed transactions are too complex, and there is no way to make the two operations atomic: once the OpenSearch write fails, the data is inconsistent, and the ensuing compensation and reconciliation logic makes the system extremely brittle (a minimal sketch of this anti-pattern follows the decision below).
Application events: after the database transaction completes, the application publishes a message to a message queue (e.g. Kafka), and downstream subscribers consume it and update OpenSearch. This is better than dual writes and decouples the services, but in real projects there are two pitfalls:
- Reliability: what if publishing the message fails? Commit the transaction first and the application may crash before the message is sent, losing it; send the message first and the transaction may roll back after the message is already out. You end up needing the Transactional Outbox pattern, which adds complexity.
- Intrusiveness: the domain layer has to know that "I need to publish an event". That pollutes the purity of the domain model and couples business logic to notification logic.
Change Data Capture (CDC): capture every data change by subscribing to the database's transaction log (e.g. PostgreSQL's WAL). Debezium is the leading tool in this space.
- Non-intrusive: no changes to existing application code; the application keeps talking to the database as it always has.
- Reliable: the database transaction log is the most trustworthy source of changes. If a transaction committed, the change is in the log and cannot be lost.
- Low latency: reading the log directly keeps latency very low, effectively near real time.
The decision was clear: go with Debezium CDC. It pushes the responsibility for data synchronization down from the application layer into the infrastructure layer, which is both more robust and more elegant.
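For contrast, here is a minimal sketch of the rejected dual-write approach. The OpenSearchIndexer wrapper is hypothetical and not part of our codebase; the point is only where inconsistency creeps in:
// Hypothetical dual-write service -- shown only to illustrate the anti-pattern.
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class DualWriteOrderService {

    private final OrderRepository orderRepository;      // JPA, covered by the transaction
    private final OpenSearchIndexer openSearchIndexer;  // hypothetical HTTP client wrapper, not covered

    public DualWriteOrderService(OrderRepository orderRepository, OpenSearchIndexer openSearchIndexer) {
        this.orderRepository = orderRepository;
        this.openSearchIndexer = openSearchIndexer;
    }

    @Transactional
    public void placeOrder(Order order) {
        orderRepository.save(order);
        // The external call runs inside the DB transaction: if the transaction rolls back
        // after this line, OpenSearch has already been updated. Move the call outside the
        // transaction instead, and a crash between commit and indexing silently drops the update.
        // Either way the two stores can diverge with nothing to reconcile them.
        openSearchIndexer.index(order);
    }
}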
Step-by-Step Implementation: Building the PostgreSQL-to-OpenSearch Pipeline
The goal is a complete data flow: PostgreSQL -> Debezium -> Kafka -> Projection Service -> OpenSearch.
1. Defining the Write Model: DDD Aggregate Root and JPA Entities
First, here is our simplified Order aggregate root. It uses JPA annotations and is persisted by the ORM. Note that it is designed entirely around encapsulating business rules and guaranteeing consistency; query performance is not its concern.
// File: com/mycorp/order/domain/Order.java
import javax.persistence.*;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

@Entity
@Table(name = "orders")
public class Order {

    @Id
    @Column(name = "id", nullable = false)
    private UUID id;

    @Column(name = "customer_id", nullable = false)
    private Long customerId;

    @Enumerated(EnumType.STRING)
    @Column(name = "status", nullable = false)
    private OrderStatus status;

    @OneToMany(mappedBy = "order", cascade = CascadeType.ALL, orphanRemoval = true, fetch = FetchType.EAGER)
    private Set<OrderLine> orderLines = new HashSet<>();

    @Embedded
    private ShippingAddress shippingAddress;

    @Column(name = "total_price", nullable = false)
    private BigDecimal totalPrice;

    @Column(name = "created_at", nullable = false)
    private Instant createdAt;

    @Version
    private long version;

    // Constructors and business methods (addOrderLine, ship, cancel, ...) omitted
    // Getters and setters omitted

    public void calculateTotalPrice() {
        this.totalPrice = orderLines.stream()
                .map(OrderLine::getSubtotal)
                .reduce(BigDecimal.ZERO, BigDecimal::add);
    }
}

// File: com/mycorp/order/domain/OrderLine.java
@Entity
@Table(name = "order_lines")
public class OrderLine {

    @Id
    @Column(name = "id", nullable = false)
    private UUID id;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "order_id", nullable = false)
    private Order order;

    @Column(name = "product_id", nullable = false)
    private String productId;

    @Column(name = "product_name", nullable = false)
    private String productName;

    @Column(name = "quantity", nullable = false)
    private int quantity;

    @Column(name = "unit_price", nullable = false)
    private BigDecimal unitPrice;

    // Getters, setters and other methods omitted

    public BigDecimal getSubtotal() {
        return unitPrice.multiply(new BigDecimal(quantity));
    }
}

// File: com/mycorp/order/domain/ShippingAddress.java
@Embeddable
public class ShippingAddress {

    @Column(name = "shipping_province")
    private String province;

    @Column(name = "shipping_city")
    private String city;

    @Column(name = "shipping_detail")
    private String detail;
}

// File: com/mycorp/order/domain/OrderStatus.java
public enum OrderStatus {
    PENDING, PAID, SHIPPED, DELIVERED, CANCELLED
}
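The aggregate is loaded and saved through a standard Spring Data JPA repository. The original write-up never shows it, so the interface below is a minimal sketch of what the later sections assume:
// File: com/mycorp/order/domain/OrderRepository.java  (sketch; not shown in the original text)
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.UUID;

public interface OrderRepository extends JpaRepository<Order, UUID> {
    // findById(UUID) and save(Order), inherited from JpaRepository, are all the pipeline needs
}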
2. Infrastructure: Docker Compose
In a real project these components are managed by the ops team, but for development a single docker-compose.yml is enough.
# file: docker-compose.yml
version: '3.8'
services:
  postgres:
    image: debezium/postgres:14
    container_name: postgres-db
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=orderdb
    volumes:
      - ./pg_data:/var/lib/postgresql/data
    command: >
      postgres
      -c wal_level=logical

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  kafka-connect:
    image: debezium/connect:2.1
    container_name: kafka-connect
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: 'kafka:29092'
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses

  opensearch:
    image: opensearchproject/opensearch:2.5.0
    container_name: opensearch
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node1
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    ports:
      - "9200:9200"
      - "9600:9600" # required for Performance Analyzer
One key setting is PostgreSQL's wal_level=logical, which allows Debezium to decode the WAL.
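To double-check the setting from the application side, a quick JDBC query works. This is a throwaway sketch, assuming the credentials from the compose file above and the PostgreSQL JDBC driver on the classpath:
// Throwaway check: confirm wal_level=logical on the running container.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class WalLevelCheck {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://localhost:5432/orderdb";
        try (Connection conn = DriverManager.getConnection(url, "user", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW wal_level")) {
            rs.next();
            System.out.println("wal_level = " + rs.getString(1)); // expect "logical"
        }
    }
}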
3. Configuring the Debezium PostgreSQL Connector
Once the containers are up, we register and start the Debezium connector through the Kafka Connect REST API.
# shell command to register the connector
curl -X POST -H "Content-Type: application/json" --data @debezium-pg-config.json http://localhost:8083/connectors
The contents of debezium-pg-config.json are the heart of the configuration:
// file: debezium-pg-config.json
{
  "name": "order-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres-db",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname": "orderdb",
    "topic.prefix": "pgserver1",
    "table.include.list": "public.orders,public.order_lines",
    "plugin.name": "pgoutput",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "string"
  }
}
The key settings here:
- table.include.list: the tables to monitor; we care about orders and order_lines.
- topic.prefix: becomes the prefix of the Kafka topic names, e.g. pgserver1.public.orders. (In Debezium 1.x this property was called database.server.name; Debezium 2.x uses topic.prefix.)
- value.converter.schemas.enable: set to false for leaner JSON messages; otherwise Debezium embeds the full schema in every message and the payloads become very bloated.
- decimal.handling.mode: the string mode avoids precision issues with BigDecimal values.
4. The Projection Service: From Events to Documents
This is the brain of the pipeline. We build a standalone microservice with Spring for Apache Kafka that consumes the CDC events produced by Debezium and "projects" them into read-model documents in OpenSearch.
Step one: define the OpenSearch read-model document
The document is denormalized and contains everything the queries need.
// file: com/mycorp/projection/OrderDocument.java
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;

@Document(indexName = "orders_read_model")
public class OrderDocument {

    @Id
    private String id; // UUID as String

    @Field(type = FieldType.Long)
    private Long customerId;

    @Field(type = FieldType.Keyword)
    private String status;

    @Field(type = FieldType.Nested) // Key point: use Nested for arrays of objects
    private List<OrderLineDoc> orderLines;

    @Field(type = FieldType.Object)
    private ShippingAddressDoc shippingAddress;

    // Denormalized fields for search and aggregations
    @Field(type = FieldType.Text, analyzer = "standard")
    private String productNameSearch; // all product names concatenated, for full-text search

    @Field(type = FieldType.Keyword)
    private List<String> productIds;

    @Field(type = FieldType.Double)
    private BigDecimal totalPrice;

    @Field(type = FieldType.Date, format = DateFormat.date_optional_time)
    private Instant createdAt;

    // ... getters and setters
}

// Nested document structures
public class OrderLineDoc {
    private String productId;
    private String productName;
    private int quantity;
    private BigDecimal unitPrice;
    // ... getters and setters
}

public class ShippingAddressDoc {
    private String province;
    private String city;
    // ... getters and setters
}
Step two: consume and process the CDC events
This is the trickiest part. Debezium events have a fixed structure that we need to parse.
A typical update event for the orders table looks like this:
{
  "before": { /* full row data before the update */ },
  "after": { /* full row data after the update */ },
  "source": { /* metadata: db, schema, table, lsn, ... */ },
  "op": "u", // 'c' for create, 'u' for update, 'd' for delete
  "ts_ms": 1672531200000
}
Our Kafka listener handles these events:
// file: com/mycorp/projection/CdcEventConsumer.java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.io.IOException;

@Service
public class CdcEventConsumer {

    private static final Logger log = LoggerFactory.getLogger(CdcEventConsumer.class);

    private final ObjectMapper objectMapper;
    private final OrderProjectionHandler projectionHandler;

    public CdcEventConsumer(ObjectMapper objectMapper, OrderProjectionHandler projectionHandler) {
        this.objectMapper = objectMapper;
        this.projectionHandler = projectionHandler;
    }

    @KafkaListener(topics = {"pgserver1.public.orders", "pgserver1.public.order_lines"})
    public void handleCdcEvent(@Payload(required = false) String payload) {
        // Debezium emits a tombstone (null value) after each delete for log compaction; ignore it.
        if (payload == null) {
            log.info("Received tombstone message. Ignoring.");
            return;
        }
        try {
            // With value.converter.schemas.enable=false the message body is the bare Debezium
            // envelope: { "before": ..., "after": ..., "source": ..., "op": ..., "ts_ms": ... }
            JsonNode event = objectMapper.readTree(payload);
            JsonNode source = event.get("source");
            JsonNode opNode = event.get("op");
            if (source == null || opNode == null) {
                log.warn("Unexpected event shape, skipping: {}", payload);
                return;
            }
            String table = source.get("table").asText();
            char op = opNode.asText().charAt(0);
            // Deletes carry the row in "before"; creates and updates carry it in "after".
            JsonNode data = (op == 'd') ? event.get("before") : event.get("after");
            if (data == null || data.isNull()) {
                log.error("Data node is null for op '{}' in event: {}", op, payload);
                return;
            }
            if ("orders".equals(table)) {
                projectionHandler.handleOrderEvent(op, data);
            } else if ("order_lines".equals(table)) {
                projectionHandler.handleOrderLineEvent(op, data);
            }
        } catch (IOException e) {
            // In a real project this needs retries and a dead-letter queue (DLQ) -- see the sketch below.
            log.error("Failed to process CDC event: {}", payload, e);
        }
    }
}
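The catch block above only logs. As a rough sketch of what production-grade error handling could look like with Spring Kafka (2.8+ API; the retry counts and intervals are assumptions, not values from the original text), a DefaultErrorHandler combined with a DeadLetterPublishingRecoverer gives bounded retries plus a dead-letter topic:
// file: com/mycorp/projection/KafkaErrorHandlingConfig.java  (illustrative sketch)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaErrorHandlingConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
        // Once retries are exhausted, the failed record is published to "<original-topic>.DLT"
        // by default, where it can be inspected and replayed.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
        // Retry a few times with a short pause before giving up.
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory, DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}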
Step three: the core projection logic (ProjectionHandler)
The pitfall here: any change to the order_lines table (insert, update, or delete) must trigger a rebuild and re-index of the whole OrderDocument, because OrderDocument is the read-side representation of the aggregate root.
// file: com/mycorp/projection/OrderProjectionHandler.java
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.UUID;

@Component
public class OrderProjectionHandler {

    private static final Logger log = LoggerFactory.getLogger(OrderProjectionHandler.class);

    private final OrderDocumentRepository orderDocumentRepository; // Spring Data Elasticsearch repository
    private final OrderReadModelBuilder readModelBuilder;          // builds the complete OrderDocument

    public OrderProjectionHandler(OrderDocumentRepository orderDocumentRepository,
                                  OrderReadModelBuilder readModelBuilder) {
        this.orderDocumentRepository = orderDocumentRepository;
        this.readModelBuilder = readModelBuilder;
    }

    public void handleOrderEvent(char op, JsonNode orderData) {
        String orderId = orderData.get("id").asText();
        if (op == 'd') {
            orderDocumentRepository.deleteById(orderId);
            log.info("Deleted order document: {}", orderId);
            return;
        }
        // For 'c' or 'u' on the orders table, we rebuild the whole document
        OrderDocument doc = readModelBuilder.buildFromOrderId(UUID.fromString(orderId));
        if (doc != null) {
            orderDocumentRepository.save(doc);
            log.info("Indexed order document: {}", orderId);
        } else {
            // Related data may not be visible yet due to lag; this case needs handling
            log.warn("Could not build document for order {}, maybe related data is not available yet.", orderId);
        }
    }

    public void handleOrderLineEvent(char op, JsonNode orderLineData) {
        // Key point: whatever happened to the order_line, we rebuild at the order level
        String orderId = orderLineData.get("order_id").asText();
        // Even for 'd' we simply re-index the parent order;
        // the builder fetches the current state and builds the document.
        OrderDocument doc = readModelBuilder.buildFromOrderId(UUID.fromString(orderId));
        if (doc != null) {
            orderDocumentRepository.save(doc);
            log.info("Re-indexed order document {} due to order_line change.", orderId);
        } else {
            // The parent order may have been deleted.
            // Check whether the document still exists and delete it if necessary.
            if (!orderDocumentRepository.existsById(orderId)) {
                log.info("Parent order {} for order_line change does not exist. No action needed.", orderId);
            } else {
                orderDocumentRepository.deleteById(orderId);
                log.warn("Parent order {} document was found but could not be rebuilt. Deleting for safety.", orderId);
            }
        }
    }
}
OrderReadModelBuilder queries the primary database (PostgreSQL) directly to fetch the complete, current state of an Order aggregate root and converts it into an OrderDocument. This is a common pattern and guarantees eventual consistency of the read model. It does add some read load to the primary database, but the queries are by primary key and therefore fast, and it completely avoids the hassle of maintaining complex state inside the projection service.
// file: com/mycorp/projection/OrderReadModelBuilder.java
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

@Service
public class OrderReadModelBuilder {

    // JPA repository used to read the aggregate from the master DB
    private final OrderRepository orderRepository;

    public OrderReadModelBuilder(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
    }

    public OrderDocument buildFromOrderId(UUID orderId) {
        // Use an Optional to handle the case where the order has since been deleted
        Optional<Order> orderOpt = orderRepository.findById(orderId);
        if (orderOpt.isEmpty()) {
            return null;
        }
        Order order = orderOpt.get();
        return mapToDocument(order);
    }

    // Mapping logic from JPA entity to OpenSearch document
    private OrderDocument mapToDocument(Order order) {
        OrderDocument doc = new OrderDocument();
        doc.setId(order.getId().toString());
        doc.setCustomerId(order.getCustomerId());
        // ... mapping of the remaining scalar fields omitted

        List<OrderLineDoc> lineDocs = order.getOrderLines().stream()
                .map(this::mapOrderLine)
                .collect(Collectors.toList());
        doc.setOrderLines(lineDocs);

        // Build the denormalized search fields
        doc.setProductIds(order.getOrderLines().stream().map(OrderLine::getProductId).collect(Collectors.toList()));
        doc.setProductNameSearch(order.getOrderLines().stream().map(OrderLine::getProductName).collect(Collectors.joining(" ")));
        return doc;
    }

    private OrderLineDoc mapOrderLine(OrderLine line) {
        OrderLineDoc lineDoc = new OrderLineDoc();
        lineDoc.setProductId(line.getProductId());
        lineDoc.setProductName(line.getProductName());
        lineDoc.setQuantity(line.getQuantity());
        lineDoc.setUnitPrice(line.getUnitPrice());
        return lineDoc;
    }
}
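The OrderDocumentRepository referenced above is never shown in the original text either; a minimal sketch, assuming Spring Data Elasticsearch (or the Spring Data OpenSearch fork, which reuses the same repository abstraction):
// file: com/mycorp/projection/OrderDocumentRepository.java  (sketch; not shown in the original text)
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

public interface OrderDocumentRepository extends ElasticsearchRepository<OrderDocument, String> {
    // save, deleteById and existsById are inherited from the Spring Data repository contract
}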
5. OpenSearch Index Configuration
For fields like productNameSearch and orderLines to behave as expected, we need a custom index mapping.
// file: opensearch_order_mapping.json
PUT /orders_read_model
{
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "customerId": { "type": "long" },
      "status": { "type": "keyword" },
      "totalPrice": { "type": "double" },
      "createdAt": { "type": "date" },
      "shippingAddress": {
        "type": "object",
        "properties": {
          "province": { "type": "keyword" },
          "city": { "type": "keyword" }
        }
      },
      "productIds": { "type": "keyword" },
      "productNameSearch": { "type": "text", "analyzer": "standard" },
      "orderLines": {
        "type": "nested",
        "properties": {
          "productId": { "type": "keyword" },
          "productName": { "type": "text" },
          "quantity": { "type": "integer" },
          "unitPrice": { "type": "double" }
        }
      }
    }
  }
}
Using the nested type rather than object for orderLines is crucial: it lets us query multiple fields within a single order line correctly and independently, instead of having values from different lines cross-matched.
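If you prefer to keep the mapping next to the code instead of issuing the PUT request by hand, Spring Data's IndexOperations can derive it from the @Field annotations on OrderDocument at startup. A sketch, assuming a 4.x Spring Data Elasticsearch API; the class name is illustrative:
// file: com/mycorp/projection/IndexInitializer.java  (sketch; an alternative to the manual PUT above)
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;

@Configuration
public class IndexInitializer {

    @Bean
    public ApplicationRunner createOrdersReadModelIndex(ElasticsearchOperations operations) {
        return args -> {
            IndexOperations indexOps = operations.indexOps(OrderDocument.class);
            if (!indexOps.exists()) {
                indexOps.create();                              // creates orders_read_model
                indexOps.putMapping(indexOps.createMapping());  // mapping derived from the @Field annotations
            }
        };
    }
}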
The Result: A Decoupled, Efficient Read/Write Architecture
At this point we have a complete, fully automated data pipeline.
graph TD
    subgraph "Write Path"
        A[Client/API] -- Command --> B(Order Service);
        B -- Uses --> C{DDD Aggregate Root};
        C -- Persisted by ORM --> D[(PostgreSQL)];
    end
    subgraph "CDC Pipeline"
        D -- WAL --> E[Debezium Connector];
        E -- CDC Events --> F[(Kafka)];
    end
    subgraph "Read Model Projection"
        G[Projection Service] -- Consumes --> F;
        G -- Rebuilds Model by Reading --> D;
        G -- Indexes Document --> H[(OpenSearch)];
    end
    subgraph "Read Path"
        I[Client/API] -- Complex Query --> J(Query Service);
        J -- Searches --> H;
    end
Now the operations team's complex queries hit a separate query service that talks directly to OpenSearch. The complex query mentioned earlier, for instance, becomes a straightforward bool query:
POST /orders_read_model/_search
{
  "query": {
    "bool": {
      "must": [
        { "term": { "status": "PENDING" } },
        { "term": { "shippingAddress.province": "江苏省" } },  // stand-in for the "East China warehouse" condition
        {
          "nested": {
            "path": "orderLines",
            "query": {
              "match": { "orderLines.productName": "A品牌手机" }
            }
          }
        }
      ],
      "filter": [
        { "range": { "createdAt": { "gte": "now-30d/d" } } }
        // The customer-tag condition would likely need another lookup or further denormalization
      ]
    }
  }
}
Compared with the JOIN-heavy query on PostgreSQL, this is orders of magnitude faster, and it puts no pressure at all on the core write path.
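For completeness, this is roughly what the query service could look like in code. A sketch assuming the pre-5.x Spring Data Elasticsearch NativeSearchQuery API; the class and method names are illustrative, not from the original text:
// file: com/mycorp/query/OrderQueryService.java  (illustrative sketch)
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;

@Service
public class OrderQueryService {

    private final ElasticsearchOperations operations;

    public OrderQueryService(ElasticsearchOperations operations) {
        this.operations = operations;
    }

    // "Pending orders from the last 30 days, in a given province, containing a given product name"
    public SearchHits<OrderDocument> searchPendingOrders(String province, String productName) {
        BoolQueryBuilder query = QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("status", "PENDING"))
                .must(QueryBuilders.termQuery("shippingAddress.province", province))
                .must(QueryBuilders.nestedQuery(
                        "orderLines",
                        QueryBuilders.matchQuery("orderLines.productName", productName),
                        ScoreMode.None))
                .filter(QueryBuilders.rangeQuery("createdAt").gte("now-30d/d"));

        return operations.search(
                new NativeSearchQueryBuilder().withQuery(query).build(),
                OrderDocument.class);
    }
}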
Limitations and Future Iteration Paths
This architecture is no silver bullet; it introduces complexity of its own, and a few issues still need attention in a real project:
- Eventual consistency lag: between a write landing in PostgreSQL and the data becoming queryable in OpenSearch there is a lag of milliseconds to seconds. For the vast majority of back-office queries that is perfectly acceptable, but for scenarios that need read-your-own-writes consistency (e.g. a customer viewing an order right after placing it), the best practice is to read from the primary database.
- Schema evolution: if the structure of the orders table changes (say, a column is added), a coordinated procedure is needed: first update the projection service so it can handle the new field, deploy it, and only then run the ALTER TABLE. Otherwise the CDC events may break the projection service's parsing.
- Re-snapshotting: if the projection logic changes significantly, or inconsistencies need repairing, we may have to resync all historical data. Debezium can trigger a one-off snapshot, but that puts extra load on the production database. A better option is an offline backfill script that bypasses Debezium and bulk-reads from the database directly into OpenSearch.
- Projection-service robustness: error handling in the current implementation is rudimentary. A production-grade projection service needs proper retries, a dead-letter queue (DLQ), and idempotent message processing, so that duplicated Kafka deliveries still leave the data in OpenSearch correct.