9:消息总线之旅
在本章中,我们将开始使事件对应用程序的内部结构更加基础。我们将从 之前:消息总线是可选的附加组件 中的当前状态开始,其中事件是可选的副作用……

……到 消息总线现在是服务层的主要入口点 中的情况,其中一切都通过消息总线,并且我们的应用程序已从根本上转变为消息处理器。

提示
|
本章的代码位于 chapter_09_all_messagebus 分支 在 GitHub 上 git clone https://github.com/cosmicpython/code.git cd code git checkout chapter_09_all_messagebus # or to code along, checkout the previous chapter: git checkout chapter_08_events_and_message_bus |
新需求引导我们走向新架构
Rich Hickey 谈到了情境软件,指的是长时间运行,管理现实世界流程的软件。示例包括仓库管理系统、物流调度器和工资系统。
这种软件很难编写,因为在物理对象和不可靠的人类的现实世界中,意外情况随时发生。例如
-
在盘点期间,我们发现三个
SPRINGY-MATTRESS
床垫因屋顶漏水而受损。 -
一批
RELIABLE-FORK
叉子缺少所需的文件,并在海关滞留数周。随后,三个RELIABLE-FORK
叉子未能通过安全测试而被销毁。 -
全球亮片短缺意味着我们无法生产下一批
SPARKLY-BOOKCASE
闪光书柜。
在这些类型的情况下,我们了解到需要在批次数量已在系统中时更改它们。也许有人在清单上的数量上犯了错误,或者可能有些沙发从卡车上掉下来了。在与业务部门对话后,[1] 我们将情况建模为 批次数量更改意味着取消分配和重新分配 中所示。

[ditaa, apwp_0903] +----------+ /----\ +------------+ +--------------------+ | Batch |--> |RULE| --> | Deallocate | ----> | AllocationRequired | | Quantity | \----/ +------------+-+ +--------------------+-+ | Changed | | Deallocate | ----> | AllocationRequired | +----------+ +------------+-+ +--------------------+-+ | Deallocate | ----> | AllocationRequired | +------------+ +--------------------+
一个我们称之为 BatchQuantityChanged
的事件应该引导我们更改批次上的数量,是的,但也要应用一个业务规则:如果新数量降至低于已分配总量的水平,我们需要从该批次取消分配这些订单。然后,每个订单都需要重新分配,我们可以将其捕获为名为 AllocationRequired
的事件。
也许您已经预料到我们的内部消息总线和事件可以帮助实现此需求。我们可以定义一个名为 change_batch_quantity
的服务,该服务知道如何调整批次数量以及如何取消分配任何多余的订单行,然后每个取消分配都可以发出一个 AllocationRequired
事件,该事件可以转发到现有的 allocate
服务,在单独的事务中。再一次,我们的消息总线帮助我们实施单一职责原则,并且它允许我们对事务和数据完整性做出选择。
设想架构变更:一切都将是 事件处理器
但在我们深入之前,请考虑一下我们的目标。我们的系统中有两种流程
-
由服务层函数处理的 API 调用
-
内部事件(可能是作为服务层函数的副作用而引发的)及其处理器(反过来调用服务层函数)
如果一切都是事件处理器,岂不是更容易?如果我们将 API 调用重新思考为捕获事件,则服务层函数也可以是事件处理器,并且我们不再需要区分内部和外部事件处理器
-
services.allocate()
可以是AllocationRequired
事件的处理器,并且可以发出Allocated
事件作为其输出。 -
services.add_batch()
可以是BatchCreated
事件的处理器。[2]
我们的新需求将符合相同的模式
-
名为
BatchQuantityChanged
的事件可以调用名为change_batch_quantity()
的处理器。 -
它可能引发的新
AllocationRequired
事件也可以传递给services.allocate()
,因此来自 API 的全新分配与由取消分配在内部触发的重新分配之间没有概念上的区别。
听起来有点多?让我们逐步实现它。我们将遵循 准备性重构 工作流程,又名“使更改容易;然后进行容易的更改”
-
我们将服务层重构为事件处理器。我们可以习惯于将事件作为我们描述系统输入的方式的想法。特别是,现有的
services.allocate()
函数将成为名为AllocationRequired
的事件的处理器。 -
我们构建一个端到端测试,将
BatchQuantityChanged
事件放入系统,并查找Allocated
事件输出。 -
我们的实现概念上将非常简单:一个新的
BatchQuantityChanged
事件处理器,其实施将发出AllocationRequired
事件,而这些事件又将由 API 使用的完全相同的分配处理器处理。
在此过程中,我们将对消息总线和 UoW 进行一个小小的调整,将将新事件放入消息总线的责任转移到消息总线本身中。
将服务函数重构为消息处理器
我们首先定义两个事件,它们捕获我们当前的 API 输入——AllocationRequired 和 BatchCreated
@dataclass
class BatchCreated(Event):
ref: str
sku: str
qty: int
eta: Optional[date] = None
...
@dataclass
class AllocationRequired(Event):
orderid: str
sku: str
qty: int
然后我们将 services.py 重命名为 handlers.py;我们添加了现有的 send_out_of_stock_notification
消息处理器;最重要的是,我们更改了所有处理器,使它们具有相同的输入,一个事件和一个 UoW
def add_batch(
event: events.BatchCreated,
uow: unit_of_work.AbstractUnitOfWork,
):
with uow:
product = uow.products.get(sku=event.sku)
...
def allocate(
event: events.AllocationRequired,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
line = OrderLine(event.orderid, event.sku, event.qty)
...
def send_out_of_stock_notification(
event: events.OutOfStock,
uow: unit_of_work.AbstractUnitOfWork,
):
email.send(
"stock@made.com",
f"Out of stock for {event.sku}",
)
作为差异,更改可能更清晰
def add_batch(
- ref: str, sku: str, qty: int, eta: Optional[date],
+ event: events.BatchCreated,
uow: unit_of_work.AbstractUnitOfWork,
):
with uow:
- product = uow.products.get(sku=sku)
+ product = uow.products.get(sku=event.sku)
...
def allocate(
- orderid: str, sku: str, qty: int,
+ event: events.AllocationRequired,
uow: unit_of_work.AbstractUnitOfWork,
) -> str:
- line = OrderLine(orderid, sku, qty)
+ line = OrderLine(event.orderid, event.sku, event.qty)
...
+
+def send_out_of_stock_notification(
+ event: events.OutOfStock,
+ uow: unit_of_work.AbstractUnitOfWork,
+):
+ email.send(
...
在此过程中,我们使服务层的 API 更加结构化和一致。它曾经是原始类型的散布,现在它使用定义良好的对象(请参阅以下侧边栏)。
消息总线现在从 UoW 收集事件
我们的事件处理器现在需要一个 UoW。此外,随着我们的消息总线在我们应用程序中变得越来越重要,将其显式地负责收集和处理新事件是有意义的。到目前为止,UoW 和消息总线之间存在一些循环依赖,因此这将使其成为单向的。我们将让消息总线从 UoW 拉取事件,而不是让 UoW 将事件推送到消息总线上。
def handle(
event: events.Event,
uow: unit_of_work.AbstractUnitOfWork, #(1)
):
queue = [event] #(2)
while queue:
event = queue.pop(0) #(3)
for handler in HANDLERS[type(event)]: #(3)
handler(event, uow=uow) #(4)
queue.extend(uow.collect_new_events()) #(5)
-
每次启动时,消息总线现在都会传递 UoW。
-
当我们开始处理我们的第一个事件时,我们启动一个队列。
-
我们从队列的前面弹出事件并调用它们的处理器(
HANDLERS
字典没有改变;它仍然将事件类型映射到处理器函数)。 -
消息总线将 UoW 传递给每个处理器。
-
在每个处理器完成后,我们收集已生成的任何新事件并将它们添加到队列中。
在 unit_of_work.py 中,publish_events()
变成了一个不太活跃的方法,collect_new_events()
-from . import messagebus #(1)
class AbstractUnitOfWork(abc.ABC):
@@ -22,13 +21,11 @@ class AbstractUnitOfWork(abc.ABC):
def commit(self):
self._commit()
- self.publish_events() #(2)
- def publish_events(self):
+ def collect_new_events(self):
for product in self.products.seen:
while product.events:
- event = product.events.pop(0)
- messagebus.handle(event)
+ yield product.events.pop(0) #(3)
-
unit_of_work
模块现在不再依赖于messagebus
。 -
我们不再在提交时自动
publish_events
。消息总线正在跟踪事件队列。 -
并且 UoW 不再主动将事件放在消息总线上;它只是使它们可用。
我们的测试也都是用事件编写的
我们的测试现在通过创建事件并将它们放在消息总线上来操作,而不是直接调用服务层函数
class TestAddBatch:
def test_for_new_product(self):
uow = FakeUnitOfWork()
- services.add_batch("b1", "CRUNCHY-ARMCHAIR", 100, None, uow)
+ messagebus.handle(
+ events.BatchCreated("b1", "CRUNCHY-ARMCHAIR", 100, None), uow
+ )
assert uow.products.get("CRUNCHY-ARMCHAIR") is not None
assert uow.committed
...
class TestAllocate:
def test_returns_allocation(self):
uow = FakeUnitOfWork()
- services.add_batch("batch1", "COMPLICATED-LAMP", 100, None, uow)
- result = services.allocate("o1", "COMPLICATED-LAMP", 10, uow)
+ messagebus.handle(
+ events.BatchCreated("batch1", "COMPLICATED-LAMP", 100, None), uow
+ )
+ result = messagebus.handle(
+ events.AllocationRequired("o1", "COMPLICATED-LAMP", 10), uow
+ )
assert result == "batch1"
临时的丑陋 hack:消息总线必须返回结果
我们的 API 和我们的服务层目前想知道当它们调用我们的 allocate()
处理器时分配的批次引用。这意味着我们需要在我们的消息总线上进行临时 hack,以使其返回事件
def handle(
event: events.Event,
uow: unit_of_work.AbstractUnitOfWork,
):
+ results = []
queue = [event]
while queue:
event = queue.pop(0)
for handler in HANDLERS[type(event)]:
- handler(event, uow=uow)
+ results.append(handler(event, uow=uow))
queue.extend(uow.collect_new_events())
+ return results
这是因为我们在系统中混合了读取和写入职责。我们将在 [chapter_12_cqrs] 中回来修复这个缺陷。
修改我们的 API 以使用事件
@app.route("/allocate", methods=["POST"])
def allocate_endpoint():
try:
- batchref = services.allocate(
- request.json["orderid"], #(1)
- request.json["sku"],
- request.json["qty"],
- unit_of_work.SqlAlchemyUnitOfWork(),
+ event = events.AllocationRequired( #(2)
+ request.json["orderid"], request.json["sku"], request.json["qty"]
)
+ results = messagebus.handle(event, unit_of_work.SqlAlchemyUnitOfWork()) #(3)
+ batchref = results.pop(0)
except InvalidSku as e:
-
而不是使用从请求 JSON 中提取的一堆原始类型调用服务层……
-
我们实例化一个事件。
-
然后我们将其传递给消息总线。
我们应该回到一个功能齐全的应用程序,但现在是一个完全事件驱动的应用程序
-
曾经是服务层函数的现在是事件处理器。
-
这使它们与我们为处理领域模型引发的内部事件而调用的函数相同。
-
我们使用事件作为数据结构来捕获系统的输入,以及移交内部工作包。
-
整个应用程序现在最好被描述为消息处理器,或者如果您愿意,也可以称为事件处理器。我们将在 下一章 中讨论它们之间的区别。
实施我们的新需求
我们的重构阶段已完成。让我们看看我们是否真的“使更改变得容易”。让我们实施我们的新需求,如 重新分配流程的序列图 所示:我们将收到一些新的 BatchQuantityChanged
事件作为我们的输入,并将它们传递给处理器,而处理器又可能发出一些 AllocationRequired
事件,而这些事件又将返回到我们现有的重新分配处理器。

[plantuml, apwp_0904, config=plantuml.cfg] @startuml scale 4 API -> MessageBus : BatchQuantityChanged event group BatchQuantityChanged Handler + Unit of Work 1 MessageBus -> Domain_Model : change batch quantity Domain_Model -> MessageBus : emit AllocationRequired event(s) end group AllocationRequired Handler + Unit of Work 2 (or more) MessageBus -> Domain_Model : allocate end @enduml
警告
|
当您像这样将事物拆分到两个工作单元中时,您现在有两个数据库事务,因此您将自己暴露于完整性问题:可能会发生某些事情,这意味着第一个事务完成但第二个事务未完成。您需要考虑这是否可以接受,以及是否需要注意何时发生这种情况并采取一些措施。有关更多讨论,请参阅 [footguns]。 |
测试驱动新处理器 (ch9)
遵循在 [chapter_04_service_layer] 中学到的经验教训,我们可以在“高速”模式下操作,并在可能的最高抽象级别(就事件而言)编写单元测试。以下是它们可能的样子
class TestChangeBatchQuantity:
def test_changes_available_quantity(self):
uow = FakeUnitOfWork()
messagebus.handle(
events.BatchCreated("batch1", "ADORABLE-SETTEE", 100, None), uow
)
[batch] = uow.products.get(sku="ADORABLE-SETTEE").batches
assert batch.available_quantity == 100 #(1)
messagebus.handle(events.BatchQuantityChanged("batch1", 50), uow)
assert batch.available_quantity == 50 #(1)
def test_reallocates_if_necessary(self):
uow = FakeUnitOfWork()
event_history = [
events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
]
for e in event_history:
messagebus.handle(e, uow)
[batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
assert batch1.available_quantity == 10
assert batch2.available_quantity == 50
messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)
# order1 or order2 will be deallocated, so we'll have 25 - 20
assert batch1.available_quantity == 5 #(2)
# and 20 will be reallocated to the next batch
assert batch2.available_quantity == 30 #(2)
-
简单的情况很容易实现;我们只是修改数量。
-
但是,如果我们尝试将数量更改为小于已分配的数量,我们将需要取消分配至少一个订单,并且我们希望将其重新分配到新批次。
实施
我们的新处理器非常简单
def change_batch_quantity(
event: events.BatchQuantityChanged,
uow: unit_of_work.AbstractUnitOfWork,
):
with uow:
product = uow.products.get_by_batchref(batchref=event.ref)
product.change_batch_quantity(ref=event.ref, qty=event.qty)
uow.commit()
我们意识到我们需要在我们的仓库中添加一种新的查询类型
class AbstractRepository(abc.ABC):
...
def get(self, sku) -> model.Product:
...
def get_by_batchref(self, batchref) -> model.Product:
product = self._get_by_batchref(batchref)
if product:
self.seen.add(product)
return product
@abc.abstractmethod
def _add(self, product: model.Product):
raise NotImplementedError
@abc.abstractmethod
def _get(self, sku) -> model.Product:
raise NotImplementedError
@abc.abstractmethod
def _get_by_batchref(self, batchref) -> model.Product:
raise NotImplementedError
...
class SqlAlchemyRepository(AbstractRepository):
...
def _get(self, sku):
return self.session.query(model.Product).filter_by(sku=sku).first()
def _get_by_batchref(self, batchref):
return (
self.session.query(model.Product)
.join(model.Batch)
.filter(orm.batches.c.reference == batchref)
.first()
)
以及我们的 FakeRepository
也是如此
class FakeRepository(repository.AbstractRepository):
...
def _get(self, sku):
return next((p for p in self._products if p.sku == sku), None)
def _get_by_batchref(self, batchref):
return next(
(p for p in self._products for b in p.batches if b.reference == batchref),
None,
)
注意
|
我们正在向我们的仓库添加一个查询,以使此用例更易于实施。只要我们的查询返回单个聚合,我们就不会违反任何规则。如果您发现自己在仓库上编写复杂的查询,您可能需要考虑不同的设计。特别是像 get_most_popular_products 或 find_products_by_order_id 这样的方法肯定会触发我们的蜘蛛感应。[chapter_11_external_events] 和 尾声 提供了一些关于管理复杂查询的技巧。 |
领域模型上的新方法
我们将新方法添加到模型中,该方法执行数量更改和取消分配内联,并发布新事件。我们还修改了现有的 allocate 函数以发布事件
class Product:
...
def change_batch_quantity(self, ref: str, qty: int):
batch = next(b for b in self.batches if b.reference == ref)
batch._purchased_quantity = qty
while batch.available_quantity < 0:
line = batch.deallocate_one()
self.events.append(
events.AllocationRequired(line.orderid, line.sku, line.qty)
)
...
class Batch:
...
def deallocate_one(self) -> OrderLine:
return self._allocations.pop()
我们连接我们的新处理器
HANDLERS = {
events.BatchCreated: [handlers.add_batch],
events.BatchQuantityChanged: [handlers.change_batch_quantity],
events.AllocationRequired: [handlers.allocate],
events.OutOfStock: [handlers.send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]
我们的新需求已完全实施。
可选:使用 Fake Message Bus 隔离地对事件处理器进行单元测试
我们针对重新分配工作流程的主要测试是端到端的(请参阅 测试驱动新处理器 (ch9) 中的示例代码)。它使用真正的消息总线,并且它测试整个流程,其中 BatchQuantityChanged
事件处理器触发取消分配,并发出新的 AllocationRequired
事件,而这些事件又由它们自己的处理器处理。一个测试涵盖了多个事件和处理器的链。
根据您的事件链的复杂性,您可能会决定要将一些处理器彼此隔离地进行测试。您可以使用“fake”消息总线来做到这一点。
在我们的例子中,我们实际上通过修改 FakeUnitOfWork
上的 publish_events()
方法进行干预,并将其与真正的消息总线解耦,而是使其记录它看到的事件
class FakeUnitOfWorkWithFakeMessageBus(FakeUnitOfWork):
def __init__(self):
super().__init__()
self.events_published = [] # type: List[events.Event]
def publish_events(self):
for product in self.products.seen:
while product.events:
self.events_published.append(product.events.pop(0))
现在,当我们使用 FakeUnitOfWorkWithFakeMessageBus
调用 messagebus.handle()
时,它仅运行该事件的处理器。因此,我们可以编写更隔离的单元测试:不是检查所有副作用,我们只检查如果数量降至低于已分配总量的水平,BatchQuantityChanged
是否会导致 AllocationRequired
def test_reallocates_if_necessary_isolated():
uow = FakeUnitOfWorkWithFakeMessageBus()
# test setup as before
event_history = [
events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
]
for e in event_history:
messagebus.handle(e, uow)
[batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
assert batch1.available_quantity == 10
assert batch2.available_quantity == 50
messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)
# assert on new events emitted rather than downstream side-effects
[reallocation_event] = uow.events_published
assert isinstance(reallocation_event, events.AllocationRequired)
assert reallocation_event.orderid in {"order1", "order2"}
assert reallocation_event.sku == "INDIFFERENT-TABLE"
您是否要这样做取决于您的事件链的复杂性。我们说,从端到端测试开始,只有在必要时才求助于此。
总结
让我们回顾一下我们所取得的成就,并思考一下我们为什么要这样做。
我们取得了什么成就?
事件是简单的 dataclass,它们定义了我们系统中输入和内部消息的数据结构。从 DDD 的角度来看,这非常强大,因为事件通常可以很好地转化为业务语言(如果您尚未了解,请查找事件风暴)。
处理器是我们对事件做出反应的方式。它们可以向下调用我们的模型或向外调用外部服务。如果需要,我们可以为一个事件定义多个处理器。处理器还可以引发其他事件。这使我们能够在处理器执行的操作上非常精细,并真正坚持 SRP。
我们为什么要取得成就?
我们使用这些架构模式的持续目标是尝试使我们应用程序的复杂性增长速度慢于其规模。当我们全力以赴地使用消息总线时,与往常一样,我们在架构复杂性方面付出了代价(请参阅 整个应用程序都是消息总线:权衡),但是我们为自己购买了一种模式,该模式可以处理几乎任意复杂的需求,而无需对我们做事的方式进行任何进一步的概念或架构更改。
在这里,我们添加了一个相当复杂的用例(更改数量、取消分配、启动新事务、重新分配、发布外部通知),但在架构上,复杂性方面没有任何成本。我们添加了新的事件、新的处理器和新的外部适配器(用于电子邮件),所有这些都是我们架构中事物的现有类别,我们理解并知道如何推理,并且易于向新手解释。我们的移动部件每个都有一项工作,它们以明确定义的方式相互连接,并且没有意外的副作用。
优点 | 缺点 |
---|---|
|
|
现在,您可能想知道,这些 BatchQuantityChanged
事件将从何而来?答案将在接下来的几章中揭晓。但首先,让我们谈谈 事件与命令。