buy the book ribbon

10: 命令和命令处理器

在上一章中,我们讨论了使用事件作为表示系统输入的方式,并将我们的应用程序变成了一个消息处理机器。

为了实现这一点,我们将所有用例函数转换为事件处理程序。当 API 接收到创建新批次的 POST 请求时,它会构建一个新的 BatchCreated 事件并像处理内部事件一样处理它。这可能会感觉违反直觉。毕竟,批次尚未创建;这就是我们调用 API 的原因。我们将通过引入命令并展示如何通过相同的消息总线但使用略有不同的规则来处理它们,从而解决这个概念上的瑕疵。

提示

本章的代码位于 chapter_10_commands 分支 on GitHub

git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_10_commands
# or to code along, checkout the previous chapter:
git checkout chapter_09_all_messagebus

命令和事件

与事件一样,命令也是一种消息类型——由系统的一部分发送到另一部分的指令。我们通常使用简单的数据结构来表示命令,并且可以像处理事件一样处理它们。

但是,命令和事件之间的区别很重要。

命令由一个参与者发送给另一个特定参与者,期望特定事情会因此发生。当我们向 API 处理程序提交表单时,我们正在发送一个命令。我们使用祈使语气动词短语来命名命令,例如“分配库存”或“延迟发货”。

命令捕获意图。它们表达了我们希望系统做某事的愿望。因此,当它们失败时,发送者需要接收错误信息。

事件由一个参与者广播给所有感兴趣的监听者。当我们发布 BatchQuantityChanged 时,我们不知道谁会接收它。我们使用过去时动词短语来命名事件,例如“订单已分配到库存”或“发货已延迟”。

我们经常使用事件来传播关于成功命令的知识。

事件捕获关于过去发生的事情的事实。由于我们不知道谁在处理事件,因此发送者不应关心接收者是否成功或失败。事件与命令对比 概括了这些差异。

表 1. 事件与命令对比
事件 命令

命名

过去时

祈使语气

错误处理

独立失败

显式失败

发送到

所有监听者

一个接收者

目前我们的系统中有哪些类型的命令?

提取一些命令 (src/allocation/domain/commands.py)
class Command:
    pass


@dataclass
class Allocate(Command):  #(1)
    orderid: str
    sku: str
    qty: int


@dataclass
class CreateBatch(Command):  #(2)
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None


@dataclass
class ChangeBatchQuantity(Command):  #(3)
    ref: str
    qty: int
  1. commands.Allocate 将替换 events.AllocationRequired

  2. commands.CreateBatch 将替换 events.BatchCreated

  3. commands.ChangeBatchQuantity 将替换 events.BatchQuantityChanged

异常处理的差异

仅仅更改名称和动词当然很好,但这不会改变我们系统的行为。我们希望以类似的方式对待事件和命令,但并非完全相同。让我们看看我们的消息总线如何变化

不同地分发事件和命令 (src/allocation/service_layer/messagebus.py)
Message = Union[commands.Command, events.Event]


def handle(  #(1)
    message: Message,
    uow: unit_of_work.AbstractUnitOfWork,
):
    results = []
    queue = [message]
    while queue:
        message = queue.pop(0)
        if isinstance(message, events.Event):
            handle_event(message, queue, uow)  #(2)
        elif isinstance(message, commands.Command):
            cmd_result = handle_command(message, queue, uow)  #(2)
            results.append(cmd_result)
        else:
            raise Exception(f"{message} was not an Event or Command")
    return results
  1. 它仍然有一个主要的 handle() 入口点,它接受一个 message,这可能是一个命令或一个事件。

  2. 我们将事件和命令分派到两个不同的辅助函数,如下所示。

以下是我们如何处理事件

事件不能中断流程 (src/allocation/service_layer/messagebus.py)
def handle_event(
    event: events.Event,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork,
):
    for handler in EVENT_HANDLERS[type(event)]:  #(1)
        try:
            logger.debug("handling event %s with handler %s", event, handler)
            handler(event, uow=uow)
            queue.extend(uow.collect_new_events())
        except Exception:
            logger.exception("Exception handling event %s", event)
            continue  #(2)
  1. 事件被发送到一个分派器,该分派器可以委托给每个事件的多个处理程序。

  2. 它捕获并记录错误,但不让它们中断消息处理。

以下是我们如何处理命令

命令重新引发异常 (src/allocation/service_layer/messagebus.py)
def handle_command(
    command: commands.Command,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork,
):
    logger.debug("handling command %s", command)
    try:
        handler = COMMAND_HANDLERS[type(command)]  #(1)
        result = handler(command, uow=uow)
        queue.extend(uow.collect_new_events())
        return result  #(3)
    except Exception:
        logger.exception("Exception handling command %s", command)
        raise  #(2)
  1. 命令分派器期望每个命令只有一个处理程序。

  2. 如果引发任何错误,它们将快速失败并向上冒泡。

  3. return result 只是暂时的;正如在 [temporary_ugly_hack] 中提到的,这是一个临时的 hack,允许消息总线返回批次引用供 API 使用。我们将在 [chapter_12_cqrs] 中修复这个问题。

我们还将单个 HANDLERS 字典更改为命令和事件的不同字典。根据我们的约定,命令只能有一个处理程序

新的处理程序字典 (src/allocation/service_layer/messagebus.py)
EVENT_HANDLERS = {
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]

COMMAND_HANDLERS = {
    commands.Allocate: handlers.allocate,
    commands.CreateBatch: handlers.add_batch,
    commands.ChangeBatchQuantity: handlers.change_batch_quantity,
}  # type: Dict[Type[commands.Command], Callable]

讨论:事件、命令和错误处理

许多开发人员此时会感到不安并询问:“当事件处理失败时会发生什么?我应该如何确保系统处于一致状态?” 如果我们在 messagebus.handle 期间成功处理了一半的事件,然后内存溢出错误杀死了我们的进程,我们如何减轻丢失消息引起的问题?

让我们从最坏的情况开始:我们未能处理一个事件,系统处于不一致的状态。什么类型的错误会导致这种情况?通常在我们的系统中,当只完成一半操作时,我们最终可能会处于不一致的状态。

例如,我们可以将三个单位的 DESIRABLE_BEANBAG 分配给客户的订单,但不知何故未能减少剩余库存量。这将导致不一致的状态:这三个单位的库存既已分配可用,具体取决于你的看法。稍后,我们可能会将相同的豆袋分配给另一位客户,从而给客户支持带来麻烦。

但在我们的分配服务中,我们已经采取措施来防止这种情况发生。我们已经仔细地识别了充当一致性边界的聚合,并且我们引入了一个 UoW 来管理聚合更新的原子成功或失败。

例如,当我们将库存分配给订单时,我们的一致性边界是 Product 聚合。这意味着我们不会意外地超额分配:要么特定的订单行分配给产品,要么没有——没有不一致状态的空间。

根据定义,我们不要求两个聚合立即保持一致,因此如果我们未能处理事件并且只更新单个聚合,我们的系统仍然可以最终保持一致。我们不应违反系统的任何约束。

考虑到这个例子,我们可以更好地理解将消息拆分为命令和事件的原因。当用户想要让系统做某事时,我们将他们的请求表示为一个命令。该命令应修改单个聚合,并且要么完全成功,要么完全失败。我们需要做的任何其他簿记、清理和通知都可以通过事件发生。我们不要求事件处理程序必须成功才能使命令成功。

让我们看另一个例子(来自一个不同的、虚构的项目),看看为什么不是这样。

假设我们正在构建一个销售昂贵奢侈品的电子商务网站。我们的营销部门希望奖励重复访问的客户。我们将在客户进行第三次购买后将其标记为 VIP 客户,这将使他们有权获得优先待遇和特别优惠。我们对此故事的验收标准如下

使用我们在本书中已经讨论过的技术,我们决定构建一个新的 History 聚合,该聚合记录订单并在规则满足时引发领域事件。我们将像这样构建代码

VIP 客户(不同项目的示例代码)
  1. History 聚合捕获指示客户何时成为 VIP 的规则。这使我们能够很好地应对未来规则变得更加复杂时的变化。

  2. 我们的第一个处理程序为客户创建一个订单并引发领域事件 OrderCreated

  3. 我们的第二个处理程序更新 History 对象以记录订单已创建

  4. 最后,当客户成为 VIP 时,我们向他们发送电子邮件。

使用这段代码,我们可以获得一些关于事件驱动系统中错误处理的直觉。

在我们当前的实现中,我们在将状态持久化到数据库之后引发关于聚合的事件。如果我们持久化之前引发这些事件,并同时提交所有更改会怎样?这样,我们可以确保所有工作都已完成。那样不是更安全吗?

但是,如果电子邮件服务器略微过载会发生什么?如果所有工作都必须同时完成,那么繁忙的电子邮件服务器可能会阻止我们收取订单款项。

如果 History 聚合的实现中存在错误会发生什么?我们是否应该仅仅因为无法识别你为 VIP 而拒绝收取你的款项?

通过分离这些关注点,我们使事物有可能隔离失败,从而提高了系统的整体可靠性。这段代码中必须完成的唯一部分是创建订单的命令处理程序。这是客户唯一关心的部分,也是我们的业务利益相关者应该优先考虑的部分。

请注意,我们如何有意识地将事务边界与业务流程的开始和结束对齐。我们在代码中使用的名称与我们的业务利益相关者使用的术语相匹配,我们编写的处理程序与我们的自然语言验收标准的步骤相匹配。这种名称和结构的一致性有助于我们随着系统的增长和复杂化来推理我们的系统。

同步地从错误中恢复

希望我们已经说服你,事件可以独立于引发它们的命令而失败是没问题的。那么,当错误不可避免地发生时,我们应该做些什么来确保我们可以从错误中恢复?

我们需要的第一件事是知道何时发生了错误,为此我们通常依赖日志。

让我们再次查看消息总线中的 handle_event 方法

当前处理函数 (src/allocation/service_layer/messagebus.py)
def handle_event(
    event: events.Event,
    queue: List[Message],
    uow: unit_of_work.AbstractUnitOfWork,
):
    for handler in EVENT_HANDLERS[type(event)]:
        try:
            logger.debug("handling event %s with handler %s", event, handler)
            handler(event, uow=uow)
            queue.extend(uow.collect_new_events())
        except Exception:
            logger.exception("Exception handling event %s", event)
            continue

当我们在系统中处理消息时,我们做的第一件事是写入日志行以记录我们即将要做的事情。对于我们的 CustomerBecameVIP 用例,日志可能如下所示

Handling event CustomerBecameVIP(customer_id=12345)
with handler <function congratulate_vip_customer at 0x10ebc9a60>

因为我们选择使用数据类作为我们的消息类型,所以我们得到了传入数据的整洁打印摘要,我们可以复制并粘贴到 Python shell 中以重新创建该对象。

当发生错误时,我们可以使用记录的数据在单元测试中重现问题,或者将消息重播到系统中。

手动重播对于我们需要在重新处理事件之前修复错误的情况非常有效,但我们的系统总是会遇到一些背景级别的瞬时故障。这包括网络故障、表死锁以及部署引起的短暂停机时间等。

对于大多数这些情况,我们可以通过重试来优雅地恢复。正如谚语所说,“如果第一次不成功,请以指数级递增的回退周期重试操作。”

使用重试处理 (src/allocation/service_layer/messagebus.py)
  1. Tenacity 是一个 Python 库,它实现了常见的重试模式。

  2. 在这里,我们将消息总线配置为最多重试操作三次,尝试之间等待时间呈指数级增长。

重试可能失败的操作可能是提高软件弹性的最佳方法。同样,工作单元和命令处理程序模式意味着每次尝试都从一致的状态开始,并且不会留下半成品。

警告
在某个时候,无论 tenacity 如何,我们都必须放弃尝试处理消息。构建具有分布式消息的可靠系统很困难,我们不得不略过一些棘手的部分。在尾声中,有更多参考资料的指针。

总结

在本书中,我们决定在命令概念之前介绍事件概念,但其他指南通常会反过来做。通过给请求命名和它们自己的数据结构来明确系统可以响应的请求,这是一件非常基本的事情。您有时会看到人们使用命令处理程序模式这个名称来描述我们正在使用事件、命令和消息总线所做的事情。

拆分命令和事件:权衡 讨论了在您加入之前应该考虑的一些事情。

表 2. 拆分命令和事件:权衡
优点 缺点
  • 不同地对待命令和事件有助于我们理解哪些事情必须成功,哪些事情我们可以稍后整理。

  • CreateBatch 绝对比 BatchCreated 更不容易混淆。我们明确表达了用户的意图,而明确胜于含蓄,对吗?

  • 命令和事件之间的语义差异可能很微妙。预计会就这些差异进行无休止的争论。

  • 我们明确地邀请失败。我们知道有时事情会崩溃,我们选择通过使失败更小和更孤立来处理这个问题。这会使系统更难以推理,并且需要更好的监控。

[chapter_11_external_events] 中,我们将讨论使用事件作为集成模式。