Sagas模式是一种在分布式系统中管理长时间运行事务的设计模式。它将一个大型事务拆分为一系列较小的本地事务,每个本地事务更新数据库并发布消息或事件来触发下一个本地事务。如果某个本地事务失败,Sagas会执行补偿事务(Compensating Transactions)来撤销之前已完成的本地事务,从而保证系统最终一致性。
在单体应用中,ACID事务(原子性、一致性、隔离性、持久性)由数据库保证,开发者只需使用BEGIN、COMMIT、ROLLBACK即可。但在分布式系统中,尤其是微服务架构下,一个业务操作可能涉及多个服务的多个数据库,传统的两阶段提交(2PC)存在严重问题:
Sagas模式通过放弃强一致性,追求最终一致性,解决了上述问题。它允许每个服务独立提交本地事务,通过补偿机制处理失败场景。
一个Saga由以下要素组成:
Sagas放弃了ACID中的隔离性(Isolation),保留了:
这种权衡是分布式系统的核心设计哲学:在可接受的业务场景下,用最终一致性换取可用性和分区容错性(AP in CAP theorem)。
编排式Saga由一个中央协调器(Saga Orchestrator)控制整个流程。协调器向各个服务发送命令,等待响应后决定下一步操作。
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Order Service│ │ Payment Service│ │ Inventory Service│
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└───────────────────┼───────────────────┘
│
┌────────▼────────┐
│ Saga Orchestrator│
└─────────────────┘
如果在步骤5库存服务失败:
public class OrderSagaOrchestrator {
public void executeOrderSaga(OrderRequest request) {
SagaExecutionState state = new SagaExecutionState();
try {
// Step 1: Create Order
Order order = orderService.createOrder(request);
state.setOrderId(order.getId());
// Step 2: Process Payment
Payment payment = paymentService.charge(order);
state.setPaymentId(payment.getId());
// Step 3: Reserve Inventory
inventoryService.reserve(order);
// Step 4: Ship Order
shippingService.createShipment(order);
sagaRepository.markCompleted(state.getSagaId());
} catch (Exception e) {
// Compensate in reverse order
compensate(state);
}
}
private void compensate(SagaExecutionState state) {
if (state.getShipmentId() != null) {
shippingService.cancelShipment(state.getShipmentId());
}
if (state.getInventoryReserved()) {
inventoryService.releaseReservation(state.getOrderId());
}
if (state.getPaymentId() != null) {
paymentService.refund(state.getPaymentId());
}
if (state.getOrderId() != null) {
orderService.cancelOrder(state.getOrderId());
}
sagaRepository.markCompensated(state.getSagaId());
}
}
协同式Saga没有中央协调器,各服务通过监听事件来决定自己的行动。每个服务完成本地事务后发布事件,感兴趣的服务订阅这些事件并执行相应操作。
Order Service Payment Service Inventory Service Shipping Service
│ │ │ │
│ OrderCreated │ │ │
├────────────────────►│ │ │
│ │ PaymentProcessed │ │
│ ├────────────────────►│ │
│ │ │ InventoryReserved │
│ │ ├───────────────────►│
│ │ │ │ OrderShipped
│ │ │◄───────────────────┤
如果支付失败,支付服务发布"PaymentFailed"事件:
如果库存扣减失败,库存服务发布"InventoryReservationFailed"事件:
// Order Service
@EventListener
public class OrderEventHandler {
@Transactional
public void handleCreateOrderCommand(CreateOrderCommand cmd) {
Order order = orderRepository.save(new Order(cmd));
eventPublisher.publish(new OrderCreatedEvent(order.getId(), order.getAmount()));
}
@EventListener
public void onPaymentFailed(PaymentFailedEvent event) {
Order order = orderRepository.findById(event.getOrderId());
order.cancel();
orderRepository.save(order);
}
}
// Payment Service
@EventListener
public class PaymentEventHandler {
@EventListener
@Transactional
public void onOrderCreated(OrderCreatedEvent event) {
try {
Payment payment = paymentService.charge(event.getOrderId(), event.getAmount());
eventPublisher.publish(new PaymentProcessedEvent(event.getOrderId(), payment.getId()));
} catch (PaymentException e) {
eventPublisher.publish(new PaymentFailedEvent(event.getOrderId(), e.getMessage()));
}
}
@EventListener
@Transactional
public void onInventoryReservationFailed(InventoryReservationFailedEvent event) {
paymentService.refund(event.getOrderId());
eventPublisher.publish(new PaymentRefundedEvent(event.getOrderId()));
}
}
// Inventory Service
@EventListener
public class InventoryEventHandler {
@EventListener
@Transactional
public void onPaymentProcessed(PaymentProcessedEvent event) {
try {
inventoryService.reserve(event.getOrderId());
eventPublisher.publish(new InventoryReservedEvent(event.getOrderId()));
} catch (InsufficientInventoryException e) {
eventPublisher.publish(new InventoryReservationFailedEvent(event.getOrderId(), e.getMessage()));
}
}
}
| 特性 | 编排式(Orchestration) | 协同式(Choreography) |
|---|---|---|
| 控制方式 | 中央协调器 | 事件驱动,去中心化 |
| 耦合度 | 服务间通过协调器耦合 | 松耦合,仅依赖事件 |
| 可理解性 | 流程集中,易于理解 | 逻辑分散,需要工具辅助 |
| 单点故障 | 协调器是潜在风险 | 无单点故障 |
| 扩展性 | 受限于协调器容量 | 天然支持水平扩展 |
| 回滚复杂度 | 协调器统一处理 | 各服务自行处理补偿 |
| 适用场景 | 复杂流程、强顺序依赖 | 简单流程、高可用要求 |
实践中,许多系统采用混合模式:
补偿事务不是传统意义上的"回滚",因为本地事务已经提交,无法通过数据库ROLLBACK撤销。补偿事务是一个新的业务操作,其语义效果与原始操作相反。
例如:
补偿事务必须幂等,因为网络不可靠可能导致补偿命令重复执行。
public class PaymentService {
// 幂等退款实现
public void refund(String paymentId) {
Payment payment = paymentRepository.findById(paymentId);
// 检查是否已退款,避免重复操作
if (payment.getStatus() == PaymentStatus.REFUNDED) {
log.info("Payment {} already refunded, skipping", paymentId);
return;
}
// 执行退款
paymentGateway.refund(payment.getTransactionId());
payment.setStatus(PaymentStatus.REFUNDED);
paymentRepository.save(payment);
}
}
补偿事务可能因临时故障失败,需要支持重试。
@Retryable(
value = {TransientException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void compensateInventory(String orderId) {
inventoryService.releaseReservation(orderId);
}
补偿必须按相反顺序执行。如果原始顺序是A→B→C,补偿顺序必须是C→B→A。
private void compensateInReverseOrder(SagaState state) {
// 假设执行顺序: createOrder → chargePayment → reserveInventory → shipOrder
// 补偿顺序必须反向
List<CompensatingAction> compensations = new ArrayList<>();
if (state.isOrderShipped()) {
compensations.add(() -> shippingService.cancelShipment(state.getShipmentId()));
}
if (state.isInventoryReserved()) {
compensations.add(() -> inventoryService.releaseReservation(state.getOrderId()));
}
if (state.isPaymentCharged()) {
compensations.add(() -> paymentService.refund(state.getPaymentId()));
}
if (state.isOrderCreated()) {
compensations.add(() -> orderService.cancelOrder(state.getOrderId()));
}
// 按添加顺序执行(即反向)
for (CompensatingAction action : compensations) {
action.execute();
}
}
补偿事务本身也可能失败,需要持续重试直到成功,或进入人工干预流程。
public class SagaCompensationMonitor {
@Scheduled(fixedRate = 60000) // 每分钟检查
public void retryFailedCompensations() {
List<FailedCompensation> failed = compensationRepository.findByStatus(FAILED);
for (FailedCompensation fc : failed) {
try {
executeCompensation(fc);
compensationRepository.markResolved(fc.getId());
} catch (Exception e) {
if (fc.getRetryCount() > MAX_RETRIES) {
alertOperationsTeam(fc); // 人工介入
} else {
compensationRepository.incrementRetryCount(fc.getId());
}
}
}
}
}
某些操作本质上是不可逆的,设计Saga时必须识别这些操作:
应对策略:
Saga是长时间运行的事务(可能持续数秒到数天),期间可能经历:
状态管理确保Saga在这些情况下仍能正确完成或补偿。
Saga可以建模为状态机:
┌─────────────┐
┌─────────│ STARTED │◄────────┐
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ IN_PROGRESS │ │
│ └──────┬──────┘ │
│ │ │
┌────┴────┐ │ ┌─────┴─────┐
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌───────┐ ┌───────┐ ┌─────────┐ ┌────────┐ ┌──────────┐
│FAILED │ │COMPENSATING│ │COMPLETED│ │ABORTED │ │SUSPENDED │
└───┬───┘ └────┬───┘ └─────────┘ └────────┘ └──────────┘
│ │
│ ▼
│ ┌───────────┐
└────►│COMPENSATED│
└───────────┘
Saga状态必须持久化到数据库,推荐独立的状态表:
CREATE TABLE saga_instances (
id VARCHAR(36) PRIMARY KEY,
saga_type VARCHAR(100) NOT NULL,
current_state VARCHAR(50) NOT NULL,
payload JSON, -- Saga输入参数
step_results JSON, -- 各步骤执行结果
compensation_log JSON, -- 补偿执行记录
started_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL,
version INT NOT NULL DEFAULT 0, -- 乐观锁
INDEX idx_state (current_state),
INDEX idx_started_at (started_at)
);
CREATE TABLE saga_steps (
id VARCHAR(36) PRIMARY KEY,
saga_id VARCHAR(36) NOT NULL,
step_name VARCHAR(100) NOT NULL,
step_order INT NOT NULL,
status VARCHAR(20) NOT NULL, -- PENDING, SUCCESS, FAILED, COMPENSATED
input_data JSON,
output_data JSON,
error_info JSON,
executed_at TIMESTAMP,
compensated_at TIMESTAMP,
FOREIGN KEY (saga_id) REFERENCES saga_instances(id),
INDEX idx_saga_step (saga_id, step_order)
);
当系统从故障中恢复时,需要扫描未完成的Saga:
@Component
public class SagaRecoveryService {
@PostConstruct
public void recoverIncompleteSagas() {
// 查找所有未完成的Saga
List<SagaInstance> incomplete = sagaRepository.findByStateIn(
Arrays.asList(SagaState.STARTED, SagaState.IN_PROGRESS, SagaState.COMPENSATING)
);
for (SagaInstance saga : incomplete) {
try {
resumeSaga(saga);
} catch (Exception e) {
log.error("Failed to resume saga {}", saga.getId(), e);
// 进入人工处理队列
alertOperationsTeam(saga);
}
}
}
private void resumeSaga(SagaInstance saga) {
SagaOrchestrator orchestrator = orchestratorFactory.get(saga.getSagaType());
switch (saga.getCurrentState()) {
case IN_PROGRESS:
// 从上次成功的步骤继续
orchestrator.continueFromLastStep(saga);
break;
case COMPENSATING:
// 继续补偿流程
orchestrator.resumeCompensation(saga);
break;
default:
log.warn("Unexpected saga state: {}", saga.getCurrentState());
}
}
}
Saga依赖消息传递协调步骤,消息系统必须保证:
由于至少一次投递,消费者必须幂等:
@Service
public class OrderEventHandler {
@Transactional
public void handleOrderCreated(OrderCreatedEvent event) {
// 使用事件ID去重
if (processedEventRepository.existsByEventId(event.getEventId())) {
log.info("Event {} already processed, skipping", event.getEventId());
return;
}
// 处理事件
processOrderCreation(event);
// 记录已处理
processedEventRepository.save(new ProcessedEvent(event.getEventId()));
}
}
对于编排式Saga,协调器可以按顺序发送命令。对于协同式Saga,需要确保消费者按顺序处理:
// 使用分区保证顺序
@KafkaListener(topics = "order-events", groupId = "payment-service",
concurrency = "3") // 3个并发消费者
public void handleOrderEvent(OrderEvent event) {
// Kafka保证同一分区内的消息顺序
// 使用orderId作为分区键,确保同一订单的事件顺序处理
}
// 生产时指定分区键
kafkaTemplate.send("order-events", event.getOrderId(), event);
@KafkaListener(topics = "saga-commands")
public void handleSagaCommand(SagaCommand command) {
try {
executeCommand(command);
} catch (RetryableException e) {
// 重试,延迟投递到死信队列
throw e; // 让Kafka自动重试
} catch (NonRetryableException e) {
// 直接送入死信队列
deadLetterQueue.send(command, e);
}
}
创建订单 → 扣减库存 → 处理支付 → 创建物流单 → 发送通知
│ │ │ │ │
│ │ │ │ └─ 不可逆(已发送)
│ │ │ └─ 可补偿(取消物流单)
│ │ └─ 可补偿(退款)
│ └─ 可补偿(释放库存)
└─ 可补偿(取消订单)
设计要点:
冻结转出金额 → 扣减转出账户 → 增加转入账户 → 解冻金额 → 发送通知
设计要点:
预订航班 → 预订酒店 → 预订租车 → 购买保险 → 发送确认邮件
设计要点:
创建新表 → 双写数据 → 校验一致性 → 切换读流量 → 停止双写 → 删除旧表
设计要点:
Axon提供完整的CQRS/Event Sourcing/Saga支持:
@SagaEventHandler(associationProperty = "orderId")
public class OrderSaga {
@StartSaga
@SagaEventHandler(associationProperty = "orderId")
public void handle(OrderCreatedEvent event) {
SagaLifecycle.associateWith("payment", event.getPaymentId());
commandGateway.send(new ProcessPaymentCommand(event.getPaymentId()));
}
@SagaEventHandler(associationProperty = "payment", keyName = "paymentId")
public void on(PaymentProcessedEvent event) {
commandGateway.send(new ReserveInventoryCommand(event.getOrderId()));
}
@EndSaga
@SagaEventHandler(associationProperty = "orderId")
public void on(OrderShippedEvent event) {
// Saga完成
}
@SagaEventHandler(associationProperty = "orderId")
public void on(PaymentFailedEvent event) {
// 触发补偿
commandGateway.send(new CancelOrderCommand(event.getOrderId()));
}
}
特点:
Seata是阿里巴巴开源的分布式事务解决方案,支持Saga模式:
// 状态机定义(JSON)
{
"startState": "CreateOrder",
"states": {
"CreateOrder": {
"type": "service",
"serviceName": "orderService.create",
"next": "DeductInventory",
"catch": [{ "error": "*", "next": "CompensateOrder" }]
},
"DeductInventory": {
"type": "service",
"serviceName": "inventoryService.deduct",
"next": "ProcessPayment",
"catch": [{ "error": "*", "next": "CompensateInventory" }]
},
"ProcessPayment": {
"type": "service",
"serviceName": "paymentService.process",
"next": "Succeed",
"catch": [{ "error": "*", "next": "CompensatePayment" }]
},
"CompensatePayment": {
"type": "service",
"serviceName": "paymentService.refund",
"next": "CompensateInventory"
},
"CompensateInventory": {
"type": "service",
"serviceName": "inventoryService.release",
"next": "CompensateOrder"
},
"CompensateOrder": {
"type": "service",
"serviceName": "orderService.cancel",
"next": "Fail"
},
"Succeed": { "type": "succeed" },
"Fail": { "type": "fail" }
}
}
特点:
Camunda是工作流引擎,也可用于Saga编排:
// BPMN流程定义
public class OrderSagaProcess {
@Autowired
private RuntimeService runtimeService;
public void startOrderSaga(OrderRequest request) {
Map<String, Object> variables = new HashMap<>();
variables.put("orderId", request.getOrderId());
variables.put("amount", request.getAmount());
runtimeService.startProcessInstanceByKey("orderSaga", variables);
}
}
// BPMN中定义补偿边界事件
// <boundaryEvent id="compensatePayment" attachedToRef="processPayment">
// <compensateEventDefinition />
// </boundaryEvent>
特点:
public class OrderSaga : MassTransitStateMachine<OrderState>
{
public State Created { get; private set; }
public State InventoryReserved { get; private set; }
public State Paid { get; private set; }
public Event<OrderCreated> OrderCreated { get; private set; }
public Event<InventoryReserved> InventoryReserved { get; private set; }
public Event<PaymentProcessed> PaymentProcessed { get; private set; }
public OrderSaga()
{
InstanceState(x => x.CurrentState);
Initially(
When(OrderCreated)
.Then(context => context.Instance.OrderId = context.Data.OrderId)
.Publish(context => new ReserveInventoryCommand(context.Data.OrderId))
.TransitionTo(Created));
During(Created,
When(InventoryReserved)
.Publish(context => new ProcessPaymentCommand(context.Instance.OrderId))
.TransitionTo(InventoryReserved));
During(InventoryReserved,
When(PaymentProcessed)
.Then(context => context.Instance.Completed = DateTime.UtcNow)
.Finalize());
}
}
Temporal是新兴的工作流平台,支持多语言:
func OrderSaga(ctx workflow.Context, order Order) error {
// 设置重试策略
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// 执行步骤
var orderID string
err := workflow.ExecuteActivity(ctx, CreateOrder, order).Get(ctx, &orderID)
if err != nil {
return err // 第一步失败,无需补偿
}
// 注册补偿函数
compensation := NewCompensationStack()
compensation.Push(func() error {
return workflow.ExecuteActivity(ctx, CancelOrder, orderID).Get(ctx, nil)
})
var paymentID string
err = workflow.ExecuteActivity(ctx, ProcessPayment, order).Get(ctx, &paymentID)
if err != nil {
compensation.Compensate(ctx) // 执行补偿
return err
}
compensation.Push(func() error {
return workflow.ExecuteActivity(ctx, RefundPayment, paymentID).Get(ctx, nil)
})
err = workflow.ExecuteActivity(ctx, ReserveInventory, order).Get(ctx, nil)
if err != nil {
compensation.Compensate(ctx)
return err
}
return nil
}
特点:
问题:某些步骤没有设计补偿操作。
案例:发送优惠券后无法撤回。
解决:设计时强制要求每个步骤都有补偿方案,或标记为不可逆。
问题:补偿未按相反顺序执行。
案例:先释放了库存,但支付还未退款,导致超卖。
解决:使用栈结构管理补偿顺序。
问题:假设网络总是可靠的。
案例:补偿命令在网络分区期间丢失。
解决:使用可靠消息队列,保证至少一次投递。
问题:补偿执行期间,系统处于不一致状态。
案例:退款处理中,用户查询订单显示已支付但库存已释放。
解决:设计状态机,明确中间状态的业务含义。
问题:一个服务的故障导致大量Saga补偿。
案例:支付服务故障,所有进行中的订单都开始退款。
解决:
没有依赖关系的步骤可以并行:
创建订单
│
├─► 扣减库存 ──┐
│ │
├─► 验证地址 ──┼─► 处理支付 ──► 创建物流单 ──► 发送通知
│ │
└─► 检查风控 ──┘
补偿不需要实时完成,可以异步处理:
// 同步标记失败,异步执行补偿
public void failSagaAsync(SagaInstance saga) {
sagaRepository.markCompensating(saga.getId());
// 发送到补偿队列,异步处理
compensationQueue.send(new CompensationTask(saga.getId()));
}
相似Saga的补偿可以批量处理:
@Scheduled(fixedRate = 300000) // 每5分钟
public void batchCompensate() {
List<SagaInstance> toCompensate = sagaRepository
.findByStateAndCompensateBefore(SagaState.COMPENSATING, Instant.now());
// 按服务分组,批量调用
Map<String, List<String>> grouped = toCompensate.stream()
.collect(Collectors.groupingBy(
SagaInstance::getFailedService,
Collectors.mapping(SagaInstance::getId, Collectors.toList())
));
grouped.forEach((service, sagaIds) -> {
compensationClient.batchCompensate(service, sagaIds);
});
}
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| Saga成功率 | 成功完成的Saga占比 | < 95% |
| 平均完成时间 | Saga从启动到完成的耗时 | > P99阈值 |
| 补偿率 | 需要补偿的Saga占比 | > 5% |
| 补偿成功率 | 补偿操作成功占比 | < 99% |
| 挂起Saga数 | 长时间未完成的Saga | > 100 |
| 死信队列深度 | 无法处理的消息数 | > 1000 |
使用OpenTelemetry等工具追踪Saga执行:
@SagaEventHandler
public void handleOrderCreated(OrderCreatedEvent event) {
Span span = tracer.spanBuilder("saga.order.create")
.setParent(Context.current().with(Span.current()))
.setAttribute("saga.id", event.getSagaId())
.setAttribute("order.id", event.getOrderId())
.startSpan();
try (Scope scope = span.makeCurrent()) {
processOrderCreation(event);
span.setStatus(StatusCode.OK);
} catch (Exception e) {
span.setStatus(StatusCode.ERROR, e.getMessage());
span.recordException(e);
throw e;
} finally {
span.end();
}
}
# Prometheus告警规则
alerts:
- alert: SagaCompensationRateHigh
expr: rate(saga_compensations_total[5m]) / rate(saga_started_total[5m]) > 0.1
for: 5m
labels:
severity: warning
annotations:
summary: "Saga补偿率过高"
- alert: SagaHanging
expr: saga_in_progress_duration_seconds > 3600
for: 0m
labels:
severity: critical
annotations:
summary: "Saga执行超时"
- alert: CompensationFailure
expr: rate(compensation_failures_total[5m]) > 0
for: 1m
labels:
severity: critical
annotations:
summary: "补偿操作失败,需要人工介入"
| 特性 | 2PC | Sagas |
|---|---|---|
| 一致性 | 强一致性 | 最终一致性 |
| 性能 | 低(锁定资源) | 高(无全局锁) |
| 可用性 | 协调器故障时不可用 | 服务可用性不受影响 |
| 复杂度 | 协议复杂 | 业务逻辑复杂 |
| 适用场景 | 短事务、强一致要求 | 长事务、高可用要求 |
TCC是另一种分布式事务模式:
| 特性 | TCC | Sagas |
|---|---|---|
| 资源锁定 | Try阶段锁定 | 无锁定,直接执行 |
| 侵入性 | 需要业务改造(3个接口) | 补偿逻辑即可 |
| 一致性 | 更接近强一致 | 最终一致 |
| 复杂度 | 业务实现复杂 | 补偿设计复杂 |
| 适用场景 | 金融等高一致场景 | 一般业务场景 |
事件溯源将状态变化记录为事件序列,与Sagas天然契合:
但两者关注点不同:
Sagas模式是分布式系统中管理长时间运行事务的核心模式。它通过以下设计哲学解决分布式事务难题:
选择编排式还是协同式,取决于具体场景:
无论选择哪种方式,都必须关注:
Sagas模式不是银弹,它增加了系统的复杂性,但在微服务架构下,它是处理跨服务事务最实用的方案之一。正确理解和实施Sagas模式,是构建可靠分布式系统的关键能力。
参考资源: