仓库模式和工作单元模式

作者:Bob,2017-09-08

在本系列的前一部分(介绍命令处理器)中,我们构建了一个玩具系统,它可以向 IssueLog 添加新的 Issue,但它本身没有真正的行为,并且每次应用程序重启时都会丢失其数据。我们将通过引入一些用于持久数据访问的模式来稍微扩展它,并更多地讨论端口和适配器架构背后的思想。为了回顾,我们遵守三个原则

  1. 清晰地定义我们用例的边界。
  2. 依赖于抽象,而不是具体的实现。
  3. 将粘合代码识别为与领域逻辑不同的代码,并将其放在自己的层中。

在我们的命令处理器中,我们编写了以下代码

reporter = IssueReporter(cmd.reporter_name, cmd.reporter_email)
issue = Issue(reporter, cmd.problem_description)
issue_log.add(issue)

IssueLog 是我们与领域专家对话中的一个术语。它是他们记录所有 issue 列表的地方。这是我们客户使用的术语的一部分,因此它显然属于领域,但它也是数据存储的理想抽象。我们如何修改代码,以便我们新创建的 Issue 将被持久化?我们不希望我们的 IssueLog 依赖于数据库,因为这违反了原则 #2。这个问题将我们引向端口和适配器架构。

在端口和适配器架构中,我们构建一个纯粹的领域,该领域公开端口。端口是数据进入或离开领域模型的方式。在这个系统中,IssueLog 是一个端口。端口通过适配器连接到外部世界。在之前的代码示例中,FakeIssueLog 是一个适配器:它通过实现接口为系统提供服务。

让我们用一个现实世界的类比。想象一下,我们有一个电路,它可以检测超过某个阈值的电流。如果达到阈值,电路会输出一个信号。在我们的电路中,我们连接了两个端口,一个用于电流输入,一个用于电流输出。输入和输出通道是我们电路的一部分:没有它们,电路就没用了。

class ThresholdDetectionCircuit:

    arbitrary_threshold = 4

    def __init__(self, input: ReadablePort, output: WriteablePort):
        self.input = input
        self.output = output

    def read_from_input(self):
        next_value = self.input.read()
        if next_value > self.arbitrary_threshold:
            self.output.write(1)

因为我们有远见地使用了标准化端口,所以我们可以将任意数量的不同设备插入到我们的电路中。例如,我们可以将光探测器连接到输入端,将蜂鸣器连接到输出端,或者我们可以将拨号盘连接到输入端,将灯连接到输出端,等等。

class LightDetector(ReadablePort):
    def read(self):
        return self.get_light_amplitude()

class Buzzer(WriteablePort):
    def write(self, value):
        if value > 0:
            self.make_infuriating_noise()


class Dial(ReadablePort):
    def read(self):
        return self.current_value

class Light(self):
    def write(self, value):
        if value > 0:
            self.on = True
        else:
            self.on = False

孤立地考虑,这只是一个良好的 OO 实践的例子:我们通过组合来扩展我们的系统。使这成为端口和适配器架构的是这样一种思想,即存在一个由领域模型(我们的 ThresholdDetectionCircuit)组成的内部世界,以及一个通过良好定义的端口驱动领域模型的外部世界。所有这一切与数据库有什么关系呢?

from SqlAlchemy import Session

class SqlAlchemyIssueLog (IssueLog):

    def __init__(self, session: Session):
        self.session = session

    def add(self, issue):
        self.session.add(issue)


class TextFileIssueLog (IssueLog):

    def __init__(self, path):
        self.path = path

    def add(self, issue):
        with open(self.path, 'w') as f:
            json.dump(f)

通过类比我们的电路示例,IssueLog 是一个 WriteablePort - 它是我们从系统中获取数据的一种方式。SqlAlchemy 和文件系统是我们可能插入的两种类型的适配器,就像 Buzzer 或 Light 类一样。事实上,IssueLog 是一个常见设计模式的实例:它是一个仓库 [https://martinfowler.com.cn/eaaCatalog/repository.html]。仓库是一个通过向我们展示看起来像集合的接口来隐藏持久存储细节的对象。我们应该能够向仓库添加新事物,并从仓库中取出事物,这基本上就是全部。

让我们看一下一个简单的仓库模式。

class FooRepository:
    def __init__(self, db_session):
        self.session = db_session

    def add_new_item(self, item):
        self.db_session.add(item)

    def get_item(self, id):
        return self.db_session.get(Foo, id)

    def find_foos_by_latitude(self, latitude):
        return self.session.query(Foo).\
                filter(foo.latitude == latitude)

我们公开了一些方法,一个用于添加新项目,一个用于按 id 获取项目,第三个用于按某些标准查找项目。这个 FooRepository 正在使用 SqlAlchemy 会话 [https://docs.sqlalchemy.org.cn/en/latest/orm/session_basics.html] 对象,所以它是我们适配器层的一部分。我们可以定义一个不同的适配器用于单元测试。

class FooRepository:
    def __init__(self, db_session):
        self.items = []

    def add_new_item(self, item):
        self.items.append(item)

    def get_item(self, id):
        return next((item for item in self.items 
                          if item.id == id))

    def find_foos_by_latitude(self, latitude):
        return (item for item in self.items
                     if item.latitude == latitude)

这个适配器的工作方式与由真实数据库支持的适配器完全相同,但这样做没有任何外部状态。这使我们能够在不求助于数据库上的 Setup/Teardown 脚本,或猴子补丁我们的 ORM 以返回硬编码值的情况下测试我们的代码。我们只需将不同的适配器插入到现有的端口中。与 ReadablePort 和 WriteablePort 一样,这个接口的简单性使我们可以轻松地插入不同的实现。

仓库为我们提供了对数据存储中对象的读/写访问权限,并且通常与另一种模式一起使用,即工作单元 [https://martinfowler.com.cn/eaaCatalog/unitOfWork.html]。工作单元代表了一堆必须一起发生的事情。它通常允许我们在请求的生命周期内将对象缓存在内存中,这样我们就不需要重复调用数据库。工作单元负责对我们的对象进行脏检查,并在请求结束时将任何状态更改刷新到数据库。

工作单元是什么样的?

class SqlAlchemyUnitOfWorkManager(UnitOfWorkManager):
    """The Unit of work manager returns a new unit of work. 
       Our UOW is backed by a sql alchemy session whose 
       lifetime can be scoped to a web request, or a 
       long-lived background job."""
    def __init__(self, session_maker):
        self.session_maker = session_maker

    def start(self):
        return SqlAlchemyUnitOfWork(self.session_maker)


class SqlAlchemyUnitOfWork(UnitOfWork):
    """The unit of work captures the idea of a set of things that
       need to happen together. 

       Usually, in a relational database, 
       one unit of work == one database transaction."""

    def __init__(self, sessionfactory):
        self.sessionfactory = sessionfactory

    def __enter__(self):
        self.session = self.sessionfactory()
        return self

    def __exit__(self, type, value, traceback):
        self.session.close()

    def commit(self):
        self.session.commit()

    def rollback(self):
        self.session.rollback()

    # I tend to put my repositories onto my UOW
    # for convenient access. 
    @property
    def issues(self):
        return IssueRepository(self.session)

这段代码取自当前的生产系统 - 实现这些模式的代码实际上并不复杂。这里唯一缺少的是 commit 方法中的一些日志记录和错误处理。我们的工作单元管理器创建一个新的工作单元,或者根据我们配置 SqlAlchemy 的方式为我们提供一个现有的工作单元。工作单元本身只是 SqlAlchemy 之上的一个薄层,它为我们提供了显式的回滚和提交点。让我们重新审视我们的第一个命令处理器,看看我们如何一起使用这些模式。

class ReportIssueHandler:
    def __init__(self, uowm:UnitOfWorkManager):
        self.uowm = uowm

    def handle(self, cmd):
        with self.uowm.start() as unit_of_work:
            reporter = IssueReporter(cmd.reporter_name, cmd.reporter_email)
            issue = Issue(reporter, cmd.problem_description)
            unit_of_work.issues.add(issue)
            unit_of_work.commit()

我们的命令处理器看起来或多或少相同,只是它现在负责启动一个工作单元,并在完成时提交工作单元。这符合我们的规则 #1 - 我们将清楚地定义用例的开始和结束。我们确切地知道这里只有一个对象被加载和修改,并且我们的数据库事务保持简短。我们的处理器依赖于一个抽象 - UnitOfWorkManager,并且不关心它是测试替身还是 SqlAlchemy 会话,所以这涵盖了规则 #2。最后,这段代码非常枯燥,因为它只是粘合代码。我们将所有枯燥的粘合代码移到我们系统的边缘,以便我们可以以我们喜欢的任何方式编写我们的领域模型:规则 #3 已遵守。

这部分的示例代码 [https://github.com/bobthemighty/blog-code-samples/tree/master/ports-and-adapters/02] 添加了几个新包 - 一个用于慢速测试 [http://pycon-2012-notes.readthedocs.io/en/latest/fast_tests_slow_tests.html](通过网络或到真实文件系统的测试),另一个用于我们的适配器。我们还没有添加任何新功能,但是我们添加了一个测试,表明我们可以通过我们的命令处理器和工作单元将 Issue 插入到 sqlite 数据库中。请注意,所有 ORM 代码都在一个模块 (issues.adapters.orm) 中,并且它依赖于我们的领域模型,而不是相反。我们的领域对象不继承自 SqlAlchemy 的声明性基类。我们开始对拥有位于系统“内部”的领域和位于“外部”的基础设施代码有所了解。

我们的单元测试已更新为使用工作单元,现在我们可以测试我们将 issue 插入到我们的 issue 日志中,并提交工作单元,而无需依赖任何实际的实现细节。我们可以从我们的代码库中完全删除 SqlAlchemy,我们的单元测试将继续工作,因为我们有一个纯粹的领域模型,并且我们从我们的服务层公开了抽象端口。

class When_reporting_an_issue:

    def given_an_empty_unit_of_work(self):
        self.uow = FakeUnitOfWork()

    def because_we_report_a_new_issue(self):
        handler = ReportIssueHandler(self.uow)
        cmd = ReportIssueCommand(name, email, desc)

        handler.handle(cmd)

    def the_handler_should_have_created_a_new_issue(self):
        expect(self.uow.issues).to(have_len(1))

    def it_should_have_recorded_the_issuer(self):
        expect(self.uow.issues[0].reporter.name).to(equal(name))
        expect(self.uow.issues[0].reporter.email).to(equal(email))

    def it_should_have_recorded_the_description(self):
        expect(self.uow.issues[0].description).to(equal(desc))

    def it_should_have_committed_the_unit_of_work(self):
        expect(self.uow.was_committed).to(be_true)

下次 [https://io.made.com/blog/commands-and-queries-handlers-and-views] 我们将看看如何从系统中取回数据。