buy the book ribbon

9:消息总线之旅

在本章中,我们将开始使事件对应用程序的内部结构更加基础。我们将从 之前:消息总线是可选的附加组件 中的当前状态开始,其中事件是可选的副作用……

apwp 0901
图 1. 之前:消息总线是可选的附加组件

……​到 消息总线现在是服务层的主要入口点 中的情况,其中一切都通过消息总线,并且我们的应用程序已从根本上转变为消息处理器。

apwp 0902
图 2. 消息总线现在是服务层的主要入口点
提示

本章的代码位于 chapter_09_all_messagebus 分支 在 GitHub 上

git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_09_all_messagebus
# or to code along, checkout the previous chapter:
git checkout chapter_08_events_and_message_bus

新需求引导我们走向新架构

Rich Hickey 谈到了情境软件,指的是长时间运行,管理现实世界流程的软件。示例包括仓库管理系统、物流调度器和工资系统。

这种软件很难编写,因为在物理对象和不可靠的人类的现实世界中,意外情况随时发生。例如

  • 在盘点期间,我们发现三个 SPRINGY-MATTRESS 床垫因屋顶漏水而受损。

  • 一批 RELIABLE-FORK 叉子缺少所需的文件,并在海关滞留数周。随后,三个 RELIABLE-FORK 叉子未能通过安全测试而被销毁。

  • 全球亮片短缺意味着我们无法生产下一批 SPARKLY-BOOKCASE 闪光书柜。

在这些类型的情况下,我们了解到需要在批次数量已在系统中时更改它们。也许有人在清单上的数量上犯了错误,或者可能有些沙发从卡车上掉下来了。在与业务部门对话后,[1] 我们将情况建模为 批次数量更改意味着取消分配和重新分配 中所示。

apwp 0903
图 3. 批次数量更改意味着取消分配和重新分配
[ditaa, apwp_0903]
+----------+    /----\      +------------+       +--------------------+
| Batch    |--> |RULE| -->  | Deallocate | ----> | AllocationRequired |
| Quantity |    \----/      +------------+-+     +--------------------+-+
| Changed  |                  | Deallocate | ----> | AllocationRequired |
+----------+                  +------------+-+     +--------------------+-+
                                | Deallocate | ----> | AllocationRequired |
                                +------------+       +--------------------+

一个我们称之为 BatchQuantityChanged 的事件应该引导我们更改批次上的数量,是的,但也要应用一个业务规则:如果新数量降至低于已分配总量的水平,我们需要从该批次取消分配这些订单。然后,每个订单都需要重新分配,我们可以将其捕获为名为 AllocationRequired 的事件。

也许您已经预料到我们的内部消息总线和事件可以帮助实现此需求。我们可以定义一个名为 change_batch_quantity 的服务,该服务知道如何调整批次数量以及如何取消分配任何多余的订单行,然后每个取消分配都可以发出一个 AllocationRequired 事件,该事件可以转发到现有的 allocate 服务,在单独的事务中。再一次,我们的消息总线帮助我们实施单一职责原则,并且它允许我们对事务和数据完整性做出选择。

设想架构变更:一切都将是 事件处理器

但在我们深入之前,请考虑一下我们的目标。我们的系统中有两种流程

  • 由服务层函数处理的 API 调用

  • 内部事件(可能是作为服务层函数的副作用而引发的)及其处理器(反过来调用服务层函数)

如果一切都是事件处理器,岂不是更容易?如果我们将 API 调用重新思考为捕获事件,则服务层函数也可以是事件处理器,并且我们不再需要区分内部和外部事件处理器

  • services.allocate() 可以是 AllocationRequired 事件的处理器,并且可以发出 Allocated 事件作为其输出。

  • services.add_batch() 可以是 BatchCreated 事件的处理器。[2]

我们的新需求将符合相同的模式

  • 名为 BatchQuantityChanged 的事件可以调用名为 change_batch_quantity() 的处理器。

  • 它可能引发的新 AllocationRequired 事件也可以传递给 services.allocate(),因此来自 API 的全新分配与由取消分配在内部触发的重新分配之间没有概念上的区别。

听起来有点多?让我们逐步实现它。我们将遵循 准备性重构 工作流程,又名“使更改容易;然后进行容易的更改”

  1. 我们将服务层重构为事件处理器。我们可以习惯于将事件作为我们描述系统输入的方式的想法。特别是,现有的 services.allocate() 函数将成为名为 AllocationRequired 的事件的处理器。

  2. 我们构建一个端到端测试,将 BatchQuantityChanged 事件放入系统,并查找 Allocated 事件输出。

  3. 我们的实现概念上将非常简单:一个新的 BatchQuantityChanged 事件处理器,其实施将发出 AllocationRequired 事件,而这些事件又将由 API 使用的完全相同的分配处理器处理。

在此过程中,我们将对消息总线和 UoW 进行一个小小的调整,将将新事件放入消息总线的责任转移到消息总线本身中。

将服务函数重构为消息处理器

我们首先定义两个事件,它们捕获我们当前的 API 输入——AllocationRequired 和 BatchCreated

BatchCreated 和 AllocationRequired 事件 (src/allocation/domain/events.py)
@dataclass
class BatchCreated(Event):
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None

...

@dataclass
class AllocationRequired(Event):
    orderid: str
    sku: str
    qty: int

然后我们将 services.py 重命名为 handlers.py;我们添加了现有的 send_out_of_stock_notification 消息处理器;最重要的是,我们更改了所有处理器,使它们具有相同的输入,一个事件和一个 UoW

处理器和服务是同一件事 (src/allocation/service_layer/handlers.py)
def add_batch(
    event: events.BatchCreated,
    uow: unit_of_work.AbstractUnitOfWork,
):
    with uow:
        product = uow.products.get(sku=event.sku)
        ...


def allocate(
    event: events.AllocationRequired,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(event.orderid, event.sku, event.qty)
    ...


def send_out_of_stock_notification(
    event: events.OutOfStock,
    uow: unit_of_work.AbstractUnitOfWork,
):
    email.send(
        "stock@made.com",
        f"Out of stock for {event.sku}",
    )

作为差异,更改可能更清晰

从服务更改为处理器 (src/allocation/service_layer/handlers.py)
 def add_batch(
-    ref: str, sku: str, qty: int, eta: Optional[date],
+    event: events.BatchCreated,
     uow: unit_of_work.AbstractUnitOfWork,
 ):
     with uow:
-        product = uow.products.get(sku=sku)
+        product = uow.products.get(sku=event.sku)
     ...


 def allocate(
-    orderid: str, sku: str, qty: int,
+    event: events.AllocationRequired,
     uow: unit_of_work.AbstractUnitOfWork,
 ) -> str:
-    line = OrderLine(orderid, sku, qty)
+    line = OrderLine(event.orderid, event.sku, event.qty)
     ...

+
+def send_out_of_stock_notification(
+    event: events.OutOfStock,
+    uow: unit_of_work.AbstractUnitOfWork,
+):
+    email.send(
     ...

在此过程中,我们使服务层的 API 更加结构化和一致。它曾经是原始类型的散布,现在它使用定义良好的对象(请参阅以下侧边栏)。

从领域对象,通过原始类型偏执,到 事件作为接口

你们中的一些人可能还记得 [primitive_obsession],我们在其中将服务层 API 从使用领域对象更改为使用原始类型。现在我们又回来了,但是回到了不同的对象?这是怎么回事?

在 OO 圈子里,人们谈论原始类型偏执作为一种反模式:避免在公共 API 中使用原始类型,而是用自定义值类包装它们,他们会说。在 Python 世界中,很多人会非常怀疑这是否是一个经验法则。当盲目应用时,它肯定是不必要的复杂性的根源。所以这不是我们正在做的本身。

从领域对象到原始类型的转变为我们带来了一些不错的解耦:我们的客户端代码不再直接耦合到领域,因此即使我们决定更改模型,服务层也可以呈现保持不变的 API,反之亦然。

那么我们倒退了吗?好吧,我们的核心领域模型对象仍然可以自由变化,但是相反,我们将外部世界耦合到我们的事件类。它们也是领域的一部分,但希望它们的变化频率较低,因此它们是明智的耦合工件。

我们为自己买了什么?现在,当在我们的应用程序中调用用例时,我们不再需要记住原始类型的特定组合,而只需要一个表示应用程序输入的事件类。这在概念上非常不错。最重要的是,正如您将在 [appendix_validation] 中看到的那样,这些事件类可以是进行一些输入验证的好地方。

消息总线现在从 UoW 收集事件

我们的事件处理器现在需要一个 UoW。此外,随着我们的消息总线在我们应用程序中变得越来越重要,将其显式地负责收集和处理新事件是有意义的。到目前为止,UoW 和消息总线之间存在一些循环依赖,因此这将使其成为单向的。我们将让消息总线从 UoW 拉取事件,而不是让 UoW 将事件推送到消息总线上。

Handle 接受一个 UoW 并管理一个队列 (src/allocation/service_layer/messagebus.py)
def handle(
    event: events.Event,
    uow: unit_of_work.AbstractUnitOfWork,  #(1)
):
    queue = [event]  #(2)
    while queue:
        event = queue.pop(0)  #(3)
        for handler in HANDLERS[type(event)]:  #(3)
            handler(event, uow=uow)  #(4)
            queue.extend(uow.collect_new_events())  #(5)
  1. 每次启动时,消息总线现在都会传递 UoW。

  2. 当我们开始处理我们的第一个事件时,我们启动一个队列。

  3. 我们从队列的前面弹出事件并调用它们的处理器(HANDLERS 字典没有改变;它仍然将事件类型映射到处理器函数)。

  4. 消息总线将 UoW 传递给每个处理器。

  5. 在每个处理器完成后,我们收集已生成的任何新事件并将它们添加到队列中。

unit_of_work.py 中,publish_events() 变成了一个不太活跃的方法,collect_new_events()

UoW 不再直接将事件放在总线上 (src/allocation/service_layer/unit_of_work.py)
-from . import messagebus  #(1)


 class AbstractUnitOfWork(abc.ABC):
@@ -22,13 +21,11 @@ class AbstractUnitOfWork(abc.ABC):

     def commit(self):
         self._commit()
-        self.publish_events()  #(2)

-    def publish_events(self):
+    def collect_new_events(self):
         for product in self.products.seen:
             while product.events:
-                event = product.events.pop(0)
-                messagebus.handle(event)
+                yield product.events.pop(0)  #(3)
  1. unit_of_work 模块现在不再依赖于 messagebus

  2. 我们不再在提交时自动 publish_events。消息总线正在跟踪事件队列。

  3. 并且 UoW 不再主动将事件放在消息总线上;它只是使它们可用。

我们的测试也都是用事件编写的

我们的测试现在通过创建事件并将它们放在消息总线上来操作,而不是直接调用服务层函数

处理器测试使用事件 (tests/unit/test_handlers.py)
class TestAddBatch:
     def test_for_new_product(self):
         uow = FakeUnitOfWork()
-        services.add_batch("b1", "CRUNCHY-ARMCHAIR", 100, None, uow)
+        messagebus.handle(
+            events.BatchCreated("b1", "CRUNCHY-ARMCHAIR", 100, None), uow
+        )
         assert uow.products.get("CRUNCHY-ARMCHAIR") is not None
         assert uow.committed

...

 class TestAllocate:
     def test_returns_allocation(self):
         uow = FakeUnitOfWork()
-        services.add_batch("batch1", "COMPLICATED-LAMP", 100, None, uow)
-        result = services.allocate("o1", "COMPLICATED-LAMP", 10, uow)
+        messagebus.handle(
+            events.BatchCreated("batch1", "COMPLICATED-LAMP", 100, None), uow
+        )
+        result = messagebus.handle(
+            events.AllocationRequired("o1", "COMPLICATED-LAMP", 10), uow
+        )
         assert result == "batch1"

临时的丑陋 hack:消息总线必须返回结果

我们的 API 和我们的服务层目前想知道当它们调用我们的 allocate() 处理器时分配的批次引用。这意味着我们需要在我们的消息总线上进行临时 hack,以使其返回事件

消息总线返回结果 (src/allocation/service_layer/messagebus.py)
 def handle(
     event: events.Event,
     uow: unit_of_work.AbstractUnitOfWork,
 ):
+    results = []
     queue = [event]
     while queue:
         event = queue.pop(0)
         for handler in HANDLERS[type(event)]:
-            handler(event, uow=uow)
+            results.append(handler(event, uow=uow))
             queue.extend(uow.collect_new_events())
+    return results

这是因为我们在系统中混合了读取和写入职责。我们将在 [chapter_12_cqrs] 中回来修复这个缺陷。

修改我们的 API 以使用事件

Flask 更改为消息总线作为差异 (src/allocation/entrypoints/flask_app.py)
 @app.route("/allocate", methods=["POST"])
 def allocate_endpoint():
     try:
-        batchref = services.allocate(
-            request.json["orderid"],  #(1)
-            request.json["sku"],
-            request.json["qty"],
-            unit_of_work.SqlAlchemyUnitOfWork(),
+        event = events.AllocationRequired(  #(2)
+            request.json["orderid"], request.json["sku"], request.json["qty"]
         )
+        results = messagebus.handle(event, unit_of_work.SqlAlchemyUnitOfWork())  #(3)
+        batchref = results.pop(0)
     except InvalidSku as e:
  1. 而不是使用从请求 JSON 中提取的一堆原始类型调用服务层……

  2. 我们实例化一个事件。

  3. 然后我们将其传递给消息总线。

我们应该回到一个功能齐全的应用程序,但现在是一个完全事件驱动的应用程序

  • 曾经是服务层函数的现在是事件处理器。

  • 这使它们与我们为处理领域模型引发的内部事件而调用的函数相同。

  • 我们使用事件作为数据结构来捕获系统的输入,以及移交内部工作包。

  • 整个应用程序现在最好被描述为消息处理器,或者如果您愿意,也可以称为事件处理器。我们将在 下一章 中讨论它们之间的区别。

实施我们的新需求

我们的重构阶段已完成。让我们看看我们是否真的“使更改变得容易”。让我们实施我们的新需求,如 重新分配流程的序列图 所示:我们将收到一些新的 BatchQuantityChanged 事件作为我们的输入,并将它们传递给处理器,而处理器又可能发出一些 AllocationRequired 事件,而这些事件又将返回到我们现有的重新分配处理器。

apwp 0904
图 4. 重新分配流程的序列图
[plantuml, apwp_0904, config=plantuml.cfg]
@startuml
scale 4

API -> MessageBus : BatchQuantityChanged event

group BatchQuantityChanged Handler + Unit of Work 1
    MessageBus -> Domain_Model : change batch quantity
    Domain_Model -> MessageBus : emit AllocationRequired event(s)
end


group AllocationRequired Handler + Unit of Work 2 (or more)
    MessageBus -> Domain_Model : allocate
end

@enduml
警告
当您像这样将事物拆分到两个工作单元中时,您现在有两个数据库事务,因此您将自己暴露于完整性问题:可能会发生某些事情,这意味着第一个事务完成但第二个事务未完成。您需要考虑这是否可以接受,以及是否需要注意何时发生这种情况并采取一些措施。有关更多讨论,请参阅 [footguns]

我们的新事件

告诉我们批次数量已更改的事件很简单;它只需要批次引用和一个新数量

新事件 (src/allocation/domain/events.py)
@dataclass
class BatchQuantityChanged(Event):
    ref: str
    qty: int

测试驱动新处理器 (ch9)

遵循在 [chapter_04_service_layer] 中学到的经验教训,我们可以在“高速”模式下操作,并在可能的最高抽象级别(就事件而言)编写单元测试。以下是它们可能的样子

change_batch_quantity 的处理器测试 (tests/unit/test_handlers.py)
class TestChangeBatchQuantity:
    def test_changes_available_quantity(self):
        uow = FakeUnitOfWork()
        messagebus.handle(
            events.BatchCreated("batch1", "ADORABLE-SETTEE", 100, None), uow
        )
        [batch] = uow.products.get(sku="ADORABLE-SETTEE").batches
        assert batch.available_quantity == 100  #(1)

        messagebus.handle(events.BatchQuantityChanged("batch1", 50), uow)

        assert batch.available_quantity == 50  #(1)

    def test_reallocates_if_necessary(self):
        uow = FakeUnitOfWork()
        event_history = [
            events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
            events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
            events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
            events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
        ]
        for e in event_history:
            messagebus.handle(e, uow)
        [batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
        assert batch1.available_quantity == 10
        assert batch2.available_quantity == 50

        messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)

        # order1 or order2 will be deallocated, so we'll have 25 - 20
        assert batch1.available_quantity == 5  #(2)
        # and 20 will be reallocated to the next batch
        assert batch2.available_quantity == 30  #(2)
  1. 简单的情况很容易实现;我们只是修改数量。

  2. 但是,如果我们尝试将数量更改为小于已分配的数量,我们将需要取消分配至少一个订单,并且我们希望将其重新分配到新批次。

实施

我们的新处理器非常简单

处理器委托给模型层 (src/allocation/service_layer/handlers.py)
def change_batch_quantity(
    event: events.BatchQuantityChanged,
    uow: unit_of_work.AbstractUnitOfWork,
):
    with uow:
        product = uow.products.get_by_batchref(batchref=event.ref)
        product.change_batch_quantity(ref=event.ref, qty=event.qty)
        uow.commit()

我们意识到我们需要在我们的仓库中添加一种新的查询类型

我们的仓库中的新查询类型 (src/allocation/adapters/repository.py)
class AbstractRepository(abc.ABC):
    ...

    def get(self, sku) -> model.Product:
        ...

    def get_by_batchref(self, batchref) -> model.Product:
        product = self._get_by_batchref(batchref)
        if product:
            self.seen.add(product)
        return product

    @abc.abstractmethod
    def _add(self, product: model.Product):
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, sku) -> model.Product:
        raise NotImplementedError

    @abc.abstractmethod
    def _get_by_batchref(self, batchref) -> model.Product:
        raise NotImplementedError
    ...

class SqlAlchemyRepository(AbstractRepository):
    ...

    def _get(self, sku):
        return self.session.query(model.Product).filter_by(sku=sku).first()

    def _get_by_batchref(self, batchref):
        return (
            self.session.query(model.Product)
            .join(model.Batch)
            .filter(orm.batches.c.reference == batchref)
            .first()
        )

以及我们的 FakeRepository 也是如此

也更新了 fake repo (tests/unit/test_handlers.py)
class FakeRepository(repository.AbstractRepository):
    ...

    def _get(self, sku):
        return next((p for p in self._products if p.sku == sku), None)

    def _get_by_batchref(self, batchref):
        return next(
            (p for p in self._products for b in p.batches if b.reference == batchref),
            None,
        )
注意
我们正在向我们的仓库添加一个查询,以使此用例更易于实施。只要我们的查询返回单个聚合,我们就不会违反任何规则。如果您发现自己在仓库上编写复杂的查询,您可能需要考虑不同的设计。特别是像 get_most_popular_productsfind_products_by_order_id 这样的方法肯定会触发我们的蜘蛛感应。[chapter_11_external_events]尾声 提供了一些关于管理复杂查询的技巧。

领域模型上的新方法

我们将新方法添加到模型中,该方法执行数量更改和取消分配内联,并发布新事件。我们还修改了现有的 allocate 函数以发布事件

我们的模型不断发展以捕获新需求 (src/allocation/domain/model.py)
class Product:
    ...

    def change_batch_quantity(self, ref: str, qty: int):
        batch = next(b for b in self.batches if b.reference == ref)
        batch._purchased_quantity = qty
        while batch.available_quantity < 0:
            line = batch.deallocate_one()
            self.events.append(
                events.AllocationRequired(line.orderid, line.sku, line.qty)
            )
...

class Batch:
    ...

    def deallocate_one(self) -> OrderLine:
        return self._allocations.pop()

我们连接我们的新处理器

消息总线增长 (src/allocation/service_layer/messagebus.py)
HANDLERS = {
    events.BatchCreated: [handlers.add_batch],
    events.BatchQuantityChanged: [handlers.change_batch_quantity],
    events.AllocationRequired: [handlers.allocate],
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]

我们的新需求已完全实施。

可选:使用 Fake Message Bus 隔离地对事件处理器进行单元测试

我们针对重新分配工作流程的主要测试是端到端的(请参阅 测试驱动新处理器 (ch9) 中的示例代码)。它使用真正的消息总线,并且它测试整个流程,其中 BatchQuantityChanged 事件处理器触发取消分配,并发出新的 AllocationRequired 事件,而这些事件又由它们自己的处理器处理。一个测试涵盖了多个事件和处理器的链。

根据您的事件链的复杂性,您可能会决定要将一些处理器彼此隔离地进行测试。您可以使用“fake”消息总线来做到这一点。

在我们的例子中,我们实际上通过修改 FakeUnitOfWork 上的 publish_events() 方法进行干预,并将其与真正的消息总线解耦,而是使其记录它看到的事件

在 UoW 中实施的 Fake 消息总线 (tests/unit/test_handlers.py)
class FakeUnitOfWorkWithFakeMessageBus(FakeUnitOfWork):
    def __init__(self):
        super().__init__()
        self.events_published = []  # type: List[events.Event]

    def publish_events(self):
        for product in self.products.seen:
            while product.events:
                self.events_published.append(product.events.pop(0))

现在,当我们使用 FakeUnitOfWorkWithFakeMessageBus 调用 messagebus.handle() 时,它仅运行该事件的处理器。因此,我们可以编写更隔离的单元测试:不是检查所有副作用,我们只检查如果数量降至低于已分配总量的水平,BatchQuantityChanged 是否会导致 AllocationRequired

隔离测试重新分配 (tests/unit/test_handlers.py)
def test_reallocates_if_necessary_isolated():
    uow = FakeUnitOfWorkWithFakeMessageBus()

    # test setup as before
    event_history = [
        events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
        events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
        events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
        events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
    ]
    for e in event_history:
        messagebus.handle(e, uow)
    [batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
    assert batch1.available_quantity == 10
    assert batch2.available_quantity == 50

    messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)

    # assert on new events emitted rather than downstream side-effects
    [reallocation_event] = uow.events_published
    assert isinstance(reallocation_event, events.AllocationRequired)
    assert reallocation_event.orderid in {"order1", "order2"}
    assert reallocation_event.sku == "INDIFFERENT-TABLE"

您是否要这样做取决于您的事件链的复杂性。我们说,从端到端测试开始,只有在必要时才求助于此。

读者练习

迫使自己真正理解某些代码的一个好方法是重构它。在讨论隔离测试处理器时,我们使用了名为 FakeUnitOfWorkWithFakeMessageBus 的东西,这不必要地复杂并且违反了 SRP。

如果我们将消息总线更改为一个类,[3] 那么构建 FakeMessageBus 就更直接了

抽象消息总线及其真实版本和 fake 版本

因此,跳入 GitHub 上的代码,看看您是否可以获得基于类的版本,然后从前面编写 test_reallocates_if_necessary_isolated() 的版本。

如果您需要更多灵感,我们将在 [chapter_13_dependency_injection] 中使用基于类的消息总线。

总结

让我们回顾一下我们所取得的成就,并思考一下我们为什么要这样做。

我们取得了什么成就?

事件是简单的 dataclass,它们定义了我们系统中输入和内部消息的数据结构。从 DDD 的角度来看,这非常强大,因为事件通常可以很好地转化为业务语言(如果您尚未了解,请查找事件风暴)。

处理器是我们对事件做出反应的方式。它们可以向下调用我们的模型或向外调用外部服务。如果需要,我们可以为一个事件定义多个处理器。处理器还可以引发其他事件。这使我们能够在处理器执行的操作上非常精细,并真正坚持 SRP。

我们为什么要取得成就?

我们使用这些架构模式的持续目标是尝试使我们应用程序的复杂性增长速度慢于其规模。当我们全力以赴地使用消息总线时,与往常一样,我们在架构复杂性方面付出了代价(请参阅 整个应用程序都是消息总线:权衡),但是我们为自己购买了一种模式,该模式可以处理几乎任意复杂的需求,而无需对我们做事的方式进行任何进一步的概念或架构更改。

在这里,我们添加了一个相当复杂的用例(更改数量、取消分配、启动新事务、重新分配、发布外部通知),但在架构上,复杂性方面没有任何成本。我们添加了新的事件、新的处理器和新的外部适配器(用于电子邮件),所有这些都是我们架构中事物的现有类别,我们理解并知道如何推理,并且易于向新手解释。我们的移动部件每个都有一项工作,它们以明确定义的方式相互连接,并且没有意外的副作用。

表 1. 整个应用程序都是消息总线:权衡
优点 缺点
  • 处理器和服务是同一件事,因此更简单。

  • 我们有一个用于系统输入的良好数据结构。

  • 从 Web 的角度来看,消息总线仍然是一种略微不可预测的做事方式。您无法提前知道事情何时会结束。

  • 模型对象和事件之间会存在字段和结构的重复,这将产生维护成本。向其中一个添加字段通常意味着至少向另一个添加字段。

现在,您可能想知道,这些 BatchQuantityChanged 事件将从何而来?答案将在接下来的几章中揭晓。但首先,让我们谈谈 事件与命令


1. 基于事件的建模非常流行,以至于开发了一种称为事件风暴的实践,以促进基于事件的需求收集和领域模型阐述。
2. 如果您对事件驱动架构进行了一些阅读,您可能会想,“其中一些事件听起来更像是命令!” 请耐心等待!我们试图一次介绍一个概念。在 下一章 中,我们将介绍命令和事件之间的区别。
3. 本章中的“简单”实现本质上是使用 messagebus.py 模块本身来实现单例模式。