请注意:端口和适配器与命令处理程序系列中的这部分内容代码量很大,如果您没有阅读之前的章节,则很难理解。
好的,我们现在有了一个应用程序的基本框架,可以向数据库添加新的问题,然后从 Flask API 中获取它们。然而,到目前为止,我们没有任何领域逻辑。我们所拥有的只是一堆复杂的废话,而我们本可以只用一个微型的 Django 应用程序来完成。让我们研究更多用例,并开始充实内容。
回到我们的领域专家
那么,当我们向问题日志添加了一个报告的问题后,接下来会发生什么?
嗯,我们需要对问题进行分类,并确定它的紧急程度。然后我们可能会将其分配给特定的工程师,或者我们可能会将其留在队列中供任何人处理。
等等,队列?我以为你有一个问题日志,它们是同一回事吗,还是有区别?
哦,是的。问题日志只是我们收到的所有问题的记录,但我们是从队列中工作的。
我明白了,那么事情是如何进入队列的呢?
我们对问题日志中的新项目进行分类,以确定它们的紧急程度以及它们应该属于哪些类别。当我们知道如何对它们进行分类以及它们的紧急程度时,我们将问题视为一个队列,并按优先级顺序处理它们。
这是因为用户总是将事情设置为“极其紧急”?
是的,我们自己分类问题更容易。
那实际上意味着什么,比如,你只是阅读工单,然后说“哦,这很重要,级别为 5,并且属于鼠标坏了类别”?
嗯…… 差不多,有时我们需要向用户询问更多问题,所以我们会给他们发邮件或打电话。大多数事情都是先到先得,但有时有人需要在开会前进行修复。
所以你给用户发邮件以获取更多信息,或者你给他们打电话,然后你使用这些信息来评估问题的优先级 - 对不起,是分类问题,并确定它应该属于哪个类别…… 类别实现了什么?为什么要分类?
部分是为了报告,这样我们可以看到哪些事情占用了最多的时间,或者例如,在特定批次的笔记本电脑上是否存在类似问题的集群。主要是因为不同的工程师有不同的技能,比如如果你遇到 Active Directory 域的问题,那么你应该把它发送给 Barry,或者如果是 Exchange 问题,那么 George 可以解决,Mike 有设备日志,所以他可以给你一台临时的笔记本电脑等等。
好的,我在哪里可以找到这个“队列”?
你的客户咧嘴一笑,指着墙壁,墙上的一块大白板上贴满了不同颜色的便利贴和贴纸。
将我们的需求映射到我们的领域
我们如何将这些需求映射回我们的系统?回顾我们与领域专家的笔记,有一些明显的动词我们应该用来建模我们的用例。我们可以分类问题,这意味着我们确定其优先级和类别;我们可以将已分类的问题分配给工程师,或者工程师可以从队列中选取未分配的问题。还有一个关于提问的完整部分,我们可能会通过打电话并填写更多详细信息来同步进行,或者通过发送电子邮件来异步进行。队列,及其所有的贴纸、标记和泳道,看起来太复杂了,今天无法处理,所以我们将单独深入研究它。
让我们快速充实分类用例。我们将从更新现有的报告问题的单元测试开始
class When_reporting_an_issue:
def given_an_empty_unit_of_work(self):
self.uow = FakeUnitOfWork()
def because_we_report_a_new_issue(self):
handler = ReportIssueHandler(self.uow)
cmd = ReportIssueCommand(id, name, email, desc)
handler.handle(cmd)
@property
def issue(self):
return self.uow.issues[0]
def it_should_be_awaiting_triage(self):
expect(self.issue.state).to(equal(IssueState.AwaitingTriage))
我们正在引入一个新概念 - 问题现在有状态,新报告的问题开始时处于“等待分类”状态。我们可以快速添加一个命令和处理程序,使我们能够分类问题。
class TriageIssueHandler:
def __init__(self, uowm: UnitOfWorkManager):
self.uowm = uowm
def handle(self, cmd):
with self.uowm.start() as uow:
issue = uow.issues.get(cmd.issue_id)
issue.triage(cmd.priority, cmd.category)
uow.commit()
目前,分类问题就是选择类别和优先级。我们将使用自由字符串作为类别,并使用枚举作为优先级。一旦问题被分类,它就进入“等待分配”状态。在某个时候,我们需要添加一些视图构建器来列出等待分类或分配的问题,但现在让我们快速添加一个处理程序,以便工程师可以从队列中选取一个问题。
class PickIssueHandler:
def __init__(self, uowm: UnitOfWorkManager):
self.uowm = uowm
def handle(self, cmd):
with self.uowm.start() as uow:
issue = uow.issues.get(cmd.issue_id)
issue.assign_to(cmd.picked_by)
uow.commit()
在这一点上,处理程序变得有点无聊了。正如我在第一部分 [https://io.made.com/blog/introducing-command-handler/] 中所说的那样,命令处理程序应该是无聊的粘合代码,并且每个命令处理程序都具有相同的基本结构
- 获取当前状态。
- 通过调用我们领域模型上的方法来改变状态。
- 持久化新状态。
- 通知系统的其他部分我们的状态已更改。
然而,到目前为止,我们只看到了步骤 1、2 和 3。让我们引入一个新的需求。
当问题分配给工程师时,我们可以给他们发送电子邮件通知吗?
关于 SRP 的简短讨论 让我们尝试实现这个新需求。这是第一次尝试
class AssignIssueHandler:
def __init__(self,
uowm: UnitOfWorkManager,
email_builder: EmailBuilder,
email_sender: EmailSender):
self.uowm = uowm
self.email_builder = email_builder
self.email_sender = email_sender
def handle(self, cmd):
# Assign Issue
with self.uowm.start() as uow:
issue = uow.issues.get(cmd.issue_id)
issue.assign_to(
cmd.assigned_to,
assigned_by=cmd.assigned_by
)
uow.commit()
# Send Email
email = self.email_builder.build(
cmd.assigned_to,
cmd.assigned_by,
issue.problem_description)
self.email_sender.send(email)
这里有些东西感觉不对劲,对吧?我们的命令处理程序现在有两个非常不同的职责。在本系列的开头,我们说过我们将坚持三个原则
- 我们将始终定义我们的用例从哪里开始和结束。
- 我们将依赖抽象,而不是具体的实现。
- 我们将把粘合代码与业务逻辑区分开来,并将它放在适当的位置。
后两个原则在这里得到了维护,但第一个原则感觉有点勉强。至少我们违反了单一职责原则 [https://zh.wikipedia.org/wiki/%E5%8D%95%E4%B8%80%E8%81%8C%E8%B4%A3%E5%8E%9F%E5%88%99];我对 SRP 的经验法则是“描述你的类的行为。如果你使用 ‘和’ 或 ‘然后’ 这个词,你可能正在违反 SRP”。这个类做什么?它将问题分配给工程师,然后给他们发送电子邮件。这足以让我的重构感官开始刺痛,但还有另一个不太理论的原因来拆分这个方法,这与错误处理有关。
如果我点击标记为“分配给工程师”的按钮,并且我无法将问题分配给该工程师,那么我希望看到错误。系统无法执行我给它的命令,所以我应该重试,或者选择不同的工程师。
如果我点击标记为“分配给工程师”的按钮,并且系统成功了,但是然后无法发送通知邮件,我会在意吗?我应该采取什么行动来回应?我应该再次分配问题吗?我应该把它分配给其他人吗?如果我这样做,系统将处于什么状态?
以这种方式看待问题,很明显“分配问题”是我们用例的真正边界,我们应该要么成功完成,要么完全失败。“发送电子邮件”是次要的副作用。如果那部分失败了,我不想看到错误 - 让系统管理员稍后清理它。
如果我们把通知拆分到另一个类中会怎么样?
class AssignIssueHandler:
def __init__(self, uowm: UnitOfWorkManager):
self.uowm = uowm
def handle(self, cmd):
with self.uowm.start() as uow:
issue = uow.issues.get(cmd.issue_id)
issue.assign_to(
cmd.assignee_address,
assigned_by=cmd.assigner_address
)
uow.commit()
class SendAssignmentEmailHandler
def __init__(self,
uowm: UnitOfWorkManager,
email_builder: EmailBuilder,
email_sender: EmailSender):
self.uowm = uowm
self.email_builder = email_builder
self.email_sender = email_sender
def handle(self, cmd):
with self.uowm.start() as uow:
issue = uow.issues.get(cmd.issue_id)
email = self.email_builder.build(
cmd.assignee_address,
cmd.assigner_address,
issue.problem_description)
self.email_sender.send(email)
我们在这里实际上不需要工作单元,因为我们没有对问题状态进行任何持久性更改,所以如果我们改用视图构建器会怎么样?
class SendAssignmentEmailHandler
def __init__(self,
view: IssueViewBuilder,
email_builder: EmailBuilder,
email_sender: EmailSender):
self.view = view
self.email_builder = email_builder
self.email_sender = email_sender
def handle(self, cmd):
issue = self.view.fetch(cmd.issue_id)
email = self.email_builder.build(
cmd.assignee_address,
cmd.assigner_address,
issue['problem_description'])
self.email_sender.send(email)
这看起来更好,但我们应该如何调用我们的新处理程序?从我们的 AssignIssueHandler 内部构建一个新的命令和处理程序也听起来像是违反了 SRP。更糟糕的是,如果我们开始从处理程序中调用处理程序,我们最终会让我们的用例再次耦合在一起 - 这绝对是违反原则 #1 的。
我们需要的是一种在处理程序之间传递信号的方式 - 一种说“我完成了我的工作,你可以去完成你的工作吗?”的方式
登上消息总线 在这种系统中,我们使用领域事件 [http://verraes.net/2014/11/domain-events/] 来满足这种需求。事件与命令密切相关,因为命令和事件都是消息类型 [http://www.enterpriseintegrationpatterns.com/patterns/messaging/Message.html] - 在实体之间发送的命名数据块。命令和事件仅在意图上有所不同
- 命令以祈使语气命名(做这件事),事件以过去时命名(事情已完成)。
- 命令必须由恰好一个处理程序处理,事件可以由 0 到 N 个处理程序处理。
- 如果在处理命令时发生错误,则整个请求应失败。如果在处理事件时发生错误,我们应该优雅地失败。
我们经常使用领域事件来发出命令已处理的信号,并执行任何额外的簿记工作。我们应该在什么时候使用领域事件?回到我们的原则 #1,我们应该使用事件来触发超出我们直接用例边界的工作流程。在本例中,我们的用例边界是“分配问题”,并且存在第二个需求“通知受让人”,这应该作为次要结果发生。对人类或其他系统的通知是以这种方式触发事件的最常见原因之一,但它们也可能用于清除缓存、重新生成视图模型或执行一些逻辑以使系统最终一致。
有了这些知识,我们就知道该怎么做了 - 当我们将问题分配给工程师时,我们需要引发一个领域事件。但是,我们不想了解事件的订阅者,否则我们将保持耦合;我们需要的是一个中介,一个可以将消息路由到正确位置的基础设施。我们需要的是消息总线。消息总线是一个简单的中间件,负责将消息传递给正确的侦听器。在我们的应用程序中,我们有两种消息类型,命令和事件。这两种类型的消息在某种意义上是对称的,所以我们将对两者都使用单个消息总线。
我们如何开始编写消息总线?嗯,它需要根据事件的名称查找订阅者。这听起来像是一个字典
class MessageBus:
def __init__(self):
"""Our message bus is just a mapping from message type
to a list of handlers"""
self.subscribers = defaultdict(list)
def handle(self, msg):
"""The handle method invokes each handler in turn
with our event"""
msg_name = type(msg).__name__
subscribers = self.subscribers[msg_name]
for subscriber in subscribers:
subscriber.handle(cmd)
def subscribe_to(self, msg, handler):
"""Subscribe sets up a new mapping, we make sure not
to allow more than one handler for a command"""
subscribers = [msg.__name__]
if msg.is_cmd and len(subscribers) > 0:
raise CommandAlreadySubscribedException(msg.__name__)
subscribers.append(handler)
# Example usage
bus = MessageBus()
bus.subscribe_to(ReportIssueCommand, ReportIssueHandler(db.unit_of_work_manager))
bus.handle(cmd)
这里我们有一个最基本的消息总线实现。它没有做任何花哨的事情,但目前可以完成工作。在生产系统中,消息总线是放置横切关注点的绝佳位置;例如,我们可能希望在将命令传递给处理程序之前验证它们,或者我们可能想要执行一些基本的日志记录或性能监控。我想在下一部分中更多地讨论这个问题,届时我们将探讨依赖注入和控制反转容器这个有争议的主题。
现在,让我们看看如何连接它。首先,我们想从我们的 API 处理程序中使用它。
@api.route('/issues', methods=['POST'])
def create_issue(self):
issue_id = uuid.uuid4()
cmd = ReportIssueCommand(issue_id=issue_id, **request.get_json())
bus.handle(cmd)
return "", 201, {"Location": "/issues/" + str(issue_id) }
这里没有太多变化 - 我们仍然在 Flask 适配器中构建我们的命令,但现在我们将其传递到总线中,而不是直接为自己构建处理程序。当我们需要引发事件时怎么办?我们有几种选择可以做到这一点。通常我从我的命令处理程序中引发事件,就像这样
class AssignIssueHandler:
def handle(self, cmd):
with self.uowm.start() as uow:
issue = uow.issues.get(cmd.id)
issue.assign_to(cmd.assigned_to, cmd.assigned_by)
uow.commit()
# This is step 4: notify other parts of the system
self.bus.raise(IssueAssignedToEngineer(
cmd.issue_id,
cmd.assigned_to,
cmd.assigned_by))
我通常将这种事件引发视为一种粘合剂 - 它是编排代码。以这种方式从你的处理程序中引发事件使消息流变得显式 - 你不必在系统的其他任何地方查找以了解哪些事件将从命令中流出。在管道方面,它也很简单。反驳的观点是,这感觉就像我们以与以前完全相同的方式违反了 SRP - 我们正在发送关于我们的工作流程的通知。这真的与直接从处理程序发送电子邮件有什么不同吗?另一种选择是从我们的模型对象直接发送事件,并将它们视为我们领域模型本身的组成部分。
class Issue:
def assign_to(self, assigned_to, assigned_by):
self.assigned_to = assigned_to
self.assigned_by = assigned_by
# Add our new event to a list
self.events.add(IssueAssignedToEngineer(self.id, self.assigned_to, self.assigned_by))
这样做有几个好处:首先,它使我们的命令处理程序更简单,但其次,它将决定何时发送事件的逻辑推送到模型中。例如,也许我们并不总是需要引发事件。
class Issue:
def assign_to(self, assigned_to, assigned_by):
self.assigned_to = assigned_to
self.assigned_by = assigned_by
# don't raise the event if I picked the issue myself
if self.assigned_to != self.assigned_by:
self.events.add(IssueAssignedToEngineer(self.id, self.assigned_to, self.assigned_by))
现在,我们只会在问题由另一位工程师分配时才引发我们的事件。像这样的情况更像是业务逻辑而不是粘合代码,所以今天我选择将它们放在我的领域模型中。更新我们的单元测试很简单,因为我们只是将事件作为列表暴露在我们的模型对象上
class When_assigning_an_issue:
issue_id = uuid.uuid4()
assigned_to = 'ashley@example.org'
assigned_by = 'laura@example.org'
def given_a_new_issue(self):
self.issue = Issue(self.issue_id, 'reporter@example.org', 'how do I even?')
def because_we_assign_the_issue(self):
self.issue.assign(self.assigned_to, self.assigned_by)
def we_should_raise_issue_assigned(self):
expect(self.issue).to(have_raised(
IssueAssignedToEngineer(self.issue_id,
self.assigned_to,
self.assigned_by)))
have_raised 函数是我编写的自定义匹配器,它检查我们对象的 events 属性,以查看我们是否引发了正确的事件。测试事件的存在很容易,因为它们是 namedtuples,并且具有值相等性。
剩下的就是将事件从我们的模型对象中取出并放入我们的消息总线中。我们需要一种方法来检测我们是否完成了一个用例并准备好刷新我们的更改。幸运的是,我们已经为此命名了 - 它是工作单元。在这个系统中,我正在使用 SQLAlchemy 的事件钩子 [https://docs.sqlalchemy.org.cn/en/latest/orm/session_events.html] 来找出哪些对象发生了更改,并将它们的事件排队。当工作单元退出时,我们引发事件。
class SqlAlchemyUnitOfWork(UnitOfWork):
def __init__(self, sessionfactory, bus):
self.sessionfactory = sessionfactory
self.bus = bus
# We want to listen to flush events so that we can get events
# from our model objects
event.listen(self.sessionfactory, "after_flush", self.gather_events)
def __enter__(self):
self.session = self.sessionfactory()
# When we first start a unit of work, create a list of events
self.flushed_events = []
return self
def commit(self):
self.session.flush()
self.session.commit()
def rollback(self):
self.session.rollback()
# If we roll back our changes we should drop all the events
self.events = []
def gather_events(self, session, ctx):
# When we flush changes, add all the events from our new and
# updated entities into the events list
flushed_objects = ([e for e in session.new]
+ [e for e in session.dirty])
for e in flushed_objects:
self.flushed_events += e.events
def publish_events(self):
# When the unit of work completes
# raise any events that are in the list
for e in self.flushed_events:
self.bus.handle(e)
def __exit__(self, type, value, traceback):
self.session.close()
self.publish_events()
好的,我们在这里涵盖了很多内容。我们讨论了您可能想要使用领域事件的原因,消息总线在实践中是如何工作的,以及我们如何从我们的领域中获取事件并放入我们的订阅者中。最新的代码示例 [https://github.com/bobthemighty/blog-code-samples/tree/master/ports-and-adapters/04] 演示了这些想法,请务必查看它,运行它,打开拉取请求,打开 Github 问题等。
有些人对消息总线或工作单元的设计感到紧张,但这只是基础设施 - 只要它能工作,就可以很丑陋。在最初的几个用户故事之后,我们不太可能再更改这段代码。在这里有一些粗糙的代码是可以的,只要它在我们的粘合层中,安全地远离我们的领域模型。记住,我们做这一切都是为了让我们的领域模型保持纯粹,并在我们需要重构时保持灵活性。并非系统的所有层都是平等的,粘合代码只是粘合剂。
下次我想谈谈依赖注入,为什么它很棒,以及为什么它没有什么可怕的。