构建基于CircleCI与Playwright的Saga模式自动化韧性测试管道


一个跨多个微服务的订单处理流程必须保证数据一致性。当支付服务成功扣款,但库存服务因网络抖动而更新失败时,系统绝不能陷入一个“钱已付、货未扣”的中间状态。这是一个典型的分布式事务场景,也是无数系统稳定性的试金石。

定义挑战:云原生环境下的事务一致性与可测试性

在微服务架构中,服务边界清晰,技术栈异构,数据库私有。传统的数据库ACID事务模型在此失效。我们需要一种机制来协调跨越多个独立服务的操作序列,确保它们要么全部成功,要么全部“撤销”,以维持业务流程的原子性。

更严峻的挑战在于,如何验证这套机制在真实世界的混乱——服务崩溃、网络分区、消息延迟——中依然健壮?手动测试覆盖有限且效率低下。我们需要一个自动化的、能在CI/CD流程中持续运行的韧性测试框架,它能主动注入故障,并断言系统最终能恢复到一致状态。这不仅是功能测试,更是对系统韧性的量化度量。

方案A的陷阱:两阶段提交(2PC)的脆弱性

两阶段提交(Two-Phase Commit, 2PC)是实现分布式事务原子性的经典算法。它引入一个事务协调器(Coordinator)来管理所有参与者(Participants)。

sequenceDiagram
    participant Client
    participant Coordinator
    participant ParticipantA
    participant ParticipantB

    Client->>Coordinator: Begin Transaction
    Coordinator->>ParticipantA: Prepare
    Coordinator->>ParticipantB: Prepare

    Note right of Coordinator: 第一阶段:投票

    ParticipantA-->>Coordinator: Vote Yes
    ParticipantB-->>Coordinator: Vote Yes

    Note right of Coordinator: 第二阶段:提交

    Coordinator->>ParticipantA: Commit
    Coordinator->>ParticipantB: Commit

    ParticipantA-->>Coordinator: Ack
    ParticipantB-->>Coordinator: Ack
    Coordinator-->>Client: Transaction Succeeded

优势分析:
2PC最大的吸引力在于它提供了强一致性(Strict Consistency)。在第二阶段的Commit指令发出前,任何参与者都不会真正提交事务。这保证了所有参与者状态的同步变更,对于需要严格ACID的场景(例如金融交易的核心清算)至关重要。

劣势与不适用性分析:
在现代基于消息队列和事件驱动的云原生应用中,2PC的弊端被急剧放大。

  1. 同步阻塞: 在整个事务期间,从第一阶段的Prepare开始,所有参与者都必须锁定资源,等待协调器的最终指令。如果一个事务涉及多个服务,且耗时较长,这种长时间的资源锁定会严重拖累系统吞吐量,降低可用性。
  2. 协调器单点故障: 协调器是整个系统的中枢。如果协调器在发出Commit指令后、所有参与者确认前宕机,系统状态将变得不确定。部分参与者可能已经提交,而另一部分则永远等待一个不会到来的指令。
  3. 网络分区敏感: 2PC对网络可靠性要求极高。在Prepare和Commit阶段,任何一次协调器与参与者之间的通信失败都可能导致整个事务超时失败。这与云环境中网络瞬时抖动是常态的现实格格不入。

对于我们基于Azure Service Bus的异步通信模型,采用2PC意味着强行将异步流程扭转为同步阻塞,这完全违背了架构设计的初衷。在真实项目中,这种方案通常在设计评审阶段就会被否决。

方案B的选择:拥抱最终一致性的Saga模式

Saga模式是一种通过异步消息驱动的、管理分布式事务的模式。它将一个长事务拆分为一系列本地事务,每个本地事务由一个服务完成。当一个本地事务成功后,它会发布一个事件,触发下一个本地事务。如果任何一个步骤失败,Saga会执行一系列补偿事务,来撤销之前已成功提交的本地事务。

我们采用基于编排(Choreography)的Saga,服务间通过订阅/发布Azure Service Bus上的事件来协作。

sequenceDiagram
    participant OrderSvc
    participant PaymentSvc
    participant InventorySvc
    participant AzureServiceBus

    OrderSvc->>AzureServiceBus: Publish(OrderCreated)
    AzureServiceBus-->>PaymentSvc: Consume(OrderCreated)
    PaymentSvc->>PaymentSvc: Process Payment (Local Tx)
    alt Payment Succeeded
        PaymentSvc->>AzureServiceBus: Publish(PaymentSucceeded)
        AzureServiceBus-->>InventorySvc: Consume(PaymentSucceeded)
        InventorySvc->>InventorySvc: Reserve Inventory (Local Tx)
        alt Inventory Reserved
            InventorySvc->>AzureServiceBus: Publish(InventoryReserved)
            AzureServiceBus-->>OrderSvc: Consume(InventoryReserved)
            OrderSvc->>OrderSvc: Mark Order as Completed
        else Inventory Out of Stock
            InventorySvc->>AzureServiceBus: Publish(InventoryReservationFailed)
            AzureServiceBus-->>PaymentSvc: Consume(InventoryReservationFailed)
            PaymentSvc->>PaymentSvc: Refund Payment (Compensating Tx)
            PaymentSvc->>AzureServiceBus: Publish(PaymentRefunded)
            AzureServiceBus-->>OrderSvc: Consume(PaymentRefunded)
            OrderSvc->>OrderSvc: Mark Order as Failed
        end
    else Payment Failed
        PaymentSvc->>AzureServiceBus: Publish(PaymentFailed)
        AzureServiceBus-->>OrderSvc: Consume(PaymentFailed)
        OrderSvc->>OrderSvc: Mark Order as Failed
    end

优势分析:

  1. 高可用与松耦合: 每个服务只执行本地事务,不需等待其他服务。这避免了跨服务资源锁定,极大提升了系统吞t u量和可用性。服务之间通过事件解耦,可以独立部署和扩展。
  2. 容错性: 补偿事务机制使得系统具备了自我修复的能力。即使某个步骤失败,整个业务流程也能通过逆向操作回退到一致状态。
  3. 技术契合度: Saga模式与事件驱动架构、消息队列(如Azure Service Bus)是天作之合。

劣势与权衡:

  1. 最终一致性: 在补偿事务完成前,系统会处于中间状态。这要求业务能够容忍短暂的数据不一致。
  2. 实现复杂性: 开发者必须为每个可撤销的操作精心设计和实现补偿逻辑,这增加了开发和测试的复杂性。
  3. 可观测性差: 跟踪一个跨越多个服务的Saga实例的状态变得困难。没有集中的协调器,需要依赖分布式追踪和日志关联来排查问题。

最终决策:
对于典型的电商订单场景,可用性和系统吞吐量远比强一致性重要。用户可以接受订单在几秒钟内最终确认或失败。因此,我们选择Saga模式,并着力解决其最大的痛点——通过构建自动化韧性测试管道来保证其复杂逻辑的正确性。

核心实现:Saga与自动化测试管道

我们将使用Node.js和TypeScript构建三个微服务:OrderService, PaymentService, InventoryService

1. 基础设施与通信层 (Azure Service Bus)

我们定义一个通用的事件发布和订阅客户端。这里的关键是使用correlation ID来追踪整个Saga流程。

// src/shared/messaging.ts
import { ServiceBusClient, ServiceBusMessage } from "@azure/service-bus";
import { v4 as uuidv4 } from "uuid";

// 在真实项目中,连接字符串应该来自环境变量
const connectionString = process.env.AZURE_SERVICE_BUS_CONNECTION_STRING!;
const sbClient = new ServiceBusClient(connectionString);

interface AppEvent {
  eventType: string;
  payload: any;
}

export async function publishEvent(topicName: string, event: AppEvent, correlationId: string) {
  const sender = sbClient.createSender(topicName);
  const message: ServiceBusMessage = {
    correlationId,
    contentType: "application/json",
    body: event,
    messageId: uuidv4(),
  };

  try {
    await sender.sendMessages(message);
    console.log(`[Bus] Published '${event.eventType}' to topic '${topicName}' with correlationId: ${correlationId}`);
  } finally {
    await sender.close();
  }
}

export function subscribeToTopic(topicName: string, subscriptionName: string, handlers: { [eventType: string]: (payload: any, correlationId: string) => Promise<void> }) {
  const receiver = sbClient.createReceiver(topicName, subscriptionName);

  receiver.subscribe({
    processMessage: async (message) => {
      const event = message.body as AppEvent;
      const handler = handlers[event.eventType];
      if (handler) {
        try {
          console.log(`[Bus] Received '${event.eventType}' from '${topicName}/${subscriptionName}' with correlationId: ${message.correlationId}`);
          await handler(event.payload, message.correlationId as string);
          await receiver.completeMessage(message); // 消息处理成功
        } catch (error) {
          console.error(`[Bus] Error processing message ${message.messageId}:`, error);
          // 错误处理,可以考虑死信队列
          await receiver.deadLetterMessage(message, { deadLetterReason: "HandlerError", deadLetterErrorDescription: error.message });
        }
      } else {
         console.warn(`[Bus] No handler for event type '${event.eventType}'. Abandoning message.`);
         await receiver.abandonMessage(message); // 没有处理器,放弃消息
      }
    },
    processError: async (args) => {
      console.error(`[Bus] Error from subscription '${subscriptionName}':`, args.error);
    },
  });

  console.log(`[Bus] Subscribed to '${topicName}/${subscriptionName}'`);
}

2. 服务实现:订单、支付与库存

PaymentService为例,它订阅OrderCreated事件,处理支付,然后发布PaymentSucceededPaymentFailed事件。同时,它还需要订阅用于补偿的事件,如InventoryReservationFailed

// src/payment-service/index.ts
import { subscribeToTopic, publishEvent } from '../shared/messaging';

// 模拟数据库
const payments = new Map<string, any>();

async function handleOrderCreated(payload: any, correlationId: string) {
    console.log(`[PaymentSvc] Processing payment for order ${payload.orderId}...`);
    // 模拟支付逻辑,这里有50%概率失败
    if (Math.random() > 0.5) {
        payments.set(payload.orderId, { status: 'SUCCEEDED', amount: payload.amount });
        console.log(`[PaymentSvc] Payment for order ${payload.orderId} SUCCEEDED.`);
        await publishEvent('payment-events', {
            eventType: 'PaymentSucceeded',
            payload: { orderId: payload.orderId, amount: payload.amount }
        }, correlationId);
    } else {
        payments.set(payload.orderId, { status: 'FAILED' });
        console.error(`[PaymentSvc] Payment for order ${payload.orderId} FAILED.`);
        await publishEvent('payment-events', {
            eventType: 'PaymentFailed',
            payload: { orderId: payload.orderId, reason: 'Insufficient funds' }
        }, correlationId);
    }
}

async function handleInventoryReservationFailed(payload: any, correlationId: string) {
    // 补偿事务:退款
    console.log(`[PaymentSvc] Compensating: Refunding payment for order ${payload.orderId}...`);
    const payment = payments.get(payload.orderId);
    if (payment && payment.status === 'SUCCEEDED') {
        payment.status = 'REFUNDED';
        console.log(`[PaymentSvc] Refund for order ${payload.orderId} completed.`);
        await publishEvent('payment-events', {
            eventType: 'PaymentRefunded',
            payload: { orderId: payload.orderId }
        }, correlationId);
    } else {
        console.warn(`[PaymentSvc] No successful payment found to refund for order ${payload.orderId}.`);
    }
}

function start() {
    subscribeToTopic('order-events', 'payment-service-subscription', {
        'OrderCreated': handleOrderCreated
    });
    subscribeToTopic('inventory-events', 'payment-service-subscription-for-compensation', {
        'InventoryReservationFailed': handleInventoryReservationFailed
    });
    console.log('[PaymentSvc] Service started.');
}

start();

OrderServiceInventoryService的实现逻辑类似,它们各自处理自己的业务和补偿逻辑。

3. 自动化韧性测试管道 (CircleCI & Playwright)

这是整个方案的皇冠。我们使用Playwright不只是为了测试UI,而是作为一个强大的端到端流程编排和断言工具。CircleCI负责执行这个测试流程,包括关键的故障注入步骤。

.circleci/config.yml

version: 2.1

orbs:
  node: circleci/[email protected]

# 使用 Docker Compose 来管理我们的微服务
executors:
  docker-compose-executor:
    docker:
      - image: cimg/base:stable
    environment:
      COMPOSE_PROJECT_NAME: saga-resilience-test
    resource_class: medium

commands:
  # 启动所有服务
  start_services:
    steps:
      - checkout
      - setup_remote_docker:
          version: 20.10.14
      - run:
          name: Install Docker Compose
          command: |
            sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
            sudo chmod +x /usr/local/bin/docker-compose
      - run:
          name: Set environment variables
          # 在CircleCI项目设置中配置 AZURE_SERVICE_BUS_CONNECTION_STRING
          command: echo "export AZURE_SERVICE_BUS_CONNECTION_STRING='${AZURE_SERVICE_BUS_CONNECTION_STRING}'" >> $BASH_ENV
      - run:
          name: Build and run services in background
          command: |
            docker-compose up --build -d
            echo "Waiting for services to be healthy..."
            sleep 20 # 在真实项目中,应该使用健康检查脚本

  # 注入故障的命令
  inject_failure:
    parameters:
      service_name:
        type: string
    steps:
      - run:
          name: Inject failure by stopping << parameters.service_name >>
          command: |
            echo "Injecting failure: Stopping service << parameters.service_name >>..."
            docker-compose stop << parameters.service_name >>
            echo "Service << parameters.service_name >> stopped."

jobs:
  # 作业1:测试Saga成功路径
  test_happy_path:
    executor: docker-compose-executor
    steps:
      - start_services
      - node/install-packages:
          pkg-manager: npm
          app-dir: ~/project/e2e-tests # Playwright测试代码目录
      - run:
          name: Run Playwright Happy Path Test
          command: |
            cd e2e-tests
            npx playwright test happy-path.spec.ts

  # 作业2:测试支付服务失败后的补偿逻辑
  test_inventory_compensation:
    executor: docker-compose-executor
    steps:
      - start_services
      - node/install-packages:
          pkg-manager: npm
          app-dir: ~/project/e2e-tests
      - run:
          name: Run Playwright Inventory Compensation Test
          command: |
            cd e2e-tests
            # 我们通过环境变量来协调测试脚本和故障注入
            export TARGET_FAILURE_SERVICE="inventory-service"
            npx playwright test compensation.spec.ts
      # 这里的故障注入是在 Playwright 测试脚本内部通过一个特定API触发后进行的
      # 为了简化配置,我们直接在这里注入。更高级的方式是在测试中调用一个API来触发故障。
      # 注意:这里的时序控制非常重要
      - run:
          name: Let test trigger the action, then inject failure
          command: |
            echo "Waiting for test to signal failure injection..."
            # Playwright 测试会创建一个文件或调用一个 webhook 来触发这一步
            # 这里简化为sleep
            sleep 10
            docker-compose stop inventory-service
            echo "Inventory service stopped. Test will now verify compensation."


workflows:
  resilience_testing_pipeline:
    jobs:
      - test_happy_path
      - test_inventory_compensation:
          requires:
            - test_happy_path

注意: 上述CircleCI配置为了演示,简化了故障注入的时序控制。在实际项目中,Playwright测试脚本会在启动订单后,通过一个专门的“故障注入服务”API或exec命令来精确地停止容器,以确保故障发生在Saga流程的中间阶段。

e2e-tests/compensation.spec.ts

import { test, expect } from '@playwright/test';
import axios from 'axios';
import { v4 as uuidv4 } from 'uuid';

const ORDER_SERVICE_API = 'http://localhost:3000'; // 假设OrderService暴露了API

// 辅助函数,用于轮询订单状态
async function pollOrderStatus(orderId: string, expectedStatus: string, timeout = 30000) {
  const startTime = Date.now();
  while (Date.now() - startTime < timeout) {
    try {
      const response = await axios.get(`${ORDER_SERVICE_API}/orders/${orderId}`);
      if (response.data.status === expectedStatus) {
        return true;
      }
    } catch (error) {
      // 忽略查询过程中的错误
    }
    await new Promise(resolve => setTimeout(resolve, 1000));
  }
  throw new Error(`Order ${orderId} did not reach status ${expectedStatus} within ${timeout}ms`);
}


test.describe('Saga Compensation Tests', () => {
  
  test('should compensate and fail order when inventory service dies after payment', async () => {
    // 步骤 1: 启动一个新的订单
    const orderId = uuidv4();
    const createOrderResponse = await axios.post(`${ORDER_SERVICE_API}/orders`, {
      orderId,
      productId: 'prod-123',
      quantity: 1,
      amount: 99.99
    });
    expect(createOrderResponse.status).toBe(201);
    console.log(`Test: Order ${orderId} created.`);

    // 步骤 2: 等待支付成功
    // 在真实场景中,我们可能需要订阅一个通知或轮询支付状态。
    // 为了让测试稳定,我们需要一种方法知道支付事件已发出。
    // 这里我们假设支付很快,并等待一个固定的时间让消息被处理。
    console.log('Test: Waiting for payment to be processed...');
    await new Promise(resolve => setTimeout(resolve, 5000)); 

    // 步骤 3: 注入故障
    // CircleCI的下一步将会执行 `docker-compose stop inventory-service`
    // 在本地测试时,需要手动执行该命令,或在测试脚本中用 `exec` 执行。
    console.log('Test: Now is the time to inject failure by stopping inventory-service.');
    // 假设CircleCI会在此时停止inventory-service

    // 步骤 4: 验证最终状态
    // 由于库存服务宕机,InventoryReserved事件永远不会发出。
    // 经过一段时间(可能是超时或重试失败后),应该触发补偿流程。
    // 支付服务应该收到一个补偿事件(或者订单服务超时后主动发起补偿)。
    // 最终,订单状态应该被标记为FAILED。
    console.log('Test: Verifying final order status is FAILED due to compensation...');
    await expect(pollOrderStatus(orderId, 'FAILED')).resolves.toBe(true);
    console.log(`Test: Successfully verified that order ${orderId} is FAILED.`);
  });
});

这个Playwright测试脚本的核心在于:它不关心UI,而是通过API与系统交互,模拟一个完整的业务流程。它的断言目标是系统的最终一致状态,即使在核心服务被强制下线后,系统也能通过Saga的补偿逻辑恢复到一个已知的、正确的状态(订单失败)。

架构的局限性与未来展望

当前这套基于事件编排的Saga韧性测试方案已经相当强大,但仍有其局限性。

首先,故障注入的粒度较粗。我们模拟的是整个服务崩溃,但无法精细地模拟网络延迟、消息重复或消息乱序等更微妙的故障模式。要实现这些,需要引入更专业的混沌工程工具,如Toxiproxy,或在Kubernetes环境中使用Chaos Mesh,并将它们的控制API集成到CI流程中。

其次,最终一致性使得测试断言变得复杂。pollOrderStatus这种轮询加超时的方式虽然有效,但会增加测试执行时间,且在负载较高时可能因超时而产生误报。一个改进方向是让服务在完成关键状态变更时(如订单失败)能通过Webhook或WebSocket通知测试客户端,变轮询为推送。

最后,随着Saga流程变得更长、分支更多,其状态追踪和调试的难度会呈指数级增长。虽然我们通过correlationId关联了日志和消息,但依然缺乏一个全局视图。未来的迭代可以考虑引入一个轻量级的Saga协调器(Orchestrator),它不参与业务逻辑,仅负责追踪Saga实例的状态机、处理超时和重试,从而将Saga模式从纯粹的编排(Choreography)向更易于管理的编排(Orchestration)模式演进,以换取更佳的可观测性。


  目录