地理分片(Geo-Partitioning)是一种分布式数据库架构模式,通过将数据按照地理位置进行分区存储,以实现数据就近访问、降低延迟、满足数据主权合规要求的设计策略。在全球化应用场景中,地理分片成为构建高性能、高可用分布式系统的关键技术之一。
地理分片是指根据数据的地理属性(如用户所在地区、数据中心位置、国家/地区边界等)将数据分布到不同的物理节点或数据中心。每个分片负责特定地理区域内的数据存储和处理请求。
┌─────────────────────────────────────────────────────────────┐
│ 全球用户请求 │
└───────────────────────┬─────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 亚太区(APAC) │ │ 欧洲区(EMEA) │ │ 美洲区(AMER) │
│ 分片集群 │ │ 分片集群 │ │ 分片集群 │
├──────────────┤ ├──────────────┤ ├──────────────┤
│ 中国用户数据 │ │ 德国用户数据 │ │ 美国用户数据 │
│ 日本用户数据 │ │ 法国用户数据 │ │ 巴西用户数据 │
│ 新加坡数据 │ │ 英国用户数据 │ │ 加拿大数据 │
└──────────────┘ └──────────────┘ └──────────────┘
地理分片的核心是选择合适的分片键(Shard Key):
| 分片键类型 | 示例 | 适用场景 |
|---|---|---|
| 用户地理位置 | country_code, region_id |
用户数据归属明确的场景 |
| 数据中心标识 | dc_id, availability_zone |
多云、多机房部署 |
| 时区信息 | timezone |
时区敏感的业务处理 |
| 自定义地理编码 | geo_hash, s2_cell_id |
需要精确地理边界控制的场景 |
最简单的地理分片实现,每个地理区域对应独立的数据库实例:
# 配置示例
shards:
apac:
region: "asia-pacific"
endpoints:
- "db-apac-1.example.com:5432"
- "db-apac-2.example.com:5432"
jurisdictions: ["CN", "JP", "SG", "AU"]
emea:
region: "europe-middle-east-africa"
endpoints:
- "db-emea-1.example.com:5432"
- "db-emea-2.example.com:5432"
jurisdictions: ["DE", "FR", "UK", "ZA"]
americas:
region: "americas"
endpoints:
- "db-amer-1.example.com:5432"
- "db-amer-2.example.com:5432"
jurisdictions: ["US", "CA", "BR", "MX"]
结合地理分片和哈希分片,实现更细粒度的数据分布:
第一层:地理分片(Geo-Sharding)
│
├── 亚太区(APAC)
│ │
│ ├── 中国(CN)
│ │ ├── 用户分片 A(Hash: 0-4095)
│ │ ├── 用户分片 B(Hash: 4096-8191)
│ │ └── 用户分片 C(Hash: 8192-12287)
│ │
│ └── 日本(JP)
│ ├── 用户分片 D(Hash: 0-4095)
│ └── 用户分片 E(Hash: 4096-8191)
│
└── 欧洲区(EMEA)
│
├── 德国(DE)
└── 英国(UK)
地理分片需要一个智能路由层来将请求导向正确的分片:
@Component
public class GeoShardRouter {
private final ShardTopology topology;
private final GeoLocator geoLocator;
public ShardRoute route(RequestContext context) {
// 1. 从请求中提取地理信息
String countryCode = extractCountryCode(context);
// 2. 查找对应的分片
Shard shard = topology.findShardByJurisdiction(countryCode);
// 3. 处理用户已迁移的情况
if (isUserMigrated(context.getUserId())) {
shard = findActualShard(context.getUserId());
}
// 4. 返回路由决策
return new ShardRoute(shard, RouteReason.GEO_LOCATION);
}
private String extractCountryCode(RequestContext context) {
// 优先从用户资料获取
if (context.getUserProfile() != null) {
return context.getUserProfile().getCountryCode();
}
// 其次从 IP 地址推断
return geoLocator.locate(context.getClientIp()).getCountryCode();
}
}
-- 用户信息表(按用户ID地理分片)
CREATE TABLE users (
user_id BIGINT PRIMARY KEY,
username VARCHAR(100) NOT NULL,
email VARCHAR(255) NOT NULL,
country_code CHAR(2) NOT NULL, -- 分片键
region_code VARCHAR(10), -- 区域代码
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
-- 复合索引优化查询
INDEX idx_country_created (country_code, created_at),
INDEX idx_region (region_code)
) ENGINE=InnoDB
PARTITION BY LIST COLUMNS(country_code) (
PARTITION p_apac VALUES IN ('CN', 'JP', 'KR', 'SG', 'AU', 'IN', 'TH', 'VN'),
PARTITION p_emea VALUES IN ('DE', 'FR', 'UK', 'IT', 'ES', 'NL', 'SE', 'NO'),
PARTITION p_amer VALUES IN ('US', 'CA', 'MX', 'BR', 'AR', 'CL', 'CO', 'PE')
);
-- 订单表(与用户表协同分片)
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
user_id BIGINT NOT NULL,
country_code CHAR(2) NOT NULL, -- 冗余存储,避免跨分片查询
order_amount DECIMAL(15,2),
status VARCHAR(20),
created_at TIMESTAMP NOT NULL,
-- 外键约束在同分片内有效
FOREIGN KEY (user_id) REFERENCES users(user_id),
INDEX idx_user_orders (user_id, created_at)
) ENGINE=InnoDB;
-- 全局ID生成表(每个分片独立)
CREATE TABLE id_sequences (
sequence_name VARCHAR(50) PRIMARY KEY,
current_value BIGINT NOT NULL,
increment_by INT DEFAULT 1000,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
在应用程序中实现分片逻辑:
@Configuration
public class GeoShardingConfig {
@Bean
public DataSource routingDataSource() {
DynamicRoutingDataSource routingDS = new DynamicRoutingDataSource();
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("apac", apacDataSource());
targetDataSources.put("emea", emeaDataSource());
targetDataSources.put("americas", americasDataSource());
routingDS.setTargetDataSources(targetDataSources);
routingDS.setDefaultTargetDataSource(apacDataSource());
return routingDS;
}
}
@Aspect
@Component
public class GeoShardingAspect {
@Around("@annotation(geoSharded)")
public Object routeByGeo(ProceedingJoinPoint pjp, GeoSharded geoSharded) throws Throwable {
// 从 ThreadLocal 或方法参数获取地理上下文
String region = GeoContextHolder.getCurrentRegion();
try {
DataSourceContextHolder.setShard(region);
return pjp.proceed();
} finally {
DataSourceContextHolder.clear();
}
}
}
PostgreSQL + Citus:
-- 创建分布式表
SELECT create_distributed_table('users', 'country_code');
-- 定义分片亲和性(colocation)
SELECT create_distributed_table('orders', 'user_id', colocate_with => 'users');
-- 查看分片分布
SELECT * FROM pg_dist_shard_placement;
-- 按地理区域重新平衡分片
SELECT rebalance_table_shards('users');
MySQL + Vitess:
-- VSchema 配置
{
"sharded": true,
"vindexes": {
"region_vdx": {
"type": "region_experimental",
"params": {
"region_bytes": 1
}
}
},
"tables": {
"users": {
"column_vindexes": [
{
"column": "country_code",
"name": "region_vdx"
},
{
"column": "user_id",
"name": "hash"
}
]
}
}
}
CockroachDB 地理分区:
-- 创建地理分区表
CREATE TABLE users (
user_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
username STRING NOT NULL,
country_code STRING NOT NULL,
created_at TIMESTAMP DEFAULT now()
) PARTITION BY LIST (country_code) (
PARTITION apac VALUES IN ('CN', 'JP', 'KR', 'SG', 'AU'),
PARTITION emea VALUES IN ('DE', 'FR', 'UK', 'IT', 'ES'),
PARTITION americas VALUES IN ('US', 'CA', 'MX', 'BR')
);
-- 配置分区放置策略
ALTER PARTITION apac CONFIGURE ZONE USING
constraints = '[+region=ap-southeast-1]',
gc.ttlseconds = 86400;
ALTER PARTITION emea CONFIGURE ZONE USING
constraints = '[+region=eu-west-1]',
gc.ttlseconds = 86400;
ALTER PARTITION americas CONFIGURE ZONE USING
constraints = '[+region=us-east-1]',
gc.ttlseconds = 86400;
@Service
public class DataSovereigntyService {
private final Map<String, DataResidencyPolicy> policies;
public void enforceResidencyRules(String countryCode, DataType dataType) {
DataResidencyPolicy policy = policies.get(countryCode);
if (policy != null && !policy.allowsExport(dataType)) {
throw new DataSovereigntyViolationException(
String.format("Data type %s cannot leave %s per local regulations",
dataType, countryCode)
);
}
}
public List<String> getAllowedReplicationTargets(String sourceCountry) {
DataResidencyPolicy policy = policies.get(sourceCountry);
return policy != null ? policy.getAllowedDestinations() : Collections.emptyList();
}
}
// 合规策略配置示例
@Data
public class DataResidencyPolicy {
private String countryCode;
private boolean dataMustStayLocal;
private Set<DataType> exportRestrictedTypes;
private List<String> allowedDestinations;
private boolean encryptionRequired;
private String encryptionStandard; // e.g., "AES-256-GCM"
}
@Transactional
public class UserMigrationService {
public MigrationResult migrateUser(Long userId, String targetRegion) {
// 1. 验证迁移可行性
validateMigrationFeasibility(userId, targetRegion);
// 2. 进入迁移状态(软删除源分片,标记目标分片)
markMigrationInProgress(userId, targetRegion);
// 3. 批量迁移数据
MigrationBatch batch = migrateUserData(userId, targetRegion);
// 4. 验证数据一致性
if (!verifyDataIntegrity(userId, batch)) {
rollbackMigration(userId);
throw new MigrationException("Data integrity check failed");
}
// 5. 更新路由映射
updateShardMapping(userId, targetRegion);
// 6. 清理源分片数据(延迟执行)
scheduleSourceCleanup(userId, Duration.ofDays(7));
return new MigrationResult(userId, targetRegion, batch.getRecordCount());
}
private MigrationBatch migrateUserData(Long userId, String targetRegion) {
MigrationBatch batch = new MigrationBatch();
// 迁移顺序很重要,先迁移依赖表
batch.add(migrateTable("user_profiles", userId, targetRegion));
batch.add(migrateTable("user_preferences", userId, targetRegion));
batch.add(migrateTable("orders", userId, targetRegion));
batch.add(migrateTable("transactions", userId, targetRegion));
return batch;
}
}
@Component
public class CrossShardQueryHandler {
private final ExecutorService executor;
public <T> List<T> executeCrossShard(Query<T> query, List<String> shards) {
List<CompletableFuture<List<T>>> futures = shards.stream()
.map(shard -> CompletableFuture.supplyAsync(
() -> executeOnShard(query, shard), executor))
.toList();
// 并行执行并聚合结果
return futures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList());
}
// 跨分片聚合查询(COUNT, SUM, AVG)
public AggregationResult crossShardAggregation(AggregationQuery query) {
Map<String, ShardPartialResult> partials = executeOnAllShards(query);
// 合并部分结果
return switch (query.getAggregationType()) {
case COUNT -> mergeCounts(partials);
case SUM -> mergeSums(partials);
case AVG -> mergeAverages(partials);
case MIN -> mergeMins(partials);
case MAX -> mergeMaxs(partials);
};
}
}
# 地理分片监控配置
geo_sharding_metrics:
# 分片健康度
- name: shard_health
type: gauge
labels: [region, shard_id]
# 跨分片请求比例(应尽可能低)
- name: cross_shard_request_ratio
type: ratio
threshold_warning: 0.05
threshold_critical: 0.15
# 数据分布均衡度
- name: data_distribution_skew
type: gauge
labels: [table_name]
formula: max_shard_size / avg_shard_size
threshold_warning: 1.5
threshold_critical: 2.0
# 用户迁移队列深度
- name: migration_queue_depth
type: gauge
threshold_warning: 100
threshold_critical: 500
# 地理路由延迟
- name: geo_routing_latency_ms
type: histogram
buckets: [1, 5, 10, 25, 50, 100]
# 分片再平衡脚本
class ShardRebalancer:
def analyze_distribution(self):
"""分析当前数据分布"""
distribution = {}
for shard in self.topology.get_all_shards():
stats = shard.get_statistics()
distribution[shard.id] = {
'size_gb': stats.data_size_gb,
'qps': stats.queries_per_second,
'connections': stats.active_connections
}
return distribution
def calculate_rebalance_plan(self):
"""计算再平衡计划"""
distribution = self.analyze_distribution()
avg_size = mean(d['size_gb'] for d in distribution.values())
plan = []
for shard_id, stats in distribution.items():
deviation = (stats['size_gb'] - avg_size) / avg_size
if deviation > 0.3: # 超过平均30%,需要迁出
excess = stats['size_gb'] - avg_size * 1.1
plan.append(RebalanceAction(
source=shard_id,
action='EVACUATE',
estimated_gb=excess
))
elif deviation < -0.2: # 低于平均20%,可以接收
capacity = avg_size * 0.9 - stats['size_gb']
plan.append(RebalanceAction(
target=shard_id,
action='RECEIVE',
capacity_gb=capacity
))
return plan
def execute_rebalance(self, plan):
"""执行再平衡"""
for action in plan:
if action.action == 'EVACUATE':
self.migrate_data_ranges(action.source, action.target_ranges)
elif action.action == 'RECEIVE':
self.prepare_for_incoming(action.target)
# 地理分片灾备配置
disaster_recovery:
# 主从复制拓扑
replication:
apac_primary:
location: "ap-southeast-1"
replicas:
- location: "ap-northeast-1" # 跨区域副本
lag_threshold_ms: 1000
priority: 1
- location: "ap-southeast-2" # 同城副本
lag_threshold_ms: 100
priority: 2
emea_primary:
location: "eu-west-1"
replicas:
- location: "eu-central-1"
lag_threshold_ms: 500
priority: 1
# 自动故障转移
failover:
enabled: true
health_check_interval: 10s
failure_threshold: 3
auto_promote: false # 生产环境建议人工确认
# 数据恢复策略
recovery:
rpo_minutes: 5 # 恢复点目标
rto_minutes: 15 # 恢复时间目标
backup_retention_days: 30
public class ShardingKeyDesign {
/**
* 选择分片键的核心原则
*/
public void designPrinciples() {
// 1. 访问局部性:同一事务的数据应在同一分片
// ❌ 错误:用户和订单分在不同分片
// ✅ 正确:按用户ID分片,用户数据在一起
// 2. Cardinality 足够高:避免热点
// ❌ 错误:按大洲分片(只有7个值)
// ✅ 正确:按国家+城市组合分片
// 3. 稳定性:分片键值不应频繁变更
// ❌ 错误:按用户当前位置(动态变化)
// ✅ 正确:按用户注册地(相对稳定)
// 4. 业务含义:与业务边界对齐
// ✅ 例如:欧盟用户数据留在欧盟,符合GDPR
}
}
-- ✅ 推荐:单分片查询(利用分片键)
SELECT * FROM orders
WHERE user_id = 12345 AND country_code = 'CN';
-- ❌ 避免:跨分片查询(无法确定分片)
SELECT * FROM orders
WHERE created_at > '2024-01-01'; -- 需要扫描所有分片
-- ✅ 优化:添加分片键过滤
SELECT * FROM orders
WHERE country_code = 'CN' AND created_at > '2024-01-01';
@Service
public class OrderService {
/**
* 单分片事务 - 推荐使用
*/
@Transactional
public void createOrderWithinShard(OrderRequest request) {
// 所有操作都在同一分片内
Long userId = request.getUserId();
String countryCode = getUserCountry(userId);
// 路由自动选择正确分片
orderRepository.save(request.toOrder());
inventoryRepository.deduct(request.getItems());
paymentRepository.create(request.getPayment());
}
/**
* 跨分片操作 - 需要 Saga 模式
*/
public void crossShardTransfer(Long fromUser, Long toUser, BigDecimal amount) {
Saga saga = sagaOrchestrator.createSaga("cross-shard-transfer");
// Step 1: 从源用户扣款
saga.addStep(
() -> accountService.debit(fromUser, amount),
() -> accountService.credit(fromUser, amount) // 补偿
);
// Step 2: 给目标用户加款
saga.addStep(
() -> accountService.credit(toUser, amount),
() -> accountService.debit(toUser, amount) // 补偿
);
saga.execute();
}
}
| 特性 | CockroachDB | YugabyteDB | TiDB | Vitess (MySQL) | Citus (PostgreSQL) |
|---|---|---|---|---|---|
| 地理分区 | ✅ 原生支持 | ✅ 原生支持 | ✅ Placement Rules | ✅ 配置实现 | ✅ 表分区 |
| 自动再平衡 | ✅ | ✅ | ✅ | ⚠️ 需配置 | ⚠️ 需手动 |
| 跨分片事务 | ✅ | ✅ | ✅ | ⚠️ 有限支持 | ✅ |
| 数据主权 | ✅ 精细控制 | ✅ | ✅ | ⚠️ 应用层 | ⚠️ 应用层 |
| 云原生 | ✅ | ✅ | ✅ | ✅ | ✅ |
| 托管服务 | CockroachCloud | Yugabyte Cloud | TiDB Cloud | PlanetScale | Azure Cosmos DB |
问题:中国用户占80%,CN分片过载
解决方案:
1. 在CN分片内再进行哈希分片
2. 使用两层分片:地理+哈希
3. 预分片策略:将热门区域拆分为多个子分片
-- ❌ 问题:无法执行跨分片JOIN
SELECT u.*, o.*
FROM users u
JOIN orders o ON u.user_id = o.user_id
WHERE u.country_code = 'CN' AND o.country_code = 'US';
-- ✅ 解决方案1:应用层组装
List<User> users = userRepo.findByCountry("CN");
List<Long> userIds = users.stream().map(User::getId).toList();
List<Order> orders = orderRepo.findByUserIds(userIds); // 发送到对应分片
Map<Long, List<Order>> ordersByUser = orders.stream()
.collect(groupingBy(Order::getUserId));
-- ✅ 解决方案2:数据冗余(反范式化)
-- 在订单表中冗余用户姓名等信息
@Component
public class SkewMonitor {
@Scheduled(fixedRate = 60000)
public void checkDataSkew() {
List<ShardStats> stats = shardManager.getAllStats();
double avgSize = stats.stream()
.mapToLong(ShardStats::getDataSizeBytes)
.average()
.orElse(0);
for (ShardStats stat : stats) {
double deviation = Math.abs(stat.getDataSizeBytes() - avgSize) / avgSize;
if (deviation > 0.5) { // 偏差超过50%
alertService.sendAlert(new DataSkewAlert(
stat.getShardId(),
deviation,
"考虑进行数据再平衡"
));
}
}
}
}
地理分片是构建全球化分布式系统的核心架构模式,主要价值体现在:
实施地理分片需要综合考虑业务边界、访问模式、合规要求和技术栈特点。成功的关键在于:选择合适的分片键、设计高效的路由层、建立完善的运维监控体系。
随着云原生数据库技术的发展,地理分片的实现成本正在降低,但在应用架构设计、数据模型规划和运维管理方面仍需要深入的专业知识。
相关阅读