8:事件和消息总线
到目前为止,我们已经在一个简单的问题上花费了大量时间和精力,而这个问题我们可以用 Django 轻松解决。您可能会问,增加的可测试性和表达性是否真的值得付出所有努力。
但在实践中,我们发现使我们的代码库一团糟的不是那些显而易见的功能:而是边缘的粘合代码。 它是报告、权限以及涉及无数对象的工作流程。
我们的示例将是一个典型的通知需求:当我们因为缺货而无法分配订单时,我们应该提醒采购团队。 他们会去购买更多库存来解决问题,一切都会好起来的。
对于第一个版本,我们的产品负责人说我们可以通过电子邮件发送警报。
让我们看看当我们需要插入一些构成我们系统大部分的平凡的东西时,我们的架构如何应对。
我们将从做最简单、最快捷的事情开始,并讨论为什么正是这种决定导致我们走向泥潭式的大杂烩。
然后,我们将展示如何使用领域事件模式将副作用与我们的用例分离,以及如何使用简单的消息总线模式来基于这些事件触发行为。我们将展示创建这些事件的几种选项以及如何将它们传递给消息总线,最后我们将展示如何修改工作单元模式以优雅地将两者连接在一起,如事件在系统中流动中预览的那样。

提示
|
本章的代码位于 chapter_08_events_and_message_bus 分支 GitHub 上 git clone https://github.com/cosmicpython/code.git cd code git checkout chapter_08_events_and_message_bus # or to code along, checkout the previous chapter: git checkout chapter_07_aggregate |
避免制造混乱
所以。 当我们缺货时发送电子邮件警报。 当我们有像那些与核心领域真的无关的新需求时,很容易开始将这些东西倾倒到我们的 Web 控制器中。
首先,让我们避免使我们的 Web 控制器变得混乱
作为一次性的临时措施,这可能还可以
@app.route("/allocate", methods=["POST"])
def allocate_endpoint():
line = model.OrderLine(
request.json["orderid"],
request.json["sku"],
request.json["qty"],
)
try:
uow = unit_of_work.SqlAlchemyUnitOfWork()
batchref = services.allocate(line, uow)
except (model.OutOfStock, services.InvalidSku) as e:
send_mail(
"out of stock",
"stock_admin@made.com",
f"{line.orderid} - {line.sku}"
)
return {"message": str(e)}, 400
return {"batchref": batchref}, 201
……但这很容易看出,我们如何通过像这样修补东西而迅速陷入混乱。 发送电子邮件不是我们的 HTTP 层的工作,我们希望能够单元测试这个新功能。
并且让我们也不要使我们的模型变得混乱
假设我们不想将此代码放入我们的 Web 控制器中,因为我们希望它们尽可能精简,我们可能会考虑将其直接放在源头,即模型中
def allocate(self, line: OrderLine) -> str:
try:
batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
#...
except StopIteration:
email.send_mail("stock@made.com", f"Out of stock for {line.sku}")
raise OutOfStock(f"Out of stock for sku {line.sku}")
但这更糟! 我们不希望我们的模型对基础设施问题有任何依赖,例如 email.send_mail
。
这个发送电子邮件的东西是不受欢迎的粘合代码,它扰乱了我们系统良好的清晰流程。 我们想要的是让我们的领域模型专注于“您不能分配比实际可用库存更多的东西”的规则。
或者服务层!
“尝试分配一些库存,如果失败则发送电子邮件”的要求是工作流编排的一个示例:它是系统必须遵循的实现目标的步骤集合。
我们已经编写了一个服务层来为我们管理编排,但即使在这里,该功能也感觉格格不入
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
try:
batchref = product.allocate(line)
uow.commit()
return batchref
except model.OutOfStock:
email.send_mail("stock@made.com", f"Out of stock for {line.sku}")
raise
捕获异常并重新引发它? 情况可能会更糟,但这绝对让我们不高兴。 为什么为这段代码找到合适的家如此困难?
单一职责原则
实际上,这是违反了单一职责原则 (SRP)。[1] 我们的用例是分配。 我们的端点、服务函数和领域方法都称为 allocate
,而不是 allocate_and_send_mail_if_out_of_stock
。
提示
|
经验法则:如果您在不使用“然后”或“和”之类的词语就无法描述您的函数的作用,那么您可能违反了 SRP。 |
SRP 的一种表述是,每个类都应该只有一个更改理由。 当我们从电子邮件切换到短信时,我们不应该更新我们的 allocate()
函数,因为这显然是独立的职责。
为了解决这个问题,我们将把编排分成不同的步骤,以便不同的关注点不会纠缠在一起。[2] 领域模型的工作是知道我们缺货,但发送警报的责任属于其他地方。 我们应该能够打开或关闭此功能,或者切换到 SMS 通知,而无需更改我们领域模型的规则。
我们还希望保持服务层免受实现细节的影响。 我们希望将依赖倒置原则应用于通知,以便我们的服务层依赖于抽象,就像我们通过使用工作单元来避免依赖数据库一样。
全体登上消息总线!
我们在这里将要介绍的模式是领域事件和消息总线。 我们可以用几种方式实现它们,因此我们将展示几个,然后再确定我们最喜欢的一个。
事件是简单的数据类
事件是一种值对象。 事件没有任何行为,因为它们是纯粹的数据结构。 我们始终以领域语言命名事件,并将它们视为我们领域模型的一部分。
我们可以将它们存储在 model.py 中,但我们不妨将它们保存在自己的文件中(现在可能是考虑重构出一个名为 domain 的目录的好时机,以便我们拥有 domain/model.py 和 domain/events.py)
from dataclasses import dataclass
class Event: #(1)
pass
@dataclass
class OutOfStock(Event): #(2)
sku: str
-
一旦我们有了许多事件,我们就会发现拥有一个可以存储通用属性的父类很有用。 它对于消息总线中的类型提示也很有用,您很快就会看到。
-
dataclasses
也非常适合领域事件。
模型引发事件
当我们的领域模型记录一个已发生的事实时,我们说它引发一个事件。
这是从外部看起来的样子; 如果我们要求 Product
分配但它无法分配,它应该引发一个事件
def test_records_out_of_stock_event_if_cannot_allocate():
batch = Batch("batch1", "SMALL-FORK", 10, eta=today)
product = Product(sku="SMALL-FORK", batches=[batch])
product.allocate(OrderLine("order1", "SMALL-FORK", 10))
allocation = product.allocate(OrderLine("order2", "SMALL-FORK", 1))
assert product.events[-1] == events.OutOfStock(sku="SMALL-FORK") #(1)
assert allocation is None
-
我们的聚合将公开一个名为
.events
的新属性,它将包含一个关于已发生事件的事实列表,以Event
对象的形式。
这是模型内部的样子
class Product:
def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
self.sku = sku
self.batches = batches
self.version_number = version_number
self.events = [] # type: List[events.Event] #(1)
def allocate(self, line: OrderLine) -> str:
try:
#...
except StopIteration:
self.events.append(events.OutOfStock(line.sku)) #(2)
# raise OutOfStock(f"Out of stock for sku {line.sku}") #(3)
return None
-
这是我们新的
.events
属性的使用。 -
我们不是直接调用一些发送电子邮件的代码,而是在事件发生的地方记录这些事件,仅使用领域语言。
-
我们还将停止为缺货情况引发异常。 事件将完成异常正在做的工作。
注意
|
我们实际上正在解决我们到目前为止存在的代码异味,即我们使用异常进行控制流。 一般来说,如果您正在实现领域事件,请不要引发异常来描述相同的领域概念。 正如您稍后在工作单元模式中处理事件时看到的那样,必须一起推理事件和异常会让人感到困惑。 |
消息总线将事件映射到处理程序
消息总线基本上说,“当我看到此事件时,我应该调用以下处理程序函数。” 换句话说,它是一个简单的发布-订阅系统。 处理程序订阅接收事件,我们将其发布到总线。 这听起来比实际情况要难,我们通常使用字典来实现它
def handle(event: events.Event):
for handler in HANDLERS[type(event)]:
handler(event)
def send_out_of_stock_notification(event: events.OutOfStock):
email.send_mail(
"stock@made.com",
f"Out of stock for {event.sku}",
)
HANDLERS = {
events.OutOfStock: [send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]
注意
|
请注意,已实现的消息总线不会给我们带来并发性,因为一次只会运行一个处理程序。 我们的目标不是支持并行线程,而是从概念上分离任务,并使每个 UoW 尽可能小。 这有助于我们理解代码库,因为如何运行每个用例的“配方”都写在一个地方。 请参阅以下侧边栏。 |
选项 1:服务层从模型中获取事件并将它们放到消息总线上
我们的领域模型引发事件,我们的消息总线将在事件发生时调用正确的处理程序。 现在我们只需要连接两者。 我们需要一些东西来捕获模型中的事件并将它们传递给消息总线——发布步骤。
最简单的方法是在我们的服务层中添加一些代码
from . import messagebus
...
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
try: #(1)
batchref = product.allocate(line)
uow.commit()
return batchref
finally: #(1)
messagebus.handle(product.events) #(2)
-
我们保留了来自我们早期丑陋实现的
try/finally
(我们还没有摆脱所有异常,只是OutOfStock
)。 -
但现在,服务层不再直接依赖电子邮件基础设施,而只是负责将事件从模型传递到消息总线。
这已经避免了我们在幼稚实现中遇到的一些丑陋之处,并且我们有几个像这样的系统在工作,其中服务层显式地从聚合收集事件并将它们传递到消息总线。
选项 2:服务层引发自己的事件
我们使用过的另一种变体是让服务层负责直接创建和引发事件,而不是让领域模型引发事件
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
batchref = product.allocate(line)
uow.commit() #(1)
if batchref is None:
messagebus.handle(events.OutOfStock(line.sku))
return batchref
-
和以前一样,即使我们分配失败,我们也会提交,因为这种方式代码更简单,更容易推理:除非出现问题,否则我们总是提交。 当我们没有更改任何内容时提交是安全的,并且可以保持代码整洁。
同样,我们有应用程序在生产中以这种方式实现该模式。 什么对您有效将取决于您面临的特定权衡,但我们想向您展示我们认为最优雅的解决方案,即我们将工作单元负责收集和引发事件。
选项 3:UoW 将事件发布到消息总线
UoW 已经有一个 try/finally
,并且它知道当前正在使用的所有聚合,因为它提供了对仓库的访问。 因此,它是发现事件并将它们传递到消息总线的好地方
class AbstractUnitOfWork(abc.ABC):
...
def commit(self):
self._commit() #(1)
self.publish_events() #(2)
def publish_events(self): #(2)
for product in self.products.seen: #(3)
while product.events:
event = product.events.pop(0)
messagebus.handle(event)
@abc.abstractmethod
def _commit(self):
raise NotImplementedError
...
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
...
def _commit(self): #(1)
self.session.commit()
-
我们将更改我们的 commit 方法,使其需要子类的私有
._commit()
方法。 -
提交后,我们遍历仓库已看到的所有对象,并将它们的事件传递给消息总线。
-
这依赖于仓库使用一个名为
.seen
的新属性来跟踪已加载的聚合,您将在下一个列表中看到。
注意
|
您是否想知道如果其中一个处理程序失败会发生什么? 我们将在 [chapter_10_commands] 中详细讨论错误处理。 |
class AbstractRepository(abc.ABC):
def __init__(self):
self.seen = set() # type: Set[model.Product] #(1)
def add(self, product: model.Product): #(2)
self._add(product)
self.seen.add(product)
def get(self, sku) -> model.Product: #(3)
product = self._get(sku)
if product:
self.seen.add(product)
return product
@abc.abstractmethod
def _add(self, product: model.Product): #(2)
raise NotImplementedError
@abc.abstractmethod #(3)
def _get(self, sku) -> model.Product:
raise NotImplementedError
class SqlAlchemyRepository(AbstractRepository):
def __init__(self, session):
super().__init__()
self.session = session
def _add(self, product): #(2)
self.session.add(product)
def _get(self, sku): #(3)
return self.session.query(model.Product).filter_by(sku=sku).first()
-
为了使 UoW 能够发布新事件,它需要能够向仓库询问在此会话期间使用了哪些
Product
对象。 我们使用一个名为.seen
的set
来存储它们。 这意味着我们的实现需要调用super().__init__()
。 -
父
add()
方法将事物添加到.seen
,现在要求子类实现._add()
。 -
类似地,
.get()
委托给一个._get()
函数,该函数由子类实现,以便捕获看到的对象。
注意
|
使用 ._underscorey() 方法和子类绝对不是您可以实现这些模式的唯一方法。 尝试本章的“读者练习”并尝试一些替代方案。 |
在 UoW 和仓库以这种方式协作以自动跟踪活动对象并处理其事件之后,服务层可以完全摆脱事件处理的关注
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f"Invalid sku {line.sku}")
batchref = product.allocate(line)
uow.commit()
return batchref
我们确实还必须记住更改服务层中的伪造对象,并在正确的位置使它们调用 super()
,并实现 underscorey 方法,但更改是最小的
class FakeRepository(repository.AbstractRepository):
def __init__(self, products):
super().__init__()
self._products = set(products)
def _add(self, product):
self._products.add(product)
def _get(self, sku):
return next((p for p in self._products if p.sku == sku), None)
...
class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork):
...
def _commit(self):
self.committed = True
您可能开始担心维护这些伪造对象将成为维护负担。 毫无疑问,这是一项工作,但根据我们的经验,工作量不大。 一旦您的项目启动并运行,您的仓库和 UoW 抽象的接口实际上就不会发生太大变化。 而且,如果您正在使用 ABC,它们将帮助您在事情不同步时提醒您。
总结
领域事件为我们提供了一种处理系统中工作流程的方式。 我们经常发现,在听取我们的领域专家时,他们以因果或时间方式表达需求——例如,“当我们尝试分配库存但没有可用库存时,我们应该向采购团队发送电子邮件。”
“当 X,然后 Y”这些神奇的词语通常会告诉我们一个事件,我们可以使该事件在我们的系统中具体化。 将事件视为我们模型中的一等公民有助于我们使代码更易于测试和观察,并且有助于隔离关注点。
并且 领域事件:权衡 显示了我们看到的权衡。
优点 | 缺点 |
---|---|
|
|
但是,事件的用途不仅限于发送电子邮件。 在 [chapter_07_aggregate] 中,我们花了很多时间来说服您应该定义聚合,或者我们保证一致性的边界。 人们经常问,“如果我需要在请求中更改多个聚合,我应该怎么做?” 现在我们有了回答这个问题的工具。
如果我们有两个可以事务隔离的东西(例如,订单和产品),那么我们可以通过使用事件使它们最终一致。 当订单被取消时,我们应该找到分配给它的产品并删除分配。
在 [chapter_09_all_messagebus] 中,当我们使用新的消息总线构建更复杂的工作流程时,我们将更详细地研究这个想法。