buy the book ribbon

8:事件和消息总线

到目前为止,我们已经在一个简单的问题上花费了大量时间和精力,而这个问题我们可以用 Django 轻松解决。您可能会问,增加的可测试性和表达性是否真的值得付出所有努力。

但在实践中,我们发现使我们的代码库一团糟的不是那些显而易见的功能:而是边缘的粘合代码。 它是报告、权限以及涉及无数对象的工作流程。

我们的示例将是一个典型的通知需求:当我们因为缺货而无法分配订单时,我们应该提醒采购团队。 他们会去购买更多库存来解决问题,一切都会好起来的。

对于第一个版本,我们的产品负责人说我们可以通过电子邮件发送警报。

让我们看看当我们需要插入一些构成我们系统大部分的平凡的东西时,我们的架构如何应对。

我们将从做最简单、最快捷的事情开始,并讨论为什么正是这种决定导致我们走向泥潭式的大杂烩。

然后,我们将展示如何使用领域事件模式将副作用与我们的用例分离,以及如何使用简单的消息总线模式来基于这些事件触发行为。我们将展示创建这些事件的几种选项以及如何将它们传递给消息总线,最后我们将展示如何修改工作单元模式以优雅地将两者连接在一起,如事件在系统中流动中预览的那样。

apwp 0801
图 1. 事件在系统中流动
提示

本章的代码位于 chapter_08_events_and_message_bus 分支 GitHub

git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_08_events_and_message_bus
# or to code along, checkout the previous chapter:
git checkout chapter_07_aggregate

避免制造混乱

所以。 当我们缺货时发送电子邮件警报。 当我们有像那些与核心领域真的无关的新需求时,很容易开始将这些东西倾倒到我们的 Web 控制器中。

首先,让我们避免使我们的 Web 控制器变得混乱

作为一次性的临时措施,这可能还可以

只需将其敲入端点——会发生什么问题呢? (src/allocation/entrypoints/flask_app.py)

……​但这很容易看出,我们如何通过像这样修补东西而迅速陷入混乱。 发送电子邮件不是我们的 HTTP 层的工作,我们希望能够单元测试这个新功能。

并且让我们也不要使我们的模型变得混乱

假设我们不想将此代码放入我们的 Web 控制器中,因为我们希望它们尽可能精简,我们可能会考虑将其直接放在源头,即模型中

模型中的电子邮件发送代码也不可爱 (src/allocation/domain/model.py)
    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
            #...
        except StopIteration:
            email.send_mail("stock@made.com", f"Out of stock for {line.sku}")
            raise OutOfStock(f"Out of stock for sku {line.sku}")

但这更糟! 我们不希望我们的模型对基础设施问题有任何依赖,例如 email.send_mail

这个发送电子邮件的东西是不受欢迎的粘合代码,它扰乱了我们系统良好的清晰流程。 我们想要的是让我们的领域模型专注于“您不能分配比实际可用库存更多的东西”的规则。

或者服务层!

“尝试分配一些库存,如果失败则发送电子邮件”的要求是工作流编排的一个示例:它是系统必须遵循的实现目标的步骤集合。

我们已经编写了一个服务层来为我们管理编排,但即使在这里,该功能也感觉格格不入

并且在服务层中,它显得格格不入 (src/allocation/service_layer/services.py)
def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        try:
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        except model.OutOfStock:
            email.send_mail("stock@made.com", f"Out of stock for {line.sku}")
            raise

捕获异常并重新引发它? 情况可能会更糟,但这绝对让我们不高兴。 为什么为这段代码找到合适的家如此困难?

单一职责原则

实际上,这是违反了单一职责原则 (SRP)。[1] 我们的用例是分配。 我们的端点、服务函数和领域方法都称为 allocate,而不是 allocate_and_send_mail_if_out_of_stock

提示
经验法则:如果您在不使用“然后”或“和”之类的词语就无法描述您的函数的作用,那么您可能违反了 SRP。

SRP 的一种表述是,每个类都应该只有一个更改理由。 当我们从电子邮件切换到短信时,我们不应该更新我们的 allocate() 函数,因为这显然是独立的职责。

为了解决这个问题,我们将把编排分成不同的步骤,以便不同的关注点不会纠缠在一起。[2] 领域模型的工作是知道我们缺货,但发送警报的责任属于其他地方。 我们应该能够打开或关闭此功能,或者切换到 SMS 通知,而无需更改我们领域模型的规则。

我们还希望保持服务层免受实现细节的影响。 我们希望将依赖倒置原则应用于通知,以便我们的服务层依赖于抽象,就像我们通过使用工作单元来避免依赖数据库一样。

全体登上消息总线!

我们在这里将要介绍的模式是领域事件消息总线。 我们可以用几种方式实现它们,因此我们将展示几个,然后再确定我们最喜欢的一个。

模型记录事件

首先,我们的模型将负责记录事件——关于已发生的事实,而不是关注电子邮件。 我们将使用消息总线来响应事件并调用新操作。

事件是简单的数据类

事件是一种值对象。 事件没有任何行为,因为它们是纯粹的数据结构。 我们始终以领域语言命名事件,并将它们视为我们领域模型的一部分。

我们可以将它们存储在 model.py 中,但我们不妨将它们保存在自己的文件中(现在可能是考虑重构出一个名为 domain 的目录的好时机,以便我们拥有 domain/model.pydomain/events.py

事件类 (src/allocation/domain/events.py)
from dataclasses import dataclass


class Event:  #(1)
    pass


@dataclass
class OutOfStock(Event):  #(2)
    sku: str
  1. 一旦我们有了许多事件,我们就会发现拥有一个可以存储通用属性的父类很有用。 它对于消息总线中的类型提示也很有用,您很快就会看到。

  2. dataclasses 也非常适合领域事件。

模型引发事件

当我们的领域模型记录一个已发生的事实时,我们说它引发一个事件。

这是从外部看起来的样子; 如果我们要求 Product 分配但它无法分配,它应该引发一个事件

测试我们的聚合以引发事件 (tests/unit/test_product.py)
def test_records_out_of_stock_event_if_cannot_allocate():
    batch = Batch("batch1", "SMALL-FORK", 10, eta=today)
    product = Product(sku="SMALL-FORK", batches=[batch])
    product.allocate(OrderLine("order1", "SMALL-FORK", 10))

    allocation = product.allocate(OrderLine("order2", "SMALL-FORK", 1))
    assert product.events[-1] == events.OutOfStock(sku="SMALL-FORK")  #(1)
    assert allocation is None
  1. 我们的聚合将公开一个名为 .events 的新属性,它将包含一个关于已发生事件的事实列表,以 Event 对象的形式。

这是模型内部的样子

模型引发领域事件 (src/allocation/domain/model.py)
class Product:
    def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
        self.sku = sku
        self.batches = batches
        self.version_number = version_number
        self.events = []  # type: List[events.Event]  #(1)

    def allocate(self, line: OrderLine) -> str:
        try:
            #...
        except StopIteration:
            self.events.append(events.OutOfStock(line.sku))  #(2)
            # raise OutOfStock(f"Out of stock for sku {line.sku}")  #(3)
            return None
  1. 这是我们新的 .events 属性的使用。

  2. 我们不是直接调用一些发送电子邮件的代码,而是在事件发生的地方记录这些事件,仅使用领域语言。

  3. 我们还将停止为缺货情况引发异常。 事件将完成异常正在做的工作。

注意
我们实际上正在解决我们到目前为止存在的代码异味,即我们使用异常进行控制流。 一般来说,如果您正在实现领域事件,请不要引发异常来描述相同的领域概念。 正如您稍后在工作单元模式中处理事件时看到的那样,必须一起推理事件和异常会让人感到困惑。

消息总线将事件映射到处理程序

消息总线基本上说,“当我看到此事件时,我应该调用以下处理程序函数。” 换句话说,它是一个简单的发布-订阅系统。 处理程序订阅接收事件,我们将其发布到总线。 这听起来比实际情况要难,我们通常使用字典来实现它

简单的消息总线 (src/allocation/service_layer/messagebus.py)
def handle(event: events.Event):
    for handler in HANDLERS[type(event)]:
        handler(event)


def send_out_of_stock_notification(event: events.OutOfStock):
    email.send_mail(
        "stock@made.com",
        f"Out of stock for {event.sku}",
    )


HANDLERS = {
    events.OutOfStock: [send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]
注意
请注意,已实现的消息总线不会给我们带来并发性,因为一次只会运行一个处理程序。 我们的目标不是支持并行线程,而是从概念上分离任务,并使每个 UoW 尽可能小。 这有助于我们理解代码库,因为如何运行每个用例的“配方”都写在一个地方。 请参阅以下侧边栏。
这像 Celery 吗?

Celery 是 Python 世界中用于将自包含的工作块推迟到异步任务队列的热门工具。 我们在此处介绍的消息总线非常不同,因此对上述问题的简短回答是否定的; 我们的消息总线与 Express.js 应用程序、UI 事件循环或 Actor 框架更相似。

如果您确实有将工作移出主线程的要求,您仍然可以使用我们基于事件的隐喻,但我们建议您为此使用外部事件。 在 [chapter_11_external_events_tradeoffs] 中有更多讨论,但本质上,如果您实现一种将事件持久化到集中式存储的方法,您可以让其他容器或其他微服务订阅它们。 然后,在单个进程/服务中,使用事件跨工作单元分离职责的相同概念可以扩展到多个进程——​这些进程可能是同一服务中的不同容器,或完全不同的微服务。

如果您遵循我们的方法,您的任务分发 API 就是您的事件——或它们的 JSON 表示形式。 这使您可以非常灵活地将任务分发给谁; 它们不一定是 Python 服务。 Celery 用于分发任务的 API 本质上是“函数名称加上参数”,这更具限制性,并且仅限于 Python。

选项 1:服务层从模型中获取事件并将它们放到消息总线上

我们的领域模型引发事件,我们的消息总线将在事件发生时调用正确的处理程序。 现在我们只需要连接两者。 我们需要一些东西来捕获模型中的事件并将它们传递给消息总线——​发布步骤。

最简单的方法是在我们的服务层中添加一些代码

具有显式消息总线的服务层 (src/allocation/service_layer/services.py)
from . import messagebus
...

def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        try:  #(1)
            batchref = product.allocate(line)
            uow.commit()
            return batchref
        finally:  #(1)
            messagebus.handle(product.events)  #(2)
  1. 我们保留了来自我们早期丑陋实现的 try/finally(我们还没有摆脱所有异常,只是 OutOfStock)。

  2. 但现在,服务层不再直接依赖电子邮件基础设施,而只是负责将事件从模型传递到消息总线。

这已经避免了我们在幼稚实现中遇到的一些丑陋之处,并且我们有几个像这样的系统在工作,其中服务层显式地从聚合收集事件并将它们传递到消息总线。

选项 2:服务层引发自己的事件

我们使用过的另一种变体是让服务层负责直接创建和引发事件,而不是让领域模型引发事件

服务层直接调用 messagebus.handle (src/allocation/service_layer/services.py)
  1. 和以前一样,即使我们分配失败,我们也会提交,因为这种方式代码更简单,更容易推理:除非出现问题,否则我们总是提交。 当我们没有更改任何内容时提交是安全的,并且可以保持代码整洁。

同样,我们有应用程序在生产中以这种方式实现该模式。 什么对您有效将取决于您面临的特定权衡,但我们想向您展示我们认为最优雅的解决方案,即我们将工作单元负责收集和引发事件。

选项 3:UoW 将事件发布到消息总线

UoW 已经有一个 try/finally,并且它知道当前正在使用的所有聚合,因为它提供了对仓库的访问。 因此,它是发现事件并将它们传递到消息总线的好地方

UoW 遇到消息总线 (src/allocation/service_layer/unit_of_work.py)
class AbstractUnitOfWork(abc.ABC):
    ...

    def commit(self):
        self._commit()  #(1)
        self.publish_events()  #(2)

    def publish_events(self):  #(2)
        for product in self.products.seen:  #(3)
            while product.events:
                event = product.events.pop(0)
                messagebus.handle(event)

    @abc.abstractmethod
    def _commit(self):
        raise NotImplementedError

...

class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
    ...

    def _commit(self):  #(1)
        self.session.commit()
  1. 我们将更改我们的 commit 方法,使其需要子类的私有 ._commit() 方法。

  2. 提交后,我们遍历仓库已看到的所有对象,并将它们的事件传递给消息总线。

  3. 这依赖于仓库使用一个名为 .seen 的新属性来跟踪已加载的聚合,您将在下一个列表中看到。

注意
您是否想知道如果其中一个处理程序失败会发生什么? 我们将在 [chapter_10_commands] 中详细讨论错误处理。
仓库跟踪通过它的聚合 (src/allocation/adapters/repository.py)
class AbstractRepository(abc.ABC):
    def __init__(self):
        self.seen = set()  # type: Set[model.Product]  #(1)

    def add(self, product: model.Product):  #(2)
        self._add(product)
        self.seen.add(product)

    def get(self, sku) -> model.Product:  #(3)
        product = self._get(sku)
        if product:
            self.seen.add(product)
        return product

    @abc.abstractmethod
    def _add(self, product: model.Product):  #(2)
        raise NotImplementedError

    @abc.abstractmethod  #(3)
    def _get(self, sku) -> model.Product:
        raise NotImplementedError


class SqlAlchemyRepository(AbstractRepository):
    def __init__(self, session):
        super().__init__()
        self.session = session

    def _add(self, product):  #(2)
        self.session.add(product)

    def _get(self, sku):  #(3)
        return self.session.query(model.Product).filter_by(sku=sku).first()
  1. 为了使 UoW 能够发布新事件,它需要能够向仓库询问在此会话期间使用了哪些 Product 对象。 我们使用一个名为 .seenset 来存储它们。 这意味着我们的实现需要调用 super().__init__()

  2. add() 方法将事物添加到 .seen,现在要求子类实现 ._add()

  3. 类似地,.get() 委托给一个 ._get() 函数,该函数由子类实现,以便捕获看到的对象。

注意
使用 ._underscorey() 方法和子类绝对不是您可以实现这些模式的唯一方法。 尝试本章的“读者练习”并尝试一些替代方案。

在 UoW 和仓库以这种方式协作以自动跟踪活动对象并处理其事件之后,服务层可以完全摆脱事件处理的关注

服务层再次变得干净 (src/allocation/service_layer/services.py)
def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f"Invalid sku {line.sku}")
        batchref = product.allocate(line)
        uow.commit()
        return batchref

我们确实还必须记住更改服务层中的伪造对象,并在正确的位置使它们调用 super(),并实现 underscorey 方法,但更改是最小的

服务层伪造对象需要调整 (tests/unit/test_services.py)
class FakeRepository(repository.AbstractRepository):
    def __init__(self, products):
        super().__init__()
        self._products = set(products)

    def _add(self, product):
        self._products.add(product)

    def _get(self, sku):
        return next((p for p in self._products if p.sku == sku), None)

...

class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork):
    ...

    def _commit(self):
        self.committed = True
读者练习

您是否觉得所有这些 ._add()._commit() 方法都“超级恶心”,用我们敬爱的技术评论员 Hynek 的话说? 它是否“让您想用毛绒蛇猛击 Harry 的头部”? 嘿,我们的代码清单仅旨在作为示例,而不是完美的解决方案! 为什么不去看看您是否可以做得更好?

一种组合优于继承的方式是实现一个包装器类

包装器添加功能然后委托 (src/adapters/repository.py)
  1. 通过包装仓库,我们可以调用实际的 .add().get() 方法,避免了奇怪的 underscorey 方法。

看看您是否可以将类似的模式应用于我们的 UoW 类,以便也摆脱那些 Java 风格的 _commit() 方法。 您可以在 GitHub 上找到代码。

将所有 ABC 切换到 typing.Protocol 是迫使自己避免使用继承的好方法。 如果您想出了什么好东西,请告诉我们!

您可能开始担心维护这些伪造对象将成为维护负担。 毫无疑问,这是一项工作,但根据我们的经验,工作量不大。 一旦您的项目启动并运行,您的仓库和 UoW 抽象的接口实际上就不会发生太大变化。 而且,如果您正在使用 ABC,它们将帮助您在事情不同步时提醒您。

总结

领域事件为我们提供了一种处理系统中工作流程的方式。 我们经常发现,在听取我们的领域专家时,他们以因果或时间方式表达需求——例如,“当我们尝试分配库存但没有可用库存时,我们应该向采购团队发送电子邮件。”

“当 X,然后 Y”这些神奇的词语通常会告诉我们一个事件,我们可以使该事件在我们的系统中具体化。 将事件视为我们模型中的一等公民有助于我们使代码更易于测试和观察,并且有助于隔离关注点。

并且 领域事件:权衡 显示了我们看到的权衡。

表 1. 领域事件:权衡
优点 缺点
  • 当我们需要响应请求采取多个操作时,消息总线为我们提供了一种很好的方式来分离职责。

  • 事件处理程序与“核心”应用程序逻辑很好地解耦,从而可以轻松地在以后更改它们的实现。

  • 领域事件是模拟现实世界的好方法,当我们与利益相关者建模时,我们可以将它们用作我们业务语言的一部分。

  • 消息总线是另一个需要您理解的东西; 工作单元为我们引发事件的实现是简洁的,但也具有魔力。 当我们调用 commit 时,我们并不清楚我们也将要去向人们发送电子邮件。

  • 更重要的是,隐藏的事件处理代码同步执行,这意味着您的服务层函数在任何事件的所有处理程序完成之前都不会完成。 这可能会在您的 Web 端点中引起意外的性能问题(添加异步处理是可能的,但会使事情更加令人困惑)。

  • 更一般而言,事件驱动的工作流程可能会令人困惑,因为在事情分散到多个处理程序的链中之后,系统中没有一个地方可以让您了解请求将如何完成。

  • 您还会让自己面临事件处理程序之间循环依赖和无限循环的可能性。

但是,事件的用途不仅限于发送电子邮件。 在 [chapter_07_aggregate] 中,我们花了很多时间来说服您应该定义聚合,或者我们保证一致性的边界。 人们经常问,“如果我需要在请求中更改多个聚合,我应该怎么做?” 现在我们有了回答这个问题的工具。

如果我们有两个可以事务隔离的东西(例如,订单和产品),那么我们可以通过使用事件使它们最终一致。 当订单被取消时,我们应该找到分配给它的产品并删除分配

领域事件和消息总线回顾

事件可以帮助实现单一职责原则

当我们将多个关注点混合在一个地方时,代码会变得混乱。 事件可以帮助我们通过将主要用例与次要用例分开来保持事物整洁。 我们还使用事件在聚合之间进行通信,以便我们不需要运行锁定多个表的长事务。

消息总线将消息路由到处理程序

您可以将消息总线视为从事件到其消费者的映射字典。 它对事件的含义“一无所知”; 它只是用于在系统中传递消息的简单基础设施。

选项 1:服务层引发事件并将它们传递给消息总线

在您的系统中开始使用事件的最简单方法是在提交工作单元后通过调用 bus.handle(some_new_event) 从处理程序中引发它们。

选项 2:领域模型引发事件,服务层将它们传递给消息总线

关于何时引发事件的逻辑确实应该与模型一起存在,因此我们可以通过从领域模型中引发事件来改进我们系统的设计和可测试性。 我们的处理程序很容易在 commit 后从模型对象中收集事件并将它们传递给总线。

选项 3:UoW 从聚合收集事件并将它们传递给消息总线

bus.handle(aggregate.events) 添加到每个处理程序都很烦人,因此我们可以通过使我们的工作单元负责引发已加载对象引发的事件来整理代码。 这是最复杂的设计,可能依赖于 ORM 魔术,但一旦设置好,它就干净且易于使用。

[chapter_09_all_messagebus] 中,当我们使用新的消息总线构建更复杂的工作流程时,我们将更详细地研究这个想法。


1. 此原则是 SOLID 中的 S
2. 我们的技术评论员 Ed Jung 喜欢说,当您从命令式流程控制更改为基于事件的流程控制时,您正在将编排更改为舞蹈编排