Outbox模式(Outbox Pattern)是一种用于解决分布式系统中数据持久化与消息发送一致性问题的架构模式。在微服务架构中,服务通常需要同时完成两个操作:将数据写入数据库,以及向消息队列发送事件通知其他服务。这两个操作需要保持原子性——要么都成功,要么都失败,否则会导致数据不一致。
Outbox模式通过将消息与业务数据存储在同一个数据库事务中,然后由独立的进程异步转发消息到消息代理,从而实现了"至少一次投递"(At-Least-Once Delivery)保证,同时避免了分布式事务的复杂性和性能开销。
在微服务架构中,一个常见的场景是:当业务数据发生变化时,需要同时更新数据库并发送消息通知其他服务。例如:
直接方案的问题:
// 伪代码:直接发送消息
@Transactional
public void createOrder(Order order) {
orderRepository.save(order); // 操作1:写入数据库
messageBroker.send("order.created", order); // 操作2:发送消息
}
这种方案存在三种失败场景:
传统解决方案是使用分布式事务(如2PC/XA协议),但存在严重问题:
| 问题 | 说明 |
|---|---|
| 性能开销 | 2PC需要多轮网络通信,延迟显著增加 |
| 锁定资源 | 准备阶段会锁定数据库资源,降低并发性能 |
| 单点故障 | 协调器故障可能导致事务悬挂 |
| 消息代理支持 | 并非所有消息队列都支持XA协议(如Kafka早期版本不支持) |
| 复杂度 | 实现和维护成本高,故障排查困难 |
如果只使用本地事务,将消息发送排除在事务之外:
@Transactional
public void createOrder(Order order) {
orderRepository.save(order); // 在事务内
// 事务提交后发送消息
}
// 如果服务在事务提交后、发送消息前崩溃,消息丢失
这种方案在网络故障或服务崩溃时,仍然可能丢失消息。
Outbox模式的核心思想是:将消息存储与业务数据存储合二为一,利用数据库的ACID特性保证原子性,然后通过独立的转发器异步发送消息。
Outbox模式包含三个核心组件:
┌─────────────────────────────────────────────────────────────┐
│ 应用服务 │
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
│ │ 业务逻辑处理 │───▶│ 数据库事务(原子操作) │ │
│ │ │ │ │ │
│ │ 1. 处理业务请求 │ │ ┌──────────────┐ ┌─────────┐ │ │
│ │ 2. 生成领域事件 │ │ │ 业务数据表 │ │ Outbox │ │ │
│ │ 3. 写入Outbox │ │ │ (Orders...) │ │ 表 │ │ │
│ └─────────────────┘ │ └──────────────┘ └─────────┘ │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 消息转发器(Relay/Processor) │
│ ┌──────────────────┐ ┌──────────────┐ ┌───────────┐ │
│ │ 轮询Outbox表 │───▶│ 读取待发送消息 │──▶│ 发送到MQ │ │
│ │ (定时或CDC) │ │ │ │(Kafka/Rmq)│ │
│ └──────────────────┘ └──────────────┘ └───────────┘ │
│ │ │
│ └───────────────────────────┘
│ 发送成功后标记/删除消息
└─────────────────────────────────────────────────────────────┘
第一步:业务处理与消息存储(原子操作)
@Transactional
public void createOrder(Order order) {
// 1. 保存业务数据
orderRepository.save(order);
// 2. 构建领域事件
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(),
order.getUserId(),
order.getItems(),
order.getTotalAmount()
);
// 3. 将事件序列化后存入Outbox表(同一事务)
OutboxMessage message = new OutboxMessage(
UUID.randomUUID(), // 消息ID
"order.created", // 消息类型/主题
objectMapper.writeValueAsString(event), // 序列化的事件内容
Instant.now(), // 创建时间
null // 发送时间(初始为空)
);
outboxRepository.save(message);
// 4. 事务提交:业务数据和Outbox记录同时成功或同时失败
}
Outbox表结构示例:
CREATE TABLE outbox (
id UUID PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL, -- 聚合类型:order, user, payment
aggregate_id VARCHAR(255) NOT NULL, -- 聚合ID
event_type VARCHAR(255) NOT NULL, -- 事件类型:created, updated, deleted
payload JSONB NOT NULL, -- 事件内容(JSON格式)
headers JSONB, -- 消息头信息
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
processed_at TIMESTAMP, -- 处理时间(null表示未处理)
retry_count INT DEFAULT 0, -- 重试次数
error_message TEXT -- 错误信息(失败时记录)
);
-- 索引优化
CREATE INDEX idx_outbox_unprocessed ON outbox(processed_at) WHERE processed_at IS NULL;
CREATE INDEX idx_outbox_created_at ON outbox(created_at);
第二步:消息转发(异步处理)
@Component
public class OutboxRelay {
@Scheduled(fixedRate = 5000) // 每5秒轮询一次
public void processOutbox() {
// 1. 读取待处理消息(分页,避免一次性加载过多)
List<OutboxMessage> messages = outboxRepository
.findUnprocessed(PageRequest.of(0, 100));
for (OutboxMessage message : messages) {
try {
// 2. 发送到消息代理
messageBroker.send(
message.getEventType(),
message.getPayload(),
buildHeaders(message)
);
// 3. 标记为已处理(或删除)
message.setProcessedAt(Instant.now());
outboxRepository.save(message);
} catch (Exception e) {
// 4. 处理失败,记录错误和重试次数
message.setRetryCount(message.getRetryCount() + 1);
message.setErrorMessage(e.getMessage());
outboxRepository.save(message);
log.error("Failed to send outbox message: {}", message.getId(), e);
}
}
}
}
最基础的实现方式,通过定时任务轮询Outbox表。
优点:
缺点:
优化策略:
// 使用乐观锁防止重复处理
@Query("UPDATE outbox SET processed_at = NOW() " +
"WHERE id = :id AND processed_at IS NULL")
int markAsProcessed(@Param("id") UUID id);
// 处理时先更新,再发送
public void processWithOptimisticLock(OutboxMessage message) {
int updated = outboxRepository.markAsProcessed(message.getId());
if (updated == 1) {
// 只有成功标记的消息才发送
messageBroker.send(message.getEventType(), message.getPayload());
}
}
利用数据库的事务日志(Write-Ahead Log、Binlog等)捕获变更事件。
工作原理:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 应用服务 │──▶ │ 数据库 │──▶ │ CDC连接器 │
│ │ │ (Postgres/ │ │ (Debezium/ │
│ 写入Outbox │ │ MySQL...) │ │ Maxwell...) │
└──────────────┘ └──────────────┘ └──────┬───────┘
│
▼
┌──────────────┐
│ Kafka │
│ Connect │
└──────────────┘
Debezium配置示例(PostgreSQL):
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "dbz",
"database.dbname": "mydb",
"table.include.list": "public.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "event_type"
}
}
优点:
缺点:
使用数据库触发器在Outbox表插入时直接发送消息。
PostgreSQL触发器示例:
-- 创建消息发送函数(使用dblink或外部调用)
CREATE OR REPLACE FUNCTION notify_outbox_insert()
RETURNS TRIGGER AS $$
BEGIN
-- 发送通知到PostgreSQL的LISTEN/NOTIFY机制
PERFORM pg_notify('outbox_channel',
json_build_object(
'id', NEW.id,
'event_type', NEW.event_type,
'payload', NEW.payload
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- 绑定触发器
CREATE TRIGGER outbox_insert_trigger
AFTER INSERT ON outbox
FOR EACH ROW
EXECUTE FUNCTION notify_outbox_insert();
应用端监听:
@Component
public class PostgresOutboxListener {
@PostConstruct
public void startListening() {
// 使用PostgreSQL JDBC的LISTEN/NOTIFY
PGConnection pgConn = dataSource.getConnection().unwrap(PGConnection.class);
pgConn.addNotificationListener("outbox_channel", notification -> {
String payload = notification.getParameter();
processNotification(payload);
});
// 执行LISTEN命令
try (Statement stmt = pgConn.createStatement()) {
stmt.execute("LISTEN outbox_channel");
}
}
}
优点:
缺点:
在需要保持事件顺序的场景(如状态机、库存扣减),Outbox模式需要额外处理:
问题场景:
订单状态变化:CREATED → PAID → SHIPPED
对应事件:order.created → order.paid → order.shipped
如果order.paid先被发送,order.created后被发送,
消费者可能先收到paid事件,导致处理失败。
解决方案:
public void sendOrdered(OutboxMessage message) {
// 使用aggregate_id作为分区键
String partitionKey = message.getAggregateId();
kafkaTemplate.send(
ProducerRecord.builder(message.getEventType(), partitionKey, message.getPayload())
.header("aggregate_id", message.getAggregateId())
.build()
);
}
ALTER TABLE outbox ADD COLUMN sequence_number BIGSERIAL;
// 发送订单的完整当前状态,而非状态变化
public void publishOrderState(Order order) {
OutboxMessage message = new OutboxMessage(
"order.state_changed",
objectMapper.writeValueAsString(order), // 完整订单对象
order.getId()
);
outboxRepository.save(message);
}
即使使用Outbox模式,消息仍可能重复投递(至少一次语义)。消费者必须实现幂等性:
幂等性检查策略:
@Service
public class OrderEventHandler {
@KafkaListener(topics = "order.created")
public void handleOrderCreated(OrderCreatedEvent event) {
// 策略1:数据库唯一约束
try {
processedEventRepository.save(new ProcessedEvent(
event.getEventId(), // 消息唯一ID
Instant.now()
));
} catch (DuplicateKeyException e) {
// 已处理过,直接返回
log.info("Event already processed: {}", event.getEventId());
return;
}
// 执行业务逻辑
inventoryService.reserveStock(event.getOrderId(), event.getItems());
}
// 策略2:业务级别的幂等(如订单状态检查)
@KafkaListener(topics = "order.paid")
public void handleOrderPaid(OrderPaidEvent event) {
Order order = orderRepository.findById(event.getOrderId());
// 如果订单已经是PAID或之后的状态,忽略
if (order.getStatus().ordinal() >= OrderStatus.PAID.ordinal()) {
log.info("Order {} already in status {} or later",
event.getOrderId(), order.getStatus());
return;
}
order.setStatus(OrderStatus.PAID);
orderRepository.save(order);
}
}
对于多次发送失败的消息,需要转移到死信队列(DLQ)进行人工干预:
@Scheduled(fixedRate = 60000) // 每分钟检查
public void handleDeadLetters() {
// 查找重试超过阈值的消息
List<OutboxMessage> deadLetters = outboxRepository
.findByRetryCountGreaterThanAndProcessedAtIsNull(5);
for (OutboxMessage message : deadLetters) {
// 转移到死信队列表
deadLetterRepository.save(new DeadLetterMessage(
message,
"Max retry exceeded",
Instant.now()
));
// 从Outbox表删除(或标记为已处理)
message.setProcessedAt(Instant.now()); // 标记处理,但实际失败
outboxRepository.save(message);
// 发送告警通知
alertService.sendAlert("Dead letter message: " + message.getId());
}
}
高吞吐量场景下,批量处理可以显著提升性能:
@Scheduled(fixedRate = 1000)
public void batchProcessOutbox() {
// 批量读取
List<OutboxMessage> batch = outboxRepository
.findTop100ByProcessedAtIsNullOrderByCreatedAtAsc();
if (batch.isEmpty()) return;
// 批量发送到Kafka
List<ProducerRecord<String, String>> records = batch.stream()
.map(msg -> new ProducerRecord<>(
msg.getEventType(),
msg.getAggregateId(), // 分区键
msg.getPayload()
))
.collect(Collectors.toList());
// 异步批量发送
kafkaTemplate.sendAll(records)
.whenComplete((result, ex) -> {
if (ex == null) {
// 批量标记为已处理
List<UUID> ids = batch.stream()
.map(OutboxMessage::getId)
.collect(Collectors.toList());
outboxRepository.markAllProcessed(ids, Instant.now());
} else {
// 失败时逐个重试
batch.forEach(this::processSingle);
}
});
}
| 维度 | Outbox模式 | Saga模式 |
|---|---|---|
| 关注点 | 本地事务内的消息可靠性 | 跨服务的长事务协调 |
| 事务范围 | 单个服务内 | 多个服务间 |
| 一致性 | 最终一致性(消息投递) | 最终一致性(业务补偿) |
| 复杂度 | 低(基础设施层面) | 高(业务逻辑层面) |
| 适用场景 | 需要可靠发送事件 | 需要跨服务事务协调 |
关系: Outbox模式常作为Saga模式的底层实现,保证Saga事件可靠投递。
| 维度 | Outbox模式 | 事件溯源 |
|---|---|---|
| 存储方式 | 业务数据 + Outbox表 | 仅存储事件流 |
| 状态重建 | 直接查询业务表 | 重放事件流 |
| 事件作用 | 通知其他服务 | 系统状态的唯一定义 |
| 复杂度 | 较低 | 较高 |
| 查询性能 | 直接查询,性能好 | 需要投影(Projection) |
关系: 事件溯源系统中,事件日志天然就是Outbox,无需额外表。CDC可以直接从事务日志读取事件。
Inbox模式是Outbox的镜像,用于消费端的幂等性保证:
发送端(Outbox) 消费端(Inbox)
┌──────────────┐ ┌──────────────┐
│ 业务表 │ │ Inbox表 │
│ Outbox表 │──消息──▶ │ 业务处理 │
└──────────────┘ └──────────────┘
Inbox模式工作流程:
@KafkaListener(topics = "order.created")
public void consumeWithInbox(ConsumerRecord<String, String> record) {
// 1. 先存入Inbox表(幂等性保证)
InboxMessage inbox = new InboxMessage(
record.topic(),
record.partition(),
record.offset(),
record.value(),
InboxStatus.RECEIVED
);
try {
inboxRepository.save(inbox);
} catch (DuplicateKeyException e) {
// 已处理过,直接确认
return;
}
// 2. 处理业务逻辑
processBusinessLogic(inbox);
// 3. 标记为已处理
inbox.setStatus(InboxStatus.PROCESSED);
inboxRepository.save(inbox);
}
Outbox + Inbox组合使用,可以实现端到端的"恰好一次"(Exactly-Once)语义。
# Prometheus监控指标
outbox_messages_pending: # 待处理消息数
type: gauge
labels: [event_type]
outbox_processing_duration: # 处理耗时
type: histogram
labels: [event_type]
outbox_processing_errors: # 处理错误数
type: counter
labels: [event_type, error_type]
outbox_message_age: # 消息在队列中的年龄
type: histogram
labels: [event_type]
告警规则:
- alert: OutboxMessagesPending
expr: outbox_messages_pending > 1000
for: 5m
annotations:
summary: "Outbox队列堆积超过1000条"
- alert: OutboxMessageAgeHigh
expr: histogram_quantile(0.99, outbox_message_age) > 300
for: 5m
annotations:
summary: "99%消息在Outbox中停留超过5分钟"
- alert: OutboxProcessingErrors
expr: rate(outbox_processing_errors[5m]) > 10
annotations:
summary: "Outbox处理错误率过高"
| 优化策略 | 具体措施 | 效果 |
|---|---|---|
| 分区处理 | 按event_type分多个Relay实例 | 并行处理,提升吞吐量 |
| 批量发送 | 每批100-500条消息 | 减少网络往返 |
| 异步确认 | 使用Kafka的异步Producer | 提升发送吞吐量 |
| 索引优化 | 为processed_at和created_at建索引 | 加速查询 |
| 定期清理 | 删除已处理超过7天的消息 | 控制表大小 |
| 连接池 | 独立的数据库连接池给Relay | 避免影响主业务 |
-- 方案1:软删除(保留历史)
UPDATE outbox
SET processed_at = NOW(), deleted_at = NOW()
WHERE processed_at IS NOT NULL
AND processed_at < NOW() - INTERVAL '7 days';
-- 方案2:硬删除(定期清理)
DELETE FROM outbox
WHERE processed_at IS NOT NULL
AND processed_at < NOW() - INTERVAL '7 days';
-- 方案3:分区表(推荐)
-- 按created_at按月分区,过期分区直接DROP
┌──────────────┐ ┌──────────────┐
│ 数据中心A │ │ 数据中心B │
│ │ │ │
│ 业务DB ──▶ │ │ 业务DB ──▶ │
│ Outbox表 │ │ Outbox表 │
│ │ │ │ │ │
│ ▼ │ │ ▼ │
│ Local Relay│ │ Local Relay│
│ │ │ │ │ │
└──────┼──────┘ └──────┼──────┘
│ │
└───────────┬───────────┘
▼
┌──────────────┐
│ Global MQ │
│ (Kafka) │
└──────────────┘
每个数据中心的Relay只处理本地的Outbox,发送到全局消息队列。
| 框架 | 特点 | 适用场景 |
|---|---|---|
| Debezium | CDC连接器,支持多种数据库 | 需要实时、低延迟的场景 |
| Spring Outbox | Spring生态集成,注解驱动 | Spring Boot项目 |
| Axon Framework | CQRS/ES完整框架,内置Outbox | 事件溯源架构 |
| Transactional Outbox | 轻量级库,简单实现 | 快速集成,简单需求 |
Spring Outbox示例:
@Service
public class OrderService {
@Autowired
private OutboxRepository outbox;
@Transactional
@PublishOutbox(eventType = "order.created")
public Order createOrder(CreateOrderRequest request) {
Order order = new Order(request);
orderRepository.save(order);
// 自动将返回值序列化存入Outbox
return order;
}
}
| 框架 | 特点 |
|---|---|
| MassTransit | 完整的分布式消息框架,内置Outbox |
| CAP | 轻量级,支持多种存储和MQ |
| NServiceBus | 企业级,功能丰富 |
MassTransit配置:
services.AddMassTransit(x =>
{
x.AddEntityFrameworkOutbox<OrderDbContext>(o =>
{
o.QueryDelay = TimeSpan.FromSeconds(5);
o.UseSqlServer();
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
// 使用Watermill的Outbox插件
import "github.com/ThreeDotsLabs/watermill/components/outbox"
func main() {
outboxSubscriber, err := outbox.NewPoller(
db,
outbox.PollerConfig{
Fetcher: outbox.DBFetcher{
// 查询配置
},
},
watermill.NewStdLogger(false, false),
)
// 转发到Kafka
publisher, _ := kafka.NewPublisher(...)
router.AddPlugin(outbox.NewForwarderPlugin(outboxSubscriber, publisher))
}
问题: 将大对象(如包含图片的订单)直接序列化到Outbox。
影响:
解决方案:
// 只发送引用,而非完整数据
public class OrderCreatedEvent {
private String orderId; // 只包含ID
private String orderSummaryUrl; // 详情查询URL
private BigDecimal totalAmount; // 关键摘要信息
// 不包含:商品明细、图片、大文本
}
// 消费者需要完整数据时,通过API查询
问题: Outbox写入和业务逻辑不在同一个事务中。
// ❌ 错误:事务边界不清
@Transactional
public void process() {
orderRepository.save(order); // 在事务内
}
// 事务提交后
public void afterCommit() {
outboxRepository.save(message); // ❌ 不在同一事务!
}
解决方案: 确保所有数据库操作在同一个@Transactional方法内。
问题: 假设消息总是按产生顺序被消费。
现实:
解决方案: 设计消费者为顺序无关的,或使用聚合ID分区。
问题: 消息在Outbox中积压过久,业务语义已失效。
// 处理时检查消息时效性
public void process(OutboxMessage message) {
if (message.getCreatedAt().isBefore(Instant.now().minus(24, ChronoUnit.HOURS))) {
log.warn("Stale message discarded: {}", message.getId());
message.setProcessedAt(Instant.now()); // 标记处理,跳过发送
return;
}
// ...
}
✅ 推荐使用:
❌ 不推荐使用:
阶段1:简单轮询Outbox
↓
阶段2:添加批量处理和分区优化
↓
阶段3:引入CDC(Debezium)降低延迟
↓
阶段4:结合Inbox模式实现Exactly-Once
↓
阶段5:演进为完整的事件溯源架构
Outbox模式是微服务架构中实现可靠消息传递的基石模式。它巧妙地利用关系型数据库的ACID特性,在不引入分布式事务复杂性的前提下,解决了数据一致性和消息可靠性的双重挑战。无论是初创项目还是大型企业系统,Outbox模式都值得作为默认的消息发送方案。