8.События и шина сообщений
События и шина сообщений
Итак, мы потратили кучу времени и энергии на простую проблему, которую мы могли бы легко решить с помощью Django. Возможно, вы задаётесь вопросом, действительно ли повышенная тестируемость и выразительность стоят всех усилий?!
Однако на практике мы обнаруживаем, что не очевидные функции создают беспорядок в наших кодовых базах: это нечто липкое и тупое скопившееся по краю. Это отчеты, разрешения и рабочие процессы, которые затрагивают миллион объектов.
Нашим примером будет типичное требование к уведомлению: когда мы не можем разместить заказ, потому что его нет в наличии, мы должны предупредить отдел сбыта. Они пойдут и решат проблему, закупив побольше запасов, и все будет хорошо.
В случае первой версии, наш владелец только должен отправить предупреждение по электронной почте.
Давайте посмотрим, как выдержит наша архитектура, когда нам нужно подключить некоторые прозаичные вещи, которые составляют так много наших систем.
Мы начнём с самого простого, самого быстрого решения и дальше поговорим о том, почему именно такое решение приводит нас к Большому Комку грязи.
Затем мы покажем, как использовать шаблон Domain Events для отделения побочных эффектов от наших вариантов использования, и как использовать простой шаблон Message Bus для запуска поведения на основе этих событий. Мы покажем несколько вариантов для создания этих событий и того, как передать их в шину сообщений, и, наконец, мы покажем, как можно изменить шаблон Unit of Work, чтобы элегантно соединить их вместе, как показано в <<message_bus_diagram> >.

Код этой главы находится в ветке chapter_08_events_and_message_bus на GitHub: |
Как избежать беспорядка
Так. Уведомления по электронной почте, когда у нас заканчивается товар. Когда у нас появляются новые требования, вроде тех, которые в действительности не имеют ничего общего с основным доменом, очень легко начать сбрасывать эти вещи в наши веб-контроллеры.
Во-первых, давайте не будем путать наши веб-контроллеры
В качестве одноразового взлома, это может быть допустимо:
@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
line = model.OrderLine(
request.json['orderid'],
request.json['sku'],
request.json['qty'],
)
try:
uow = unit_of_work.SqlAlchemyUnitOfWork()
batchref = services.allocate(line, uow)
except (model.OutOfStock, services.InvalidSku) as e:
send_mail(
'out of stock',
'stock_admin@made.com',
f'{line.orderid} - {line.sku}'
)
return jsonify({'message': str(e)}), 400
return jsonify({'batchref': batchref}), 201
…но легко понять, как мы можем быстро попасть в переделку, если так всё сделать. Отправка электронной почты не является задачей нашего HTTP-уровня, и мы хотели бы иметь возможность протестировать эту новую функцию.
И давайте не будем портить нашу модель
Предполагая, что мы не хотим помещать этот код в наши веб-контроллеры, потому что мы хотим, чтобы они были как можно более тонкими, мы можем посмотреть на то, чтобы поместить его прямо в источник, в модель:
def allocate(self, line: OrderLine) -> str:
try:
batch = next(
b for b in sorted(self.batches) if b.can_allocate(line)
)
#...
except StopIteration:
email.send_mail('stock@made.com', f'Out of stock for {line.sku}')
raise OutOfStock(f'Out of stock for sku {line.sku}')
Но это еще хуже! Мы не хотим, чтобы наша модель имела какие-либо зависимости от инфраструктурных проблем, таких как email.send_mail.
Эта штука с отправкой электронной почты нежелательна сгусток, испортивший приятный чистый поток нашей системы. Мы хотели бы, чтобы наша модель предметной области была ориентирована на правило "Вы не можете выделить больше материала, чем на самом деле доступно."
Или уровень обслуживания!
Требование "Попробуйте распределить некоторый запас и отправить электронное письмо, если это не удастся" является примером оркестровки рабочего процесса: это набор шагов, которые система должна выполнить, чтобы достичь цели.
Мы написали сервисный уровень для управления оркестровкой для нас, но даже здесь эта функция кажется неуместной:
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f'Invalid sku {line.sku}')
try:
batchref = product.allocate(line)
uow.commit()
return batchref
except model.OutOfStock:
email.send_mail('stock@made.com', f'Out of stock for {line.sku}')
raise
Поймать исключение и сделать ререйз? Могло быть и хуже, но это определенно нас огорчает. Почему так сложно найти подходящий дом для этого кода?
Single Responsibility Principle
На самом деле, это нарушение принципа единственной ответственности (SRP) .footnote: [Этот принцип — S в SOLID.] Наш пример использования — распределение. Наша конечная точка, служебная функция и методы домена называются [.keep-together] allocate, а не allocate_and_send_mail_if_out_of_stock.
| Эмпирическое правило: если вы не можете описать, что делает ваша функция, не используя такие слова, как "тогда" или "и", вы можете нарушить SRP. |
Согласно одной из формулировок SRP, у каждого класса должна быть только одна причина для изменения. Когда мы переключаемся с электронной почты на SMS, нам не нужно обновлять нашу функцию allocate(), потому что это явно отдельная ответственность.
Чтобы решить эту проблему, мы разделим оркестровку на отдельные этапы, чтобы различные проблемы не перепутались.[1] Задача модели домена состоит в том, чтобы знать, что у нас нет запасов, но ответственность за отправку предупреждения лежит на другом месте. Мы должны иметь возможность включать или выключать эту функцию или переключаться на SMS-уведомления вместо этого, не меняя правила нашей доменной модели.
Мы также хотели бы сохранить уровень сервиса свободным от деталей реализации. Мы хотим применить принцип инверсии зависимостей к уведомлениям, чтобы наш уровень обслуживания зависел от абстракции, точно так же, как мы избегаем зависимости от базы данных, используя единицу работы.
Все на борт автобуса Сообщений!
Шаблоны, которые мы собираемся здесь представить, - это Domain Events События домена и Message Bus Шина сообщений. Мы можем реализовать их несколькими способами, поэтому мы покажем пару, прежде чем остановимся на том, который нам больше всего нравится.
Модель Записывает События
Во-первых, вместо того, чтобы беспокоиться об электронных письмах, наша модель будет отвечать за регистрацию events (событий) - факты о том, что произошло. Мы будем использовать шину сообщений, чтобы отвечать на события и вызывать новую операцию.
События (events) - это простые классы данных
event-это своего рода value object. События не имеют никакого поведения, потому что они являются чистыми структурами данных. Мы всегда называем события на языке домена и думаем о них как о части нашей модели домена.
Мы могли бы хранить их в model.py, но мы также можем хранить их в отдельном файле. (возможно, сейчас самое подходящее время подумать о рефакторинге каталога с именем domain, чтобы у нас был domain/model.py и domain/events.py):
from dataclasses import dataclass
class Event: #
pass
@dataclass
class OutOfStock(Event): #
sku: str
| Как только у нас будет несколько событий, нам будет полезно иметь родительский класс, который может хранить общие атрибуты. Это также полезно для подсказок типа в нашей шине сообщений, как вы вскоре увидите. | |
dataclasses отлично подходят и для доменных событий. |
Модель вызывает события
Когда наша модель домена фиксирует факт, который произошел, мы говорим, что это raises (поднимает) событие.
Вот как это будет выглядеть со стороны; если мы попросим "Product" распределить ( allocate ), но он не сможет, он должен raise (поднять) событие:
def test_records_out_of_stock_event_if_cannot_allocate():
batch = Batch('batch1', 'SMALL-FORK', 10, eta=today)
product = Product(sku="SMALL-FORK", batches=[batch])
product.allocate(OrderLine('order1', 'SMALL-FORK', 10))
allocation = product.allocate(OrderLine('order2', 'SMALL-FORK', 1))
assert product.events[-1] == events.OutOfStock(sku="SMALL-FORK") #
assert allocation is None
Наш агрегат предоставит новый атрибут под названием .events, который будет содержать список фактов о том, что произошло, в форме объектов Event. |
Вот как выглядит модель изнутри:
class Product:
def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):
self.sku = sku
self.batches = batches
self.version_number = version_number
self.events = [] # type: List[events.Event] #
def allocate(self, line: OrderLine) -> str:
try:
#...
except StopIteration:
self.events.append(events.OutOfStock(line.sku)) #
# raise OutOfStock(f'Out of stock for sku {line.sku}') #
return None
Вот наш новый атрибут .events. | |
| Вместо того, чтобы напрямую вызывать какой-либо код отправки электронной почты, мы записываем эти события в том месте, где они происходят, используя только язык домена. | |
| Мы также собираемся прекратить создавать исключение для случая отсутствия на складе. Событие выполнит ту работу, которую выполняло исключение. |
| На самом деле мы "принюхиваемся" к коду, который мы рассматривали до сих пор, а именно к тому, что обсуждается в использование исключений для потока управления. В общем случае, если вы реализуете доменные события, не создавайте исключений для описания одной и той же концепции домена. Как вы увидите позже, когда мы будем обрабатывать события в шаблоне Unit of Work, это сбивает с толку, когда приходится рассуждать о совместном использовании событий и исключений. |
Шина сообщений сопоставляет События(Events) с Обработчиками(Handlers)
Шина сообщений в основном говорит: "Когда я вижу это событие, я должен вызвать следующую функцию обработчика". Другими словами, это простая система подписки на публикации. Обработчики подписаны (subscribed) на получение событий, которые мы размещаем в шине. Это звучит сложнее, чем есть на самом деле, и мы обычно реализуем это с помощью dict:
def handle(event: events.Event):
for handler in HANDLERS[type(event)]:
handler(event)
def send_out_of_stock_notification(event: events.OutOfStock):
email.send_mail(
'stock@made.com',
f'Out of stock for {event.sku}',
)
HANDLERS = {
events.OutOfStock: [send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]
| Обратите внимание, что реализованная шина сообщений не дает нам параллелизма, потому что одновременно будет работать только один обработчик. Наша цель состоит не в том, чтобы поддерживать параллельные потоки, а в том, чтобы концептуально разделить задачи и сделать каждый UoW как можно меньше. Это помогает нам понять кодовую базу, потому что "рецепт" для запуска каждого варианта использования написан в одном месте. См. следующую боковую панель. |
Вариант 1. Уровень сервиса Принимает События из Модели и Помещает их в Шину сообщений
Наша доменная модель вызывает события, и наша шина сообщений будет вызывать правые обработчики всякий раз, когда происходит событие. Теперь все, что нам нужно, — это соединить их. Нам нужно что-то, чтобы перехватить события из модели и передать их в шину сообщений — этап publishing.
Самый простой способ сделать это — добавить код в наш сервисный слой:
from . import messagebus
...
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f'Invalid sku {line.sku}')
try: #
batchref = product.allocate(line)
uow.commit()
return batchref
finally: #
messagebus.handle(product.events) #
Мы сохраняем try/finally из нашей уродливой более ранней реализации (мы еще не избавились от всех исключений, просто OutOfStock). | |
| Но теперь, вместо того чтобы напрямую зависеть от инфраструктуры электронной почты, уровень сервиса отвечает только за передачу событий от модели до шины сообщений. |
Это уже позволяет избежать некоторого уродства, которое мы имели в нашей наивной реализации, и у нас есть несколько систем, работающих подобно этой, в которых уровень обслуживания явно собирает события из агрегатов и передает их в шину сообщений.
Вариант 2: Уровень Сервиса Создает Свои Собственные События
Другой вариант, который мы использовали, - это сделать так, чтобы уровень сервиса отвечал за создание и инициирование событий напрямую, а не за их создание моделью предметной области:
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f'Invalid sku {line.sku}')
batchref = product.allocate(line)
uow.commit() #
if batchref is None:
messagebus.handle(events.OutOfStock(line.sku))
return batchref
| Как и раньше, мы коммитим событие, даже если ничего не можем зарезервировать, потому что код таким образом проще и легче понимать: мы всегда фиксируем, если что-то не идет не так. Фиксация, когда мы ничего не изменили, безопасна и сохраняет код незагроможденным. |
Опять же, у нас есть приложения в производстве (production), которые реализуют шаблон таким образом. То, что работает для вас, будет зависеть от конкретных компромиссов, с которыми вы столкнётесь, но мы хотели бы показать вам, что мы считаем наиболее элегантным решением, в котором мы помещаем единицу работы, отвечающую за сбор и обработку событий.
Вариант 3: UoW публикует события в шине сообщений
У UoW уже есть блок try/finally, и он знает обо всех агрегатах, находящихся в данный момент в игре, потому что он предоставляет доступ к репозиторию. Так что это хорошее место для обнаружения событий и передачи их в шину сообщений:
class AbstractUnitOfWork(abc.ABC):
...
def commit(self):
self._commit() #
self.publish_events() #
def publish_events(self): #
for product in self.products.seen: #
while product.events:
event = product.events.pop(0)
messagebus.handle(event)
@abc.abstractmethod
def _commit(self):
raise NotImplementedError
...
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
...
def _commit(self): #
self.session.commit()
Мы изменим наш метод фиксации, чтобы запросить частный метод ._commit() из подклассов. | |
| После фиксации мы прогоняем все объекты, которые воспринял наш репозиторий, и передаем их события в шину сообщений. | |
Это зависит от репозитория, отслеживающего агрегаты, которые были загружены с использованием нового атрибута, .seen, как вы увидите в следующем листинге. |
| Вам интересно, что произойдет, если один из обработчиков выйдет из строя? Мы подробно обсудим обработку ошибок в [chapter_10_commands]. |
class AbstractRepository(abc.ABC):
def __init__(self):
self.seen = set() # type: Set[model.Product] #
def add(self, product: model.Product): #
self._add(product)
self.seen.add(product)
def get(self, sku) -> model.Product: #
product = self._get(sku)
if product:
self.seen.add(product)
return product
@abc.abstractmethod
def _add(self, product: model.Product): #
raise NotImplementedError
@abc.abstractmethod #
def _get(self, sku) -> model.Product:
raise NotImplementedError
class SqlAlchemyRepository(AbstractRepository):
def __init__(self, session):
super().__init__()
self.session = session
def _add(self, product): #
self.session.add(product)
def _get(self, sku): #
return self.session.query(model.Product).filter_by(sku=sku).first()
Чтобы UoW мог публиковать новые события, он должен иметь возможность запрашивать репозиторий, для каких объектов Product использовались во время этого сеанса. Мы используем set под названием` .seen` для их хранения. Это означает, что наши реализации должны вызывать super().__ init __() . | |
Родительский метод add() добавляет элементы в .seen и теперь требует jn подклассов реализацию ._add(). | |
Аналогично, .get() делегирует функцию ._get (), которая должна быть реализована подклассами, чтобы захватить видимые объекты. |
Использование методов ._underscorey() и подклассов определенно не является единственным способом реализации этих шаблонов. Попробуйте воспользоваться "Упражнения для читателя" в этой главе и поэкспериментируйте с некоторыми альтернативами. |
После того, как UoW и репозиторий будут сотрудничать таким образом, чтобы автоматически отслеживать живые объекты и обрабатывать их события, уровень сервиса может быть полностью свободен от проблем с обработкой событий:
def allocate(
orderid: str, sku: str, qty: int,
uow: unit_of_work.AbstractUnitOfWork
) -> str:
line = OrderLine(orderid, sku, qty)
with uow:
product = uow.products.get(sku=line.sku)
if product is None:
raise InvalidSku(f'Invalid sku {line.sku}')
batchref = product.allocate(line)
uow.commit()
return batchref
Мы также должны помнить, что надо изменить фейки в сервисном слое и заставить их вызывать super() в нужных местах, а также реализовать методы c двойным подчёркиванием ("str","repr"), но изменения минимальны:
class FakeRepository(repository.AbstractRepository):
def __init__(self, products):
super().__init__()
self._products = set(products)
def _add(self, product):
self._products.add(product)
def _get(self, sku):
return next((p for p in self._products if p.sku == sku), None)
...
class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork):
...
def _commit(self):
self.committed = True
Возможно, вы начинаете беспокоиться о том, что поддержание этих фейков будет бременем для обслуживания. Нет никаких сомнений, что это работа, но по нашему опыту это не так уж много работы. Как только ваш проект запущен и работает, интерфейс для вашего репозитория и абстракций UoW действительно не сильно меняется. И если вы используете ABC, это поможет вам вспомнить, когда что-то выходит из синхронизации.
Подведение итогов
События домена дают нам возможность управлять рабочими процессами в нашей системе. Мы часто обнаруживаем, слушая наших экспертов в предметной области, что они выражают требования причинным или временным образом - например, «Когда мы пытаемся распределить запасы, но их нет в наличии, мы должны отправить электронное письмо отделу снабжения».
Волшебные слова "When X, then Y" часто говорят нам о событии, которое мы можем сделать конкретным в нашей системе. Рассматривая события как first-class вещи в нашей модели, мы делаем наш код более тестируемым и наблюдаемым, а также изолируем проблемы.
И [chapter_08_events_and_message_bus_tradeoffs] показывает компромиссы, как мы их видим.
| Плюсы | Минусы |
|---|---|
|
|
Однако события полезны не только для отправки электронной почты. В [chapter_07_aggregate] мы потратили много времени, убеждая вас, что вы должны определить агрегаты или границы, где мы гарантируем согласованность. Люди часто спрашивают: "Что мне делать, если мне нужно изменить несколько агрегатов в рамках запроса?" Теперь у нас есть инструменты, необходимые для ответа на этот вопрос.
Если у нас есть две вещи, которые могут быть транзакционно изолированы (например, заказ и product), то мы можем сделать их eventually consistent (в конечном итоге согласованными) с помощью событий. Когда заказ отменяется, мы должны найти продукты, которые были ему назначены, и удалить allocations.
В [chapter_09_all_messagebus] мы рассмотрим эту идею более подробно при построении более сложного рабочего процесса с нашей новой шиной сообщений.
Комментарии
Отправить комментарий