buy the book ribbon

6:工作单元模式

在本章中,我们将介绍最后一块拼图,它将仓库模式和服务层模式结合在一起:工作单元模式。

如果仓库模式是我们对持久化存储概念的抽象,那么工作单元 (UoW) 模式就是我们对原子操作概念的抽象。它将使我们最终且完全地将服务层与数据层解耦。

没有 UoW:API 直接与三层对话 表明,目前,我们的基础设施层之间发生了大量的通信:API 直接与数据库层对话以启动会话,它与仓库层对话以初始化 SQLAlchemyRepository,并且它与服务层对话以请求分配。

提示

本章的代码位于 chapter_06_uow 分支 GitHub

git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_06_uow
# or to code along, checkout Chapter 4:
git checkout chapter_04_service_layer
apwp 0601
图 1. 没有 UoW:API 直接与三层对话

使用 UoW:UoW 现在管理数据库状态 显示了我们的目标状态。Flask API 现在只做两件事:它初始化一个工作单元,并调用一个服务。该服务与 UoW 协作(我们喜欢将 UoW 视为服务层的一部分),但服务函数本身和 Flask 都不再需要直接与数据库对话。

我们将使用一段可爱的 Python 语法来完成这一切,一个上下文管理器。

apwp 0602
图 2. 使用 UoW:UoW 现在管理数据库状态

工作单元与仓库协作

让我们看看工作单元(或 UoW,我们发音为 "you-wow")是如何运作的。以下是完成后的服务层的外观

工作单元在操作中的预览 (src/allocation/service_layer/services.py)
def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:  #(1)
        batches = uow.batches.list()  #(2)
        ...
        batchref = model.allocate(line, batches)
        uow.commit()  #(3)
  1. 我们将启动一个 UoW 作为上下文管理器。

  2. uow.batches 是批次仓库,因此 UoW 为我们提供了对永久存储的访问。

  3. 当我们完成时,我们使用 UoW 提交或回滚我们的工作。

UoW 充当我们持久化存储的单个入口点,并且它跟踪已加载的对象和最新状态。[1]

这为我们提供了三个有用的东西

  • 数据库的稳定快照,以便我们使用的对象在操作过程中不会发生变化

  • 一种一次性持久化我们所有更改的方法,这样如果出现问题,我们不会最终处于不一致的状态

  • 一个简单的 API 来处理我们的持久化问题,以及一个方便获取仓库的地方

使用集成测试测试驱动 UoW

以下是我们针对 UOW 的集成测试

UoW 的基本“往返”测试 (tests/integration/test_uow.py)
def test_uow_can_retrieve_a_batch_and_allocate_to_it(session_factory):
    session = session_factory()
    insert_batch(session, "batch1", "HIPSTER-WORKBENCH", 100, None)
    session.commit()

    uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory)  #(1)
    with uow:
        batch = uow.batches.get(reference="batch1")  #(2)
        line = model.OrderLine("o1", "HIPSTER-WORKBENCH", 10)
        batch.allocate(line)
        uow.commit()  #(3)

    batchref = get_allocated_batch_ref(session, "o1", "HIPSTER-WORKBENCH")
    assert batchref == "batch1"
  1. 我们通过使用自定义会话工厂初始化 UoW,并返回一个 uow 对象以在我们的 with 代码块中使用。

  2. UoW 通过 uow.batches 为我们提供对批次仓库的访问。

  3. 当我们完成时,我们调用它的 commit()

对于好奇的人,insert_batchget_allocated_batch_ref 助手看起来像这样

用于执行 SQL 操作的助手 (tests/integration/test_uow.py)
def insert_batch(session, ref, sku, qty, eta):
    session.execute(
        "INSERT INTO batches (reference, sku, _purchased_quantity, eta)"
        " VALUES (:ref, :sku, :qty, :eta)",
        dict(ref=ref, sku=sku, qty=qty, eta=eta),
    )


def get_allocated_batch_ref(session, orderid, sku):
    [[orderlineid]] = session.execute(  #(1)
        "SELECT id FROM order_lines WHERE orderid=:orderid AND sku=:sku",
        dict(orderid=orderid, sku=sku),
    )
    [[batchref]] = session.execute(  #(1)
        "SELECT b.reference FROM allocations JOIN batches AS b ON batch_id = b.id"
        " WHERE orderline_id=:orderlineid",
        dict(orderlineid=orderlineid),
    )
    return batchref
  1. = 语法有点过于聪明,抱歉。正在发生的事情是 session.execute 返回一个行列表,其中每一行都是一个列值元组;在我们的特定情况下,它是一个包含一行的列表,该行是一个包含一个列值的元组。左侧的双中括号正在执行(双重)赋值解包,以从这两个嵌套序列中取回单个值。一旦你使用过几次,它就会变得可读!

工作单元及其上下文管理器

在我们的测试中,我们隐式地定义了 UoW 需要执行的操作的接口。让我们通过使用抽象基类来显式地定义它

抽象 UoW 上下文管理器 (src/allocation/service_layer/unit_of_work.py)
  1. UoW 提供一个名为 .batches 的属性,它将为我们提供对批次仓库的访问。

  2. 如果您从未见过上下文管理器,__enter____exit__ 是两个魔术方法,它们分别在我们进入 with 代码块和退出代码块时执行。它们是我们的设置和拆卸阶段。

  3. 当我们准备好时,我们将调用此方法来显式提交我们的工作。

  4. 如果我们不提交,或者如果我们通过引发错误退出上下文管理器,我们将执行 rollback。(如果已调用 commit(),则回滚无效。请继续阅读以了解更多讨论。)

真实的工作单元使用 SQLAlchemy 会话

我们的具体实现添加的主要内容是数据库会话

真实的 SQLAlchemy UoW (src/allocation/service_layer/unit_of_work.py)
DEFAULT_SESSION_FACTORY = sessionmaker(  #(1)
    bind=create_engine(
        config.get_postgres_uri(),
    )
)


class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
    def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):
        self.session_factory = session_factory  #(1)

    def __enter__(self):
        self.session = self.session_factory()  # type: Session  #(2)
        self.batches = repository.SqlAlchemyRepository(self.session)  #(2)
        return super().__enter__()

    def __exit__(self, *args):
        super().__exit__(*args)
        self.session.close()  #(3)

    def commit(self):  #(4)
        self.session.commit()

    def rollback(self):  #(4)
        self.session.rollback()
  1. 该模块定义了一个默认会话工厂,它将连接到 Postgres,但我们允许在集成测试中覆盖它,以便我们可以改用 SQLite。

  2. __enter__ 方法负责启动数据库会话并实例化可以使用该会话的真实仓库。

  3. 我们在退出时关闭会话。

  4. 最后,我们提供了具体的 commit()rollback() 方法,它们使用我们的数据库会话。

用于测试的伪造工作单元

以下是我们在服务层测试中如何使用伪造 UoW

伪造 UoW (tests/unit/test_services.py)
class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork):
    def __init__(self):
        self.batches = FakeRepository([])  #(1)
        self.committed = False  #(2)

    def commit(self):
        self.committed = True  #(2)

    def rollback(self):
        pass


def test_add_batch():
    uow = FakeUnitOfWork()  #(3)
    services.add_batch("b1", "CRUNCHY-ARMCHAIR", 100, None, uow)  #(3)
    assert uow.batches.get("b1") is not None
    assert uow.committed


def test_allocate_returns_allocation():
    uow = FakeUnitOfWork()  #(3)
    services.add_batch("batch1", "COMPLICATED-LAMP", 100, None, uow)  #(3)
    result = services.allocate("o1", "COMPLICATED-LAMP", 10, uow)  #(3)
    assert result == "batch1"
...
  1. FakeUnitOfWorkFakeRepository 紧密耦合,就像真实的 UnitofWorkRepository 类一样。这很好,因为我们认识到这些对象是协作者。

  2. 请注意与 FakeSession 中的伪造 commit() 函数的相似之处(我们现在可以摆脱它了)。但这有很大的改进,因为我们现在伪造的是我们自己编写的代码,而不是第三方代码。有人说,“不要模拟你不拥有的东西”

  3. 在我们的测试中,我们可以实例化一个 UoW 并将其传递给我们的服务层,而不是传递仓库和会话。这要方便得多。

不要模拟你不拥有的东西

为什么我们对模拟 UoW 比模拟会话更自在?我们的两个伪造都实现了相同的目标:它们为我们提供了一种替换持久化层的方法,以便我们可以在内存中运行测试,而无需与真实的数据库对话。区别在于最终的设计。

如果我们只关心编写快速运行的测试,我们可以创建替换 SQLAlchemy 的模拟,并在我们的整个代码库中使用它们。问题在于 Session 是一个复杂的对象,它公开了许多与持久化相关的功能。很容易使用 Session 对数据库进行任意查询,但这很快会导致数据访问代码散布在整个代码库中。为了避免这种情况,我们希望限制对持久化层的访问,以便每个组件都拥有其精确所需的内容,而不多余。

通过耦合到 Session 接口,您选择耦合到 SQLAlchemy 的所有复杂性。相反,我们想选择一个更简单的抽象,并使用它来清晰地分离职责。我们的 UoW 比会话简单得多,并且我们对服务层能够启动和停止工作单元感到自在。

“不要模拟你不拥有的东西”是一条经验法则,它迫使我们在混乱的子系统之上构建这些简单的抽象。这与模拟 SQLAlchemy 会话具有相同的性能优势,但鼓励我们仔细思考我们的设计。

在服务层中使用 UoW

以下是我们的新服务层的外观

使用 UoW 的服务层 (src/allocation/service_layer/services.py)
def add_batch(
    ref: str, sku: str, qty: int, eta: Optional[date],
    uow: unit_of_work.AbstractUnitOfWork,  #(1)
):
    with uow:
        uow.batches.add(model.Batch(ref, sku, qty, eta))
        uow.commit()


def allocate(
    orderid: str, sku: str, qty: int,
    uow: unit_of_work.AbstractUnitOfWork,  #(1)
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        batches = uow.batches.list()
        if not is_valid_sku(line.sku, batches):
            raise InvalidSku(f"Invalid sku {line.sku}")
        batchref = model.allocate(line, batches)
        uow.commit()
    return batchref
  1. 我们的服务层现在只有一个依赖项,再次依赖于抽象 UoW。

提交/回滚行为的显式测试

为了使我们自己相信提交/回滚行为有效,我们编写了几个测试

回滚行为的集成测试 (tests/integration/test_uow.py)
def test_rolls_back_uncommitted_work_by_default(session_factory):
    uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory)
    with uow:
        insert_batch(uow.session, "batch1", "MEDIUM-PLINTH", 100, None)

    new_session = session_factory()
    rows = list(new_session.execute('SELECT * FROM "batches"'))
    assert rows == []


def test_rolls_back_on_error(session_factory):
    class MyException(Exception):
        pass

    uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory)
    with pytest.raises(MyException):
        with uow:
            insert_batch(uow.session, "batch1", "LARGE-FORK", 100, None)
            raise MyException()

    new_session = session_factory()
    rows = list(new_session.execute('SELECT * FROM "batches"'))
    assert rows == []
提示
我们没有在此处展示它,但值得针对“真实”数据库(即相同的引擎)测试一些更“晦涩”的数据库行为,例如事务。目前,我们正在使用 SQLite 而不是 Postgres,但在 [chapter_07_aggregate] 中,我们将把一些测试切换为使用真实数据库。我们的 UoW 类使其变得容易,这真是太方便了!

显式与隐式提交

现在我们简要地讨论一下实现 UoW 模式的不同方法。

我们可以想象一个略有不同的 UoW 版本,它默认提交,并且仅在发现异常时才回滚

具有隐式提交的 UoW…​ (src/allocation/unit_of_work.py)
  1. 我们应该在 happy path 中使用隐式提交吗?

  2. 并且仅在异常时回滚?

这将使我们能够节省一行代码,并从我们的客户端代码中删除显式提交

...将为我们节省一行代码 (src/allocation/service_layer/services.py)

这是一个判断性调用,但我们倾向于更喜欢要求显式提交,以便我们必须选择何时刷新状态。

尽管我们使用了一行额外的代码,但这使得软件默认是安全的。默认行为是不更改任何内容。反过来,这使我们的代码更容易推理,因为只有一条代码路径会导致系统中的更改:完全成功和显式提交。任何其他代码路径、任何异常、从 UoW 范围的任何提前退出都会导致安全状态。

同样,我们更喜欢默认回滚,因为它更容易理解;这将回滚到上次提交,因此要么用户执行了一次提交,要么我们清除了他们的更改。苛刻但简单。

示例:使用 UoW 将多个操作分组到原子单元中

以下是一些示例,展示了工作单元模式的用法。您可以看到它如何导致对哪些代码块一起发生进行简单的推理。

示例 1:重新分配

假设我们希望能够取消分配然后重新分配订单

重新分配服务功能
  1. 如果 deallocate() 失败,我们显然不想调用 allocate()

  2. 如果 allocate() 失败,我们可能也不想实际提交 deallocate()

示例 2:更改批次数量

我们的航运公司打电话给我们说,其中一个集装箱门打开了,我们一半的沙发掉进了印度洋。糟糕!

更改数量
  1. 在这里,我们可能需要取消分配任意数量的行。如果我们在任何阶段遇到故障,我们可能希望不提交任何更改。

整理集成测试

我们现在有三组测试,所有测试本质上都指向数据库:test_orm.pytest_repository.pytest_uow.py。我们应该扔掉哪些?

└── tests
    ├── conftest.py
    ├── e2e
    │   └── test_api.py
    ├── integration
    │   ├── test_orm.py
    │   ├── test_repository.py
    │   └── test_uow.py
    ├── pytest.ini
    └── unit
        ├── test_allocate.py
        ├── test_batches.py
        └── test_services.py

如果您认为测试从长远来看不会增加价值,您应该始终可以随意扔掉测试。我们会说 test_orm.py 主要是一个帮助我们学习 SQLAlchemy 的工具,因此从长远来看我们不需要它,特别是如果它正在做的主要事情在 test_repository.py 中得到了涵盖。最后一个测试,您可能会保留,但我们当然可以找到一个论点来证明将所有内容都保持在尽可能高的抽象级别(就像我们在单元测试中所做的那样)。

读者练习

对于本章,可能最好的尝试是从头开始实现 UoW。代码一如既往地 在 GitHub 上。您可以完全按照我们拥有的模型进行操作,或者可以尝试将 UoW(其职责是 commit()rollback() 和提供 .batches 仓库)与上下文管理器分开,上下文管理器的工作是初始化事物,然后在退出时执行提交或回滚。如果您想进行全函数式编程而不是摆弄所有这些类,则可以使用 contextlib 中的 @contextmanager

我们已经剥离了实际的 UoW 和伪造的 UoW,以及精简了抽象的 UoW。如果您想出了特别值得骄傲的东西,为什么不给我们发送一个指向您的仓库的链接呢?

提示
这是 [chapter_05_high_gear_low_gear] 中的课程的另一个例子:当我们构建更好的抽象时,我们可以将我们的测试转移到针对它们运行,这使我们可以自由地更改底层的细节。

总结

希望我们已经说服您,工作单元模式很有用,并且上下文管理器是一种非常好的 Pythonic 方式,可以直观地将代码分组到我们希望原子性发生的块中。

事实上,这种模式非常有用,以至于 SQLAlchemy 已经以 Session 对象的形式使用了 UoW。SQLAlchemy 中的 Session 对象是您的应用程序从数据库加载数据的方式。

每次您从数据库加载新实体时,会话都会开始跟踪对实体的更改,并且当会话刷新时,您的所有更改都会一起持久化。如果 SQLAlchemy 会话已经实现了我们想要的模式,为什么我们还要费力地抽象掉 SQLAlchemy 会话呢?

工作单元模式:权衡 讨论了一些权衡。

表 1. 工作单元模式:权衡
优点 缺点
  • 我们对原子操作的概念进行了很好的抽象,并且上下文管理器使我们能够轻松地直观地看到哪些代码块原子地组合在一起。

  • 我们可以显式控制事务何时开始和结束,并且我们的应用程序以默认安全的方式失败。我们永远不必担心操作会部分提交。

  • 这是一个放置所有仓库的好地方,以便客户端代码可以访问它们。

  • 正如您将在后面的章节中看到的那样,原子性不仅与事务有关;它还可以帮助我们处理事件和消息总线。

  • 您的 ORM 可能已经围绕原子性进行了一些非常好的抽象。SQLAlchemy 甚至有上下文管理器。您只需传递会话就可以走很长的路。

  • 我们使其看起来很容易,但您必须非常仔细地考虑诸如回滚、多线程和嵌套事务之类的事情。也许坚持使用 Django 或 Flask-SQLAlchemy 为您提供的功能将使您的生活更轻松。

首先,Session API 功能丰富,并支持我们不希望或不需要在我们的领域中进行的操作。我们的 UnitOfWork 将会话简化为它的基本核心:它可以启动、提交或丢弃。

另一方面,我们正在使用 UnitOfWork 来访问我们的 Repository 对象。这是一个简洁的开发人员可用性,我们无法使用普通的 SQLAlchemy Session 来实现。

工作单元模式回顾

工作单元模式是围绕数据完整性的抽象

它通过让我们在操作结束时执行单个刷新操作,帮助强制执行我们领域模型的一致性并提高性能。

它与仓库模式和服务层模式紧密合作

工作单元模式通过表示原子更新来完成我们对数据访问的抽象。我们的每个服务层用例都在单个工作单元中运行,该工作单元作为一个整体成功或失败。

这是一个上下文管理器的绝佳用例

上下文管理器是在 Python 中定义范围的惯用方式。我们可以使用上下文管理器在请求结束时自动回滚我们的工作,这意味着系统默认是安全的。

SQLAlchemy 已经实现了这种模式

我们引入了 SQLAlchemy Session 对象之上的更简单的抽象,以便“缩小”ORM 和我们的代码之间的接口。这有助于保持我们的松耦合。

最后,我们再次受到依赖倒置原则的驱动:我们的服务层依赖于一个薄抽象,我们在系统的外部边缘附加一个具体的实现。这与 SQLAlchemy 自己的 建议 非常吻合

保持会话(通常是事务)的生命周期分离和外部化。对于更重要的应用程序,最全面的方法(推荐)将尝试将会话、事务和异常管理的细节尽可能地远离执行其工作的程序的细节。

— SQLALchemy "会话基础" 文档

1. 您可能遇到过使用 协作者 这个词来描述共同工作以实现目标的对象。工作单元和仓库是对象建模意义上协作者的一个很好的例子。在职责驱动设计中,在其角色中协作的对象集群称为对象邻域,在我们专业的意见中,这完全是可爱的。