在软件工程中,理解值(Value)、引用(Reference)和实体(Entity)的区别是构建清晰、可维护系统的基石。这三个概念源自领域驱动设计(Domain-Driven Design, DDD),但它们的影响远不止于DDD——从函数式编程到分布式系统,从数据库设计到API契约,这些概念无处不在。本文将深入探讨这三个核心概念的本质区别、识别方法、实现策略以及常见陷阱。
定义:值对象是一个没有概念标识的对象,它完全由其属性值定义。如果两个值对象的所有属性都相同,它们就是同一个值对象。
核心特征:
现实类比:
代码示例:
from dataclasses import dataclass
from typing import Optional
@dataclass(frozen=True) # frozen=True 确保不可变
class Money:
\"\"\"货币金额值对象\"\"\"
amount: Decimal
currency: str
def add(self, other: 'Money') -> 'Money':
if self.currency != other.currency:
raise ValueError("Cannot add different currencies")
return Money(self.amount + other.amount, self.currency)
def __eq__(self, other):
if not isinstance(other, Money):
return False
return self.amount == other.amount and self.currency == other.currency
@dataclass(frozen=True)
class Address:
\"\"\"地址值对象\"\"\"
street: str
city: str
postal_code: str
country: str
def __post_init__(self):
# 可以在创建时进行验证
if not self.postal_code:
raise ValueError("Postal code is required")
# 使用示例
price1 = Money(Decimal("100.00"), "USD")
price2 = Money(Decimal("100.00"), "USD")
print(price1 == price2) # True - 值相等
print(price1 is price2) # False - 不是同一个对象实例
定义:实体是一个具有唯一标识的对象,即使它的所有属性都改变了,它仍然是同一个实体。身份标识是实体存在的根本原因。
核心特征:
现实类比:
代码示例:
from dataclasses import dataclass, field
from datetime import datetime
from typing import List
import uuid
@dataclass
class Order:
\"\"\"订单实体\"\"\"
order_id: str # 唯一标识 - 这是实体的核心
customer_id: str
items: List[OrderItem] = field(default_factory=list)
status: str = "PENDING"
created_at: datetime = field(default_factory=datetime.now)
total_amount: Money = field(default=None)
def add_item(self, product_id: str, quantity: int, unit_price: Money):
\"\"\"添加订单项 - 实体状态改变,但身份不变\"\"\"
item = OrderItem(product_id, quantity, unit_price)
self.items.append(item)
self._recalculate_total()
def update_status(self, new_status: str):
\"\"\"更新状态 - 实体状态改变,但身份不变\"\"\"
self.status = new_status
def _recalculate_total(self):
\"\"\"重新计算总金额\"\"\"
total = sum(
item.unit_price.amount * item.quantity
for item in self.items
)
self.total_amount = Money(total, "USD")
def __eq__(self, other):
if not isinstance(other, Order):
return False
return self.order_id == other.order_id # 基于ID比较
def __hash__(self):
return hash(self.order_id)
# 使用示例
order1 = Order(order_id="ORD-2024-001", customer_id="CUST-001")
order2 = Order(order_id="ORD-2024-001", customer_id="CUST-999") # 不同的customer_id
print(order1 == order2) # True - 基于order_id相等
定义:引用是对另一个对象的间接访问机制,它不存储对象本身,而是存储如何找到该对象的信息。在DDD中,引用通常表现为实体之间的关联关系。
核心特征:
现实类比:
代码示例:
from typing import Optional
from dataclasses import dataclass
@dataclass
class Customer:
\"\"\"客户实体\"\"\"
customer_id: str
name: str
email: str
@dataclass
class Order:
\"\"\"订单实体 - 通过引用关联客户\"\"\"
order_id: str
customer_id: str # 引用客户的ID,而非嵌入客户对象
# 或者使用对象引用(在ORM中常见)
customer: Optional[Customer] = None # 可选的直接引用
def get_customer_info(self, customer_repo) -> Customer:
\"\"\"通过仓储获取引用的实体\"\"\"
if self.customer is None:
self.customer = customer_repo.find_by_id(self.customer_id)
return self.customer
# 引用 vs 嵌入的区别
class OrderWithEmbeddedCustomer:
\"\"\"不推荐:嵌入客户信息\"\"\"
order_id: str
customer_name: str # 冗余数据
customer_email: str # 冗余数据
# 问题:客户信息变更时,订单中的信息不会自动更新
class OrderWithCustomerReference:
\"\"\"推荐:引用客户\"\"\"
order_id: str
customer_id: str # 单一引用
# 客户信息变更不影响订单,始终获取最新信息
| 特性 | 值对象 | 实体 | 引用 |
|---|---|---|---|
| 身份标识 | 无 | 有(唯一ID) | 间接指向身份 |
| 相等性判断 | 属性值相等 | ID相等 | 指向同一对象 |
| 可变性 | 不可变 | 可变 | 可重新指向 |
| 生命周期 | 短暂,随用随建 | 有完整生命周期 | 依赖被引用对象 |
| 存储方式 | 嵌入(Embedded) | 独立存储 | 存储关联标识 |
| 典型示例 | Money, Address, Date | User, Order, Product | customer_id, order_id |
# 值对象的相等性 - 基于所有属性
class Money:
def __eq__(self, other):
if not isinstance(other, Money):
return False
return (self.amount == other.amount and
self.currency == other.currency)
# 实体的相等性 - 仅基于ID
class User:
def __eq__(self, other):
if not isinstance(other, User):
return False
return self.user_id == other.user_id
def __hash__(self):
return hash(self.user_id)
# 引用的相等性 - 基于指向的对象
class Order:
def __eq__(self, other):
if not isinstance(other, Order):
return False
return self.order_id == other.order_id
# 引用相等性的间接体现
def same_customer(self, other_order):
return self.customer_id == other_order.customer_id
# 值对象:随用随建,无需跟踪
class OrderService:
def calculate_discount(self, amount: Money, discount_rate: Decimal) -> Money:
# 新的Money值对象被创建,无需关心它的生命周期
discount_amount = Money(amount.amount * discount_rate, amount.currency)
return amount.subtract(discount_amount)
# 实体:需要跟踪状态变化
class OrderRepository:
def save(self, order: Order):
# 需要跟踪实体的创建、更新、删除
if order.is_new():
self._insert(order)
else:
self._update(order)
def find_by_id(self, order_id: str) -> Optional[Order]:
# 通过ID重新构建实体,保持身份连续性
pass
# 引用:需要管理关联关系
class Order:
def change_customer(self, new_customer_id: str):
# 引用可以改变,指向不同的实体
old_customer_id = self.customer_id
self.customer_id = new_customer_id
self.record_event(CustomerChangedEvent(old_customer_id, new_customer_id))
通过回答以下问题,可以判断一个概念应该是值对象还是实体:
问题1:如果属性完全改变,它是否还是同一个东西?
问题2:是否需要跟踪它的历史状态?
问题3:两个实例属性相同,是否意味着它们是同一个?
问题4:是否存在没有它就失去意义的标识?
| 概念 | 类型 | 理由 |
|---|---|---|
| 货币金额 | 值对象 | 100元就是100元,无需区分"哪一张" |
| 银行账户 | 实体 | 账号是核心标识,余额变化账户不变 |
| 地址 | 值对象 | 地址变了就是新地址,无需跟踪旧地址 |
| 客户 | 实体 | 客户ID是核心,信息可变 |
| 订单项 | 值对象 | 相同产品+数量+价格就是相同的项 |
| 订单 | 实体 | 订单号是核心标识 |
| 电子邮件地址 | 值对象 | 相同的邮箱地址就是同一个 |
| 电话号码 | 值对象 | 号码变了就是新号码 |
| 商品 | 实体 | 商品SKU是核心标识 |
| 商品规格 | 值对象 | 相同规格就是相同的 |
有些概念在特定上下文中可能是值对象,在其他上下文中可能是实体:
# 在订单上下文中,地址是值对象
class Order:
shipping_address: Address # 值对象 - 订单不关心地址的身份
# 在物流上下文中,地址可能是实体
class DeliveryRoute:
destination_address: Address # 可能是实体 - 需要跟踪地址的配送历史
delivery_history: List[DeliveryEvent]
# 在CRM上下文中,电子邮件地址可能是实体
class Contact:
email: str # 值对象 - 仅作为联系方式
class EmailCampaign:
target_email: EmailAddress # 可能是实体 - 需要跟踪发送历史、打开率等
send_history: List[EmailSendEvent]
关键原则:分类取决于上下文(Bounded Context),而非概念本身。
from dataclasses import dataclass
from typing import Tuple
import copy
# 方法1:使用 frozen dataclass(推荐)
@dataclass(frozen=True)
class Point:
x: float
y: float
def move(self, dx: float, dy: float) -> 'Point':
\"\"\"返回新的点,而非修改当前点\"\"\"
return Point(self.x + dx, self.y + dy)
# 方法2:使用属性装饰器控制
class Range:
def __init__(self, min_val: float, max_val: float):
self._min = min_val
self._max = max_val
@property
def min(self) -> float:
return self._min
@property
def max(self) -> float:
return self._max
def expand(self, factor: float) -> 'Range':
\"\"\"返回扩展后的新范围\"\"\"
center = (self._min + self._max) / 2
half_width = (self._max - self._min) / 2 * factor
return Range(center - half_width, center + half_width)
# 方法3:使用元组(简单场景)
Color = Tuple[int, int, int] # RGB值对象
@dataclass(frozen=True)
class Money:
amount: Decimal
currency: str
@dataclass(frozen=True)
class Price:
\"\"\"组合值对象\"\"\"
base_price: Money
tax: Money
discount: Money
@property
def total(self) -> Money:
return self.base_price.add(self.tax).subtract(self.discount)
@dataclass(frozen=True)
class Dimensions:
\"\"\"尺寸值对象\"\"\"
width: float
height: float
depth: float
unit: str = "cm"
@property
def volume(self) -> float:
return self.width * self.height * self.depth
from dataclass import dataclass
import re
@dataclass(frozen=True)
class Email:
\"\"\"电子邮件值对象 - 自动验证和规范化\"\"\"
address: str
def __post_init__(self):
# 规范化
normalized = self.address.strip().lower()
object.__setattr__(self, 'address', normalized)
# 验证
if not self._is_valid(normalized):
raise ValueError(f"Invalid email address: {self.address}")
@staticmethod
def _is_valid(email: str) -> bool:
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
@property
def domain(self) -> str:
return self.address.split('@')[1]
@property
def local_part(self) -> str:
return self.address.split('@')[0]
# 使用
try:
email = Email(" User@Example.COM ")
print(email.address) # "user@example.com" - 已规范化
print(email.domain) # "example.com"
except ValueError as e:
print(e)
import uuid
from datetime import datetime
from typing import Optional
class Entity:
\"\"\"实体基类\"\"\"
def __init__(self, entity_id: Optional[str] = None):
self._id = entity_id or self._generate_id()
self._created_at = datetime.now()
self._version = 0 # 乐观锁版本号
@property
def id(self) -> str:
return self._id
@staticmethod
def _generate_id() -> str:
\"\"\"生成唯一标识\"\"\"
return str(uuid.uuid4())
def __eq__(self, other):
if not isinstance(other, Entity):
return False
return self._id == other._id
def __hash__(self):
return hash(self._id)
# 特定领域的ID生成策略
class Order(Entity):
\"\"\"订单实体 - 使用业务友好的ID格式\"\"\"
@staticmethod
def _generate_id() -> str:
\"\"\"生成订单号:ORD-YYYYMMDD-XXXXX\"\"\"
date_prefix = datetime.now().strftime("ORD-%Y%m%d")
random_suffix = uuid.uuid4().hex[:5].upper()
return f"{date_prefix}-{random_suffix}"
class User(Entity):
\"\"\"用户实体 - 使用UUID\"\"\"
pass # 使用默认UUID生成
from typing import List, Callable
from dataclasses import dataclass, field
from datetime import datetime
import copy
@dataclass
class DomainEvent:
\"\"\"领域事件基类\"\"\"
occurred_at: datetime = field(default_factory=datetime.now)
@dataclass
class OrderStatusChangedEvent(DomainEvent):
order_id: str
old_status: str
new_status: str
class Order:
\"\"\"订单实体 - 带事件追踪\"\"\"
def __init__(self, order_id: str, customer_id: str):
self._order_id = order_id
self._customer_id = customer_id
self._status = "PENDING"
self._items: List[OrderItem] = []
self._events: List[DomainEvent] = []
self._version = 0
@property
def order_id(self) -> str:
return self._order_id
@property
def status(self) -> str:
return self._status
@property
def events(self) -> List[DomainEvent]:
return self._events.copy()
def clear_events(self):
\"\"\"清除已处理的事件\"\"\"
self._events.clear()
def update_status(self, new_status: str):
\"\"\"更新状态并记录事件\"\"\"
if new_status == self._status:
return
old_status = self._status
self._status = new_status
self._version += 1
# 记录领域事件
event = OrderStatusChangedEvent(
order_id=self._order_id,
old_status=old_status,
new_status=new_status
)
self._events.append(event)
def add_item(self, product_id: str, quantity: int, unit_price: Money):
\"\"\"添加订单项\"\"\"
item = OrderItem(product_id, quantity, unit_price)
self._items.append(item)
self._version += 1
class Order:
\"\"\"订单实体 - 封装业务规则\"\"\"
VALID_TRANSITIONS = {
"PENDING": ["PAID", "CANCELLED"],
"PAID": ["SHIPPED", "REFUNDED"],
"SHIPPED": ["DELIVERED", "RETURNED"],
"DELIVERED": ["RETURNED"],
"CANCELLED": [],
"REFUNDED": [],
"RETURNED": []
}
def update_status(self, new_status: str):
\"\"\"状态转换 - 封装业务规则\"\"\"
if new_status not in self.VALID_TRANSITIONS.get(self._status, []):
raise InvalidStateTransitionError(
f"Cannot transition from {self._status} to {new_status}"
)
# 执行状态转换
old_status = self._status
self._status = new_status
# 触发相应的领域事件
if new_status == "PAID":
self._events.append(OrderPaidEvent(self._order_id))
elif new_status == "SHIPPED":
self._events.append(OrderShippedEvent(self._order_id))
def can_cancel(self) -> bool:
\"\"\"判断是否可以取消 - 封装业务规则\"\"\"
return self._status in ["PENDING", "PAID"]
def cancel(self):
\"\"\"取消订单\"\"\"
if not self.can_cancel():
raise OrderCannotBeCancelledError(
f"Order {self._order_id} cannot be cancelled in status {self._status}"
)
self.update_status("CANCELLED")
from typing import Optional, List
from dataclasses import dataclass
# 一对一引用
@dataclass
class UserProfile:
user_id: str # 引用User实体
bio: str
avatar_url: str
# 一对多引用
@dataclass
class Order:
order_id: str
customer_id: str # 引用Customer实体
# 订单项作为值对象嵌入
items: List[OrderItem]
# 多对多引用 - 通过中间表
@dataclass
class ProductCategory:
product_id: str # 引用Product
category_id: str # 引用Category
# 自引用
@dataclass
class Employee:
employee_id: str
manager_id: Optional[str] # 引用另一个Employee
name: str
from typing import Optional, Callable
from dataclasses import dataclass
@dataclass
class Order:
order_id: str
customer_id: str
_customer: Optional[Customer] = None
_customer_loader: Optional[Callable[[str], Customer]] = None
@property
def customer(self) -> Customer:
\"\"\"延迟加载客户信息\"\"\"
if self._customer is None and self._customer_loader:
self._customer = self._customer_loader(self.customer_id)
return self._customer
def set_customer_loader(self, loader: Callable[[str], Customer]):
\"\"\"设置客户加载器\"\"\"
self._customer_loader = loader
# 使用示例
class OrderRepository:
def __init__(self, customer_repo: CustomerRepository):
self.customer_repo = customer_repo
def find_by_id(self, order_id: str) -> Optional[Order]:
# 从数据库加载订单(不包含客户详情)
order_data = self.db.query("SELECT * FROM orders WHERE id = ?", order_id)
if not order_data:
return None
order = Order(
order_id=order_data['id'],
customer_id=order_data['customer_id']
)
# 设置延迟加载器
order.set_customer_loader(self.customer_repo.find_by_id)
return order
# 使用
order = order_repo.find_by_id("ORD-001")
# 此时 customer 还未加载
print(order.customer_id) # 直接访问,无需加载
# 第一次访问 customer 属性时触发加载
customer = order.customer # 自动调用 customer_repo.find_by_id
print(customer.name)
class OrderService:
def __init__(self, order_repo: OrderRepository,
customer_repo: CustomerRepository):
self.order_repo = order_repo
self.customer_repo = customer_repo
def create_order(self, customer_id: str, items: List[OrderItem]) -> Order:
\"\"\"创建订单 - 验证引用完整性\"\"\"
# 验证客户存在
customer = self.customer_repo.find_by_id(customer_id)
if customer is None:
raise CustomerNotFoundError(f"Customer {customer_id} not found")
# 验证客户状态
if not customer.is_active:
raise CustomerInactiveError(f"Customer {customer_id} is inactive")
# 创建订单
order = Order(
order_id=self._generate_order_id(),
customer_id=customer_id
)
for item in items:
order.add_item(item.product_id, item.quantity, item.unit_price)
self.order_repo.save(order)
return order
def transfer_order(self, order_id: str, new_customer_id: str):
\"\"\"转移订单到新客户\"\"\"
order = self.order_repo.find_by_id(order_id)
if order is None:
raise OrderNotFoundError(f"Order {order_id} not found")
# 验证新客户存在
new_customer = self.customer_repo.find_by_id(new_customer_id)
if new_customer is None:
raise CustomerNotFoundError(f"Customer {new_customer_id} not found")
# 执行业务规则验证
if order.status not in ["PENDING", "PAID"]:
raise OrderCannotBeTransferredError(
f"Order {order_id} in status {order.status} cannot be transferred"
)
# 更新引用
old_customer_id = order.customer_id
order.customer_id = new_customer_id
# 记录事件
order.record_event(OrderTransferredEvent(
order_id=order_id,
old_customer_id=old_customer_id,
new_customer_id=new_customer_id
))
self.order_repo.save(order)
-- 订单表 - 地址作为值对象嵌入
CREATE TABLE orders (
order_id VARCHAR(50) PRIMARY KEY,
customer_id VARCHAR(50) NOT NULL,
-- 值对象:收货地址(嵌入)
shipping_street VARCHAR(255),
shipping_city VARCHAR(100),
shipping_postal_code VARCHAR(20),
shipping_country VARCHAR(50),
-- 值对象:账单地址(嵌入)
billing_street VARCHAR(255),
billing_city VARCHAR(100),
billing_postal_code VARCHAR(20),
billing_country VARCHAR(50),
-- 值对象:金额(拆分为amount和currency)
total_amount DECIMAL(19, 4),
currency VARCHAR(3),
status VARCHAR(20),
created_at TIMESTAMP
);
-- PostgreSQL / MySQL 5.7+ / SQL Server 2016+
CREATE TABLE orders (
order_id VARCHAR(50) PRIMARY KEY,
customer_id VARCHAR(50) NOT NULL,
-- 值对象:地址存储为JSON
shipping_address JSON,
billing_address JSON,
-- 值对象:金额存储为JSON
total_amount JSON, -- {"amount": 100.00, "currency": "USD"}
status VARCHAR(20),
created_at TIMESTAMP
);
-- 查询示例
SELECT * FROM orders
WHERE shipping_address->>'city' = 'Shanghai';
from dataclasses import dataclass, asdict
import json
@dataclass(frozen=True)
class Address:
street: str
city: str
postal_code: str
country: str
def to_dict(self) -> dict:
return asdict(self)
@classmethod
def from_dict(cls, data: dict) -> 'Address':
return cls(**data)
def to_json(self) -> str:
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> 'Address':
return cls.from_dict(json.loads(json_str))
# 在ORM中的使用(以SQLAlchemy为例)
from sqlalchemy import Column, String, TypeDecorator
class AddressType(TypeDecorator):
\"\"\"自定义SQLAlchemy类型用于值对象\"\"\"
impl = String
def process_bind_param(self, value, dialect):
if value is None:
return None
return value.to_json()
def process_result_value(self, value, dialect):
if value is None:
return None
return Address.from_json(value)
# 使用
class OrderORM(Base):
__tablename__ = 'orders'
order_id = Column(String(50), primary_key=True)
shipping_address = Column(AddressType(500)) # 值对象序列化存储
-- 客户实体表
CREATE TABLE customers (
customer_id VARCHAR(50) PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(255) UNIQUE,
phone VARCHAR(20),
status VARCHAR(20) DEFAULT 'ACTIVE',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
version INT DEFAULT 0 -- 乐观锁
);
-- 订单实体表
CREATE TABLE orders (
order_id VARCHAR(50) PRIMARY KEY,
customer_id VARCHAR(50) NOT NULL,
status VARCHAR(20) DEFAULT 'PENDING',
total_amount DECIMAL(19, 4),
currency VARCHAR(3),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
version INT DEFAULT 0,
FOREIGN KEY (customer_id) REFERENCES customers(customer_id)
);
-- 订单项值对象表(依赖订单实体)
CREATE TABLE order_items (
item_id INT AUTO_INCREMENT PRIMARY KEY,
order_id VARCHAR(50) NOT NULL,
product_id VARCHAR(50) NOT NULL,
quantity INT NOT NULL,
unit_price DECIMAL(19, 4) NOT NULL,
currency VARCHAR(3) NOT NULL,
FOREIGN KEY (order_id) REFERENCES orders(order_id) ON DELETE CASCADE
);
-- 事件存储表(Event Sourcing支持)
CREATE TABLE domain_events (
event_id VARCHAR(50) PRIMARY KEY,
aggregate_type VARCHAR(50) NOT NULL, -- 'Order', 'Customer'等
aggregate_id VARCHAR(50) NOT NULL, -- 实体ID
event_type VARCHAR(100) NOT NULL, -- 'OrderCreated', 'OrderPaid'等
event_data JSON NOT NULL, -- 事件详情
version INT NOT NULL, -- 实体版本号
occurred_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_aggregate (aggregate_type, aggregate_id, version)
);
-- 订单表引用客户表
CREATE TABLE orders (
order_id VARCHAR(50) PRIMARY KEY,
customer_id VARCHAR(50) NOT NULL,
-- ... 其他字段
CONSTRAINT fk_customer
FOREIGN KEY (customer_id)
REFERENCES customers(customer_id)
ON DELETE RESTRICT -- 防止删除有订单的客户
ON UPDATE CASCADE -- 客户ID更新时同步更新
);
-- 微服务架构中,跨服务的引用通常不使用数据库外键
CREATE TABLE orders (
order_id VARCHAR(50) PRIMARY KEY,
customer_id VARCHAR(50) NOT NULL, -- 引用另一个服务的客户
-- ... 其他字段
-- 无外键约束,引用完整性由应用层保证
);
// 值对象 - 使用record(Java 14+)
public record Money(BigDecimal amount, String currency) {
public Money {
// 验证
if (amount == null || currency == null || currency.isBlank()) {
throw new IllegalArgumentException("Invalid money values");
}
// 规范化
currency = currency.toUpperCase();
}
public Money add(Money other) {
if (!this.currency.equals(other.currency)) {
throw new IllegalArgumentException("Currency mismatch");
}
return new Money(this.amount.add(other.amount), this.currency);
}
}
// 实体
public class Order {
private final String orderId; // 不变的身份标识
private String status;
private List<OrderItem> items;
private final List<DomainEvent> events = new ArrayList<>();
public Order(String orderId, String customerId) {
this.orderId = Objects.requireNonNull(orderId);
this.status = "PENDING";
this.items = new ArrayList<>();
}
// 只提供getter,不提供setter for ID
public String getOrderId() { return orderId; }
public void updateStatus(String newStatus) {
String oldStatus = this.status;
this.status = newStatus;
events.add(new OrderStatusChangedEvent(orderId, oldStatus, newStatus));
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Order order = (Order) o;
return Objects.equals(orderId, order.orderId);
}
@Override
public int hashCode() {
return Objects.hash(orderId);
}
}
// 值对象 - 使用record(C# 9.0+)
public record Money
{
public decimal Amount { get; init; }
public string Currency { get; init; }
public Money(decimal amount, string currency)
{
if (amount < 0) throw new ArgumentException("Amount cannot be negative");
if (string.IsNullOrWhiteSpace(currency)) throw new ArgumentException("Currency is required");
Amount = amount;
Currency = currency.ToUpper();
}
public Money Add(Money other)
{
if (Currency != other.Currency)
throw new InvalidOperationException("Cannot add different currencies");
return new Money(Amount + other.Amount, Currency);
}
}
// 实体
public class Order : Entity, IAggregateRoot
{
public string OrderId { get; private set; }
public string CustomerId { get; private set; }
public string Status { get; private set; }
public IReadOnlyList<OrderItem> Items => _items.AsReadOnly();
private List<OrderItem> _items = new();
public Order(string orderId, string customerId)
{
OrderId = orderId;
CustomerId = customerId;
Status = "PENDING";
}
public void UpdateStatus(string newStatus)
{
var oldStatus = Status;
Status = newStatus;
AddDomainEvent(new OrderStatusChangedEvent(OrderId, oldStatus, newStatus));
}
public override bool Equals(object? obj)
{
if (obj is not Order other) return false;
return OrderId == other.OrderId;
}
public override int GetHashCode() => OrderId.GetHashCode();
}
// 值对象
type Money struct {
Amount decimal.Decimal
Currency string
}
func NewMoney(amount decimal.Decimal, currency string) (Money, error) {
if currency == "" {
return Money{}, errors.New("currency is required")
}
return Money{
Amount: amount,
Currency: strings.ToUpper(currency),
}, nil
}
func (m Money) Add(other Money) (Money, error) {
if m.Currency != other.Currency {
return Money{}, errors.New("currency mismatch")
}
return NewMoney(m.Amount.Add(other.Amount), m.Currency)
}
// 实体
type Order struct {
OrderID string
CustomerID string
Status string
Items []OrderItem
events []DomainEvent
}
func NewOrder(orderID, customerID string) *Order {
return &Order{
OrderID: orderID,
CustomerID: customerID,
Status: "PENDING",
Items: make([]OrderItem, 0),
events: make([]DomainEvent, 0),
}
}
func (o *Order) UpdateStatus(newStatus string) {
oldStatus := o.Status
o.Status = newStatus
o.events = append(o.events, OrderStatusChangedEvent{
OrderID: o.OrderID,
OldStatus: oldStatus,
NewStatus: newStatus,
})
}
// Go没有内置的equals,需要手动实现
func (o *Order) Equals(other *Order) bool {
if other == nil {
return false
}
return o.OrderID == other.OrderID
}
// 值对象
class Money {
constructor(
public readonly amount: Decimal,
public readonly currency: string
) {
if (!currency || currency.trim() === '') {
throw new Error('Currency is required');
}
}
add(other: Money): Money {
if (this.currency !== other.currency) {
throw new Error('Currency mismatch');
}
return new Money(this.amount.plus(other.amount), this.currency);
}
equals(other: Money): boolean {
return this.amount.equals(other.amount) &&
this.currency === other.currency;
}
}
// 实体
class Order {
private _status: string = 'PENDING';
private _items: OrderItem[] = [];
private _events: DomainEvent[] = [];
constructor(
public readonly orderId: string, // readonly = 身份标识不可变
public customerId: string
) {}
get status(): string {
return this._status;
}
get items(): readonly OrderItem[] {
return Object.freeze([...this._items]);
}
updateStatus(newStatus: string): void {
const oldStatus = this._status;
this._status = newStatus;
this._events.push(new OrderStatusChangedEvent(
this.orderId, oldStatus, newStatus
));
}
equals(other: Order): boolean {
return other instanceof Order && this.orderId === other.orderId;
}
}
# ❌ 错误:给值对象添加ID
@dataclass
class Address:
address_id: str # 值对象不需要ID!
street: str
city: str
# ✅ 正确:值对象没有ID
@dataclass(frozen=True)
class Address:
street: str
city: str
postal_code: str
country: str
# ❌ 错误:修改值对象
class Money:
def __init__(self, amount: Decimal, currency: str):
self.amount = amount
self.currency = currency
def add(self, other: 'Money'):
self.amount += other.amount # 修改了自身状态!
# ✅ 正确:返回新的值对象
@dataclass(frozen=True)
class Money:
def add(self, other: 'Money') -> 'Money':
return Money(self.amount + other.amount, self.currency)
# ❌ 错误:共享值对象引用
address = Address("Street 1", "City", "12345", "Country")
order1 = Order(customer_id="C1")
order1.shipping_address = address # 引用赋值
order2 = Order(customer_id="C2")
order2.shipping_address = address # 同一个引用!
# 如果值对象是可变的,修改会影响所有引用
# 即使值对象是不可变的,也可能导致意外的共享
# ✅ 正确:复制值对象
order1.shipping_address = address
order2.shipping_address = copy.deepcopy(address) # 或者重新创建
# ❌ 错误:将明显是值对象的概念建模为实体
class Color:
def __init__(self, color_id: str, r: int, g: int, b: int):
self.color_id = color_id # 颜色需要ID吗?不需要!
self.r = r
self.g = g
self.b = b
# ✅ 正确:颜色是值对象
@dataclass(frozen=True)
class Color:
r: int
g: int
b: int
# ❌ 错误:基于属性比较实体
class User:
def __eq__(self, other):
return (self.name == other.name and
self.email == other.email) # 危险!同名同姓的人会相等
# ✅ 正确:基于ID比较实体
class User:
def __eq__(self, other):
if not isinstance(other, User):
return False
return self.user_id == other.user_id
# ❌ 错误:直接暴露内部集合
class Order:
def __init__(self):
self.items = [] # 可以直接修改!
order = Order()
order.items.append(some_item) # 绕过了领域规则验证
# ✅ 正确:控制状态访问
class Order:
def __init__(self):
self._items = []
@property
def items(self) -> tuple:
return tuple(self._items) # 返回不可变视图
def add_item(self, item: OrderItem):
# 可以在这里执行业务规则验证
self._validate_item(item)
self._items.append(item)
self._recalculate_total()
# ❌ 错误:循环引用
class Employee:
def __init__(self):
self.manager: Employee = None
self.subordinates: List[Employee] = []
# 可能导致序列化问题、内存泄漏
# ✅ 正确:使用ID引用避免循环
class Employee:
def __init__(self, employee_id: str):
self.employee_id = employee_id
self.manager_id: Optional[str] = None # 使用ID而非对象引用
self.subordinate_ids: List[str] = []
# ❌ 错误:一次性加载所有关联
class OrderRepository:
def find_by_id(self, order_id: str) -> Order:
order_data = self.db.query("SELECT * FROM orders WHERE id = ?", order_id)
customer = self.customer_repo.find_by_id(order_data['customer_id']) # 立即加载
items = self.item_repo.find_by_order_id(order_id) # 立即加载
# ... 加载更多关联
return Order(order_data, customer, items, ...)
# ✅ 正确:延迟加载
class Order:
def __init__(self, order_id: str, customer_id: str):
self.order_id = order_id
self.customer_id = customer_id
self._customer: Optional[Customer] = None
@property
def customer(self) -> Customer:
if self._customer is None:
self._customer = customer_repository.find_by_id(self.customer_id)
return self._customer
# ❌ 错误:删除被引用的实体而不处理引用
class OrderService:
def delete_customer(self, customer_id: str):
self.customer_repo.delete(customer_id) # 客户被删除
# 但订单表中仍有 customer_id 引用!
# ✅ 正确:处理引用完整性
class OrderService:
def delete_customer(self, customer_id: str):
# 检查是否有订单引用此客户
orders = self.order_repo.find_by_customer_id(customer_id)
if orders:
raise CustomerHasOrdersError(
f"Cannot delete customer {customer_id}, "
f"they have {len(orders)} orders"
)
# 或者:级联处理
# for order in orders:
# order.customer_id = "DEFAULT_CUSTOMER"
# self.order_repo.save(order)
self.customer_repo.delete(customer_id)
class MoneyFactory:
\"\"\"值对象工厂 - 确保规范化和验证\"\"\"
@staticmethod
def create(amount: Decimal, currency: str) -> Money:
# 规范化
currency = currency.strip().upper()
# 验证
if currency not in SUPPORTED_CURRENCIES:
raise UnsupportedCurrencyError(f"Currency {currency} is not supported")
# 创建
return Money(amount, currency)
@staticmethod
def zero(currency: str) -> Money:
return MoneyFactory.create(Decimal("0"), currency)
@staticmethod
def from_string(amount_str: str, currency: str) -> Money:
try:
amount = Decimal(amount_str)
return MoneyFactory.create(amount, currency)
except InvalidOperation:
raise InvalidAmountError(f"Invalid amount: {amount_str}")
class OrderRepository:
\"\"\"实体仓储 - 管理实体的生命周期\"\"\"
def find_by_id(self, order_id: str) -> Optional[Order]:
\"\"\"通过ID重建实体 - 保持身份连续性\"\"\"
data = self.db.query("SELECT * FROM orders WHERE order_id = ?", order_id)
if not data:
return None
# 重建实体(不调用构造函数,避免生成新ID)
order = Order.__new__(Order)
order._order_id = data['order_id']
order._customer_id = data['customer_id']
order._status = data['status']
# ... 恢复其他属性
return order
def save(self, order: Order):
\"\"\"保存实体 - 跟踪变更\"\"\"
if order.is_new():
self._insert(order)
else:
self._update(order)
# 发布领域事件
for event in order.events:
self.event_bus.publish(event)
order.clear_events()
from abc import ABC, abstractmethod
class Specification(ABC):
\"\"\"规格模式 - 使用值对象封装业务规则\"\"\"
@abstractmethod
def is_satisfied_by(self, candidate) -> bool:
pass
def and_(self, other: 'Specification') -> 'Specification':
return AndSpecification(self, other)
def or_(self, other: 'Specification') -> 'Specification':
return OrSpecification(self, other)
class MoneyRangeSpecification(Specification):
\"\"\"金额范围规格 - 值对象\"\"\"
def __init__(self, min_amount: Money, max_amount: Money):
self.min_amount = min_amount
self.max_amount = max_amount
def is_satisfied_by(self, amount: Money) -> bool:
return (amount.currency == self.min_amount.currency and
self.min_amount.amount <= amount.amount <= self.max_amount.amount)
# 使用
price_range = MoneyRangeSpecification(
Money(Decimal("10"), "USD"),
Money(Decimal("100"), "USD")
)
if price_range.is_satisfied_by(product.price):
print("Price is within acceptable range")
import json
from dataclasses import asdict
class ValueObjectJSONEncoder(json.JSONEncoder):
\"\"\"值对象JSON序列化器\"\"\"
def default(self, obj):
if isinstance(obj, ValueObject):
return {
"__type__": obj.__class__.__name__,
"__data__": asdict(obj)
}
if isinstance(obj, Decimal):
return str(obj)
return super().default(obj)
# 使用
order_data = {
"order_id": "ORD-001",
"total_amount": Money(Decimal("100"), "USD"),
"shipping_address": Address("Street 1", "City", "12345", "Country")
}
json_str = json.dumps(order_data, cls=ValueObjectJSONEncoder)
# 结果:
# {
# "order_id": "ORD-001",
# "total_amount": {"__type__": "Money", "__data__": {"amount": "100", "currency": "USD"}},
# "shipping_address": {"__type__": "Address", "__data__": {...}}
# }
# 在微服务中,实体的一致性边界很重要
class OrderService:
\"\"\"订单服务 - 订单聚合的根\"\"\"
def create_order(self, customer_id: str, items: List[OrderItem]) -> Order:
# 订单服务负责订单聚合的一致性
order = Order(self._generate_id(), customer_id)
for item in items:
# 验证产品存在(通过产品服务)
product = self.product_service.get_product(item.product_id)
if not product:
raise ProductNotFoundError(item.product_id)
# 验证库存(通过库存服务)
inventory = self.inventory_service.check_stock(item.product_id, item.quantity)
if not inventory.available:
raise InsufficientStockError(item.product_id)
order.add_item(item.product_id, item.quantity, product.price)
self.order_repo.save(order)
# 发布订单创建事件
self.event_bus.publish(OrderCreatedEvent(order.order_id))
return order
# 跨服务引用 - 使用事件驱动最终一致性
class CustomerService:
def update_customer(self, customer_id: str, new_email: str):
customer = self.customer_repo.find_by_id(customer_id)
old_email = customer.email
customer.update_email(new_email)
self.customer_repo.save(customer)
# 发布客户信息变更事件
self.event_bus.publish(CustomerUpdatedEvent(
customer_id=customer_id,
changed_fields={"email": {"old": old_email, "new": new_email}}
))
class OrderService:
def handle_customer_updated(self, event: CustomerUpdatedEvent):
\"\"\"监听客户变更事件,更新订单中的引用信息(如果需要)\"\"\"
if "email" in event.changed_fields:
# 订单服务可能缓存了客户邮箱,需要更新
orders = self.order_repo.find_by_customer_id(event.customer_id)
for order in orders:
# 更新订单中的客户邮箱缓存(如果需要)
order.update_customer_email(event.changed_fields["email"]["new"])
self.order_repo.save(order)
值对象:
实体:
引用:
开始
│
▼
这个概念需要跟踪历史状态吗?
│
├── 是 → 实体
│
└── 否
│
▼
两个实例属性相同,是否意味着它们是同一个?
│
├── 是 → 值对象
│
└── 否
│
▼
这个概念有自然的唯一标识吗?
│
├── 是 → 实体
│
└── 否 → 值对象
值对象检查清单:
实体检查清单:
引用检查清单:
从值对象开始:如果无法确定是实体还是值对象,先设计为值对象。将值对象改为实体比反过来更容易。
保持值对象小而简单:值对象应该专注于一个概念,避免变成"万能对象"。
实体的身份标识越早确定越好:在系统设计的早期就确定实体的标识策略。
引用要显式:避免隐式的全局引用或单例模式,明确表达实体间的关系。
上下文决定分类:同一个概念在不同上下文中可能是值对象或实体,不要强行统一。
测试相等性:为值对象和实体编写明确的相等性测试,确保行为符合预期。
文档化决策:在代码注释或设计文档中记录为什么某个概念被设计为值对象或实体。
理解值、引用和实体的区别不仅仅是技术实现的问题,更是对业务领域的深入理解。正确的建模能够使代码更清晰地表达业务意图,减少bug,提高系统的可维护性。在实际项目中,建议团队就这些概念达成共识,并在代码审查中检查这些原则是否被正确应用。