9.Едем в город на автобусе сообщений

 

Едем в город на автобусе сообщений

В этой главе мы начнем делать события более фундаментальными для внутренней структуры нашего приложения. Мы перейдем из текущего состояния в [maps_chapter_08_before], где события являются необязательным побочным эффектом …

images/apwp_0901.png
Figure 1. Раньше: шина сообщений является необязательным дополнением

…к ситуации в [map_chapter_08_after], где всё идет по шине сообщений, а наше приложение было принципиально преобразовано в процессор сообщений.

images/apwp_0902.png
Figure 2. Шина сообщений теперь является основной точкой входа на уровень сервиса.
Tip

Код для этой главы находится в ветке chapter_09_all_messagebus. на GitHub:

git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_09_all_messagebus
# или, чтобы кодировать вместе, проверьте предыдущую главу:
git checkout chapter_08_events_and_message_bus

Новое требование приводит нас к новой архитектуре

Рич Хикки говорит о situated software, то есть о программном обеспечении, которое работает в течение длительных периодов времени, управляя реальным процессом. Примеры включают системы управления складом, логистические планировщики и системы расчета заработной платы.

Такое программное обеспечение сложно написать, потому что в реальном мире физических объектов и ненадежных людей постоянно происходят неожиданные вещи. Например:

  • Во время инвентаризации мы обнаруживаем, что три <код>ПРУЖИННЫЙ МАТРАСбыли повреждены водой из-за протекающей крыши.

  • У груза RELIABLE-FORKs отсутствует необходимая документация и он застрял на таможне, где находится уже в течение нескольких недель. Три RELIABLE-FORKs впоследствии не проходят проверку безопасности и уничтожаются.

  • Глобальная нехватка блесток означает, что мы не сможем изготовить следующую партию SPARKLY-BOOKCASE.

В таких ситуациях мы узнаем о необходимости изменения количества партий, когда они уже находятся в системе. Может быть, кто-то ошибся номером в декларации, а может быть, какие-то диваны упали с грузовика. После разговора с представителями компании[1] мы моделируем ситуацию, как на [batch_changed_events_flow_diagram].

images/apwp_0903.png
Figure 3. Изменение количества партии означает освобождение и перераспределение (deallocate and reallocate)

Событие, которое мы назовем BatchQuantityChanged, должно привести нас к изменению количества в партии, да, но также и к применению бизнес правила: если новое количество опускается до меньшего, чем уже распределённое общее количество, нам нужно dealocate освободить места в этих заказы из этой партии. Затем каждый из них потребует нового распределения, которое мы можем зафиксировать как событие под названием AllocationRequired Требуемое распределение.

Возможно, вы уже ожидаете, что наша внутренняя шина сообщений и события помогут реализовать это требование. Мы могли бы определить службу под названием change_batch_quantity, которая знает, как корректировать количество партий, а также как deallocate (освобождать) любые избыточные строки заказа, а затем каждое освобождение может генерировать событие AllocationRequired, которое может быть перенаправлено существующей службе allocate (распределения) в отдельные транзакции. И снова наша шина сообщений помогает нам обеспечивать соблюдение принципа единой ответственности и позволяет нам делать выбор в отношении транзакций и целостности данных.

Представим себе изменение архитектуры: всё будет event handler Обработчик событий

Но прежде чем запрыгнуть в это, подумай, куда мы направляемся. Существует два вида потоков через нашу систему:

  • Вызовы API, которые обрабатываются функцией уровня сервиса

  • Внутренние события (которые могут быть вызваны как побочный эффект функции уровня сервиса) и их обработчики (которые, в свою очередь, вызывают функции уровня сервиса)

Разве не было бы проще, если бы все было обработчиком событий? Если мы переосмыслим наши вызовы API как захват событий, функции уровня сервиса также могут быть обработчиками событий, и нам больше не нужно проводить различие между внутренними и внешними обработчиками событий:

  • services.allocate() может быть обработчиком события AllocationRequired и может выдавать события Allocated в качестве его выходных данных.

  • services.add_batch() может быть обработчиком события BatchCreated.[2]

Наше новое требование будет соответствовать той же схеме:

  • Событие под названием BatchQuantityChanged может вызывать обработчик под названием change_batch_quantity().

  • И новые события AllocationRequired, которые он может вызвать, также могут быть переданы в `services.allocate() `, поэтому нет концептуальной разницы между совершенно новым распределением, исходящим от API, и перераспределением, которое запускается изнутри deallocate.

Все это звучит как-то чересчур? Давайте работать над всем этим постепенно. Мы будем следовать за рабочим процессом Подготовительный рефакторинг, он же гласит "Сделайте изменение легким; затем сделайте легкое изменение":

  1. Мы рефакторингуем наш уровень обслуживания в обработчики событий. Мы можем привыкнуть к идее о том, что события-это способ описания входов в систему. В частности, существующая функция services.allocate() станет обработчиком события под названием AllocationRequired.

  2. Мы создаем сквозной тест,который помещает события BatchQuantityChanged в систему и принимает выходящие события Allocated.

  3. Наша реализация концептуально будет очень простой: новый обработчик событий BatchQuantityChanged, реализация которого будет выдавать события AllocationRequired, которые, в свою очередь, будут обрабатываться точно таким же обработчиком распределений, который использует API.

По пути мы сделаем небольшую настройку шины сообщений и UoW, перенеся ответственность за размещение (put) новых событий на шине сообщений в саму шину сообщений.

Рефакторинг сервисных функций для обработчиков сообщений

Мы начинаем с определения двух событий, которые фиксируют наши текущие входные данные API - AllocationRequired и BatchCreated:

Example 1. События BatchCreated и AllocationRequired (src/allocation/domain/events.py)
@dataclass
class BatchCreated(Event):
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None

...

@dataclass
class AllocationRequired(Event):
    orderid: str
    sku: str
    qty: int

Затем мы переименовываем services.py в handlers.py; мы добавляем обработчик текущих сообщений для send_out_of_stock_notification; и самое главное, мы меняем все обработчики так, чтобы у них были одинаковые входные данные, событие и UoW:

Example 2. Обработчики и сервисы - это одно и то же (src/allocation/service_layer/handlers.py)
def add_batch(
        event: events.BatchCreated, uow: unit_of_work.AbstractUnitOfWork
):
    with uow:
        product = uow.products.get(sku=event.sku)
        ...


def allocate(
        event: events.AllocationRequired, uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(event.orderid, event.sku, event.qty)
    ...


def send_out_of_stock_notification(
        event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork,
):
    email.send(
        'stock@made.com',
        f'Out of stock for {event.sku}',
    )

Это изменение станет более ясным если помотреть на различие:

Example 3. Переход от сервисов к обработчикам (src/allocation/service_layer/handlers.py)
 def add_batch(
-        ref: str, sku: str, qty: int, eta: Optional[date],
-        uow: unit_of_work.AbstractUnitOfWork
+        event: events.BatchCreated, uow: unit_of_work.AbstractUnitOfWork
 ):
     with uow:
-        product = uow.products.get(sku=sku)
+        product = uow.products.get(sku=event.sku)
     ...


 def allocate(
-        orderid: str, sku: str, qty: int,
-        uow: unit_of_work.AbstractUnitOfWork
+        event: events.AllocationRequired, uow: unit_of_work.AbstractUnitOfWork
 ) -> str:
-    line = OrderLine(orderid, sku, qty)
+    line = OrderLine(event.orderid, event.sku, event.qty)
     ...

+
+def send_out_of_stock_notification(
+        event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork,
+):
+    email.send(
     ...

Попутно мы сделали API нашего сервисного уровня более структурированным и последовательным. Это было рассеяние примитивов, и теперь используются четко определенные объекты (см. Следующую главу).

От Domain Objects через Primitive Obsession к событиям в качестве интерфейса

Некоторые из вас, возможно, помнят [primitive_obsession], в котором мы изменили наш API сервисного уровня с точки зрения доменных объектов на примитивы. А теперь мы возвращаемся назад, но к другим объектам? Что это дает?

В кругах ОО люди говорят о primitive obsession как об антипаттере: Они скорее всего порекомендовали бы, избегать примитивов в общедоступных API и вместо этого оборачивать их пользовательскими классами значений. В мире Python многие люди отнесутся к этому весьма скептически. При бездумном применении это, безусловно, рецепт ненужной сложности. Так что, по сути, мы этим не занимаемся.

Переход от доменных объектов к примитивам принес нам хорошую развязку: наш клиентский код больше не был связан непосредственно с доменом, поэтому уровень сервиса мог представить API, который остается неизменным, даже если мы решим внести изменения в нашу модель, и наоборот.

Итак, мы отступили? Ну, наши основные объекты модели предметной области по-прежнему свободны варьироваться, но вместо этого мы связали внешний мир с нашими классами событий. Они тоже часть домена, но есть надежда, что они меняются реже, так что они разумный артефакт для пары.

И что мы приобрели? Теперь при вызове варианта использования в нашем приложении нам больше не нужно запоминать конкретную комбинацию примитивов, а только один класс событий, представляющий входные данные для нашего приложения. Это концептуально довольно мило. Кроме того, как вы увидите в [appendix_validation], эти классы событий могут быть хорошим местом для некоторой проверки входных данных.

The Message Bus Now Collects Events from the UoW

Наши обработчики событий теперь нуждаются в UoW. Кроме того, поскольку наша шина сообщений становится всё более центральной для нашего приложения, имеет смысл явно возложить на неё ответственность за сбор и обработку новых событий. До сих пор существовала некоторая циклическая зависимость между UoW и шиной сообщений, так что это сделает её односторонней. Вместо того, чтобы иметь события UoW push на шине сообщений, мы будем иметь события message bus pull из UoW.

Example 4. Handle принимает UoW и управляет очередью (src/allocation/service_layer/messagebus.py)
def handle(event: events.Event, uow: unit_of_work.AbstractUnitOfWork):  #1
    queue = [event]  #2
    while queue:
        event = queue.pop(0)  #3
        for handler in HANDLERS[type(event)]:  #3
            handler(event, uow=uow)  #4
            queue.extend(uow.collect_new_events())  #5
1Шина сообщений теперь проходит UoW при каждом запуске.
2Когда мы начинаем обрабатывать наше первое событие, мы запускаем очередь.
3Мы извлекаем события из передней части очереди и вызываем их обработчики (HANDLERS dict не изменился; он по-прежнему сопоставляет типы событий с функциями обработчиков).
4Шина сообщений передает UoW каждому обработчику.
5После завершения каждого обработчика мы собираем все новые сгенерированные события и добавляем их в очередь.

В unit_of_work.py 'publish_events()` становится менее активным методом, collect_new_events():

Example 5. UoW больше не помещает события прямо в шину (src/allocation/service_layer/unit_of_work.py)
-from . import messagebus  #1
-


 class AbstractUnitOfWork(abc.ABC):
@@ -23,13 +21,11 @@ class AbstractUnitOfWork(abc.ABC):

     def commit(self):
         self._commit()
-        self.publish_events()  #2

-    def publish_events(self):
+    def collect_new_events(self):
         for product in self.products.seen:
             while product.events:
-                event = product.events.pop(0)
-                messagebus.handle(event)
+                yield product.events.pop(0)  #3
1Модуль unit_of_work теперь больше не зависит от messagebus.
2Мы больше не выполняем publish_events автоматически при фиксации. Вместо этого шина сообщений отслеживает очередь событий.
3И UoW больше не размещает активные события в шину сообщений; он просто делает их доступными.

Наши тесты тоже написаны в терминах событий

Наши тесты теперь работают, создавая события и помещая их в шину сообщений, а не вызывая функции сервисного уровня напрямую:

Example 6. Тесты обработчиков используют события (tests/unit/test_handlers.py)
class TestAddBatch:

     def test_for_new_product(self):
         uow = FakeUnitOfWork()
-        services.add_batch("b1", "CRUNCHY-ARMCHAIR", 100, None, uow)
+        messagebus.handle(
+            events.BatchCreated("b1", "CRUNCHY-ARMCHAIR", 100, None), uow
+        )
         assert uow.products.get("CRUNCHY-ARMCHAIR") is not None
         assert uow.committed

...

 class TestAllocate:

     def test_returns_allocation(self):
         uow = FakeUnitOfWork()
-        services.add_batch("batch1", "COMPLICATED-LAMP", 100, None, uow)
-        result = services.allocate("o1", "COMPLICATED-LAMP", 10, uow)
+        messagebus.handle(
+            events.BatchCreated("batch1", "COMPLICATED-LAMP", 100, None), uow
+        )
+        result = messagebus.handle(
+            events.AllocationRequired("o1", "COMPLICATED-LAMP", 10), uow
+        )
         assert result == "batch1"

Временный Наглый Взлом: шина сообщений должна возвращать результаты

Наш API и наш уровень сервиса в настоящее время хотят узнать выделенную ссылку на пакет, когда они вызывают наш обработчик allocate(). Это означает, что нам нужно временно взломать нашу шину сообщений, чтобы она возвращала события:

Example 7. Message bus returns results (src/allocation/service_layer/messagebus.py)
 def handle(event: events.Event, uow: unit_of_work.AbstractUnitOfWork):
+    results = []
     queue = [event]
     while queue:
         event = queue.pop(0)
         for handler in HANDLERS[type(event)]:
-            handler(event, uow=uow)
+            results.append(handler(event, uow=uow))
             queue.extend(uow.collect_new_events())
+    return results

Это потому, что мы смешиваем обязанности чтения и записи в нашей системе. Мы вернемся, чтобы исправить эту неприятность в [chapter_12_cqrs].

Изменение нашего API для работы с событиями

Example 8. Diff при замене Flask на шину сообщений (src/allocation/entrypoints/flask_app.py)
 @app.route("/allocate", methods=['POST'])
 def allocate_endpoint():
     try:
-        batchref = services.allocate(
-            request.json['orderid'],  #1
-            request.json['sku'],
-            request.json['qty'],
-            unit_of_work.SqlAlchemyUnitOfWork(),
+        event = events.AllocationRequired(  #2
+            request.json['orderid'], request.json['sku'], request.json['qty'],
         )
+        results = messagebus.handle(event, unit_of_work.SqlAlchemyUnitOfWork())  #3
+        batchref = results.pop(0)
     except InvalidSku as e:
1Вместо вызова уровня сервиса с кучей примитивов, извлеченных из запроса JSON …
2Создаем событие.
3Затем передаем его в шину сообщений.

И мы должны вернуться к полностью функциональному приложению, но теперь полностью управляемому событиями:

  • То, что раньше было функциями сервисного уровня, теперь стало обработчиками событий.

  • Это делает их такими же, как функции, которые мы вызываем для обработки внутренних событий, вызванных нашей моделью предметной области.

  • Мы используем события в качестве структуры данных для сбора входных данных в систему, а также для передачи внутренних рабочих пакетов.

  • Теперь все приложение лучше всего описать как процессор сообщений или, если хотите, процессор событий. Мы поговорим об этом различии в следующей главе[chapter_10_commands].

Реализация нашего нового требования (Requirement)

Мы закончили с фазой рефакторинга. Давайте посмотрим, действительно ли мы "сделали изменение легким." Давайте реализуем наше новое требование, показанное в [reallocation_sequence_diagram]: мы получим в качестве входных данных некоторые новые события BatchQuantityChanged и передадим их обработчику, который, в свою очередь, может выдать некоторые события AllocationRequired, а те, в свою очередь, вернутся к нашему существующему обработчику для перераспределения.

images/apwp_0904.png
Figure 4. Диаграмма последовательности для потока перераспределения
WarningКогда вы разделяете вещи таким образом на две единицы работы, то у вас получаются две транзакции базы данных, поэтому вы открываете себя для проблем целостности: что-то может произойти, и это означает, что первая транзакция завершается, а вторая-нет. Вам нужно будет подумать о том, приемлемо ли это, и нужно ли вам замечать, когда это происходит, и что-то с этим делать. См. [footguns] для более подробного обсуждения.

Наше новое событие

Событие, которое говорит нам, что количество партии изменилось, простое; ему просто нужна ссылка на партию и новое количество:

Example 9. Новое событие (src/allocation/domain/events.py)
@dataclass
class BatchQuantityChanged(Event):
    ref: str
    qty: int

Test-Driving нового Handler

Следуя урокам, извлеченным из [chapter_04_service_layer], мы можем работать на «высокой передаче» и писать наши модульные тесты на максимально возможном уровне абстракции с точки зрения событий. Вот как они могут выглядеть:

Example 10. Тесты обработчика для change_batch_quantity (tests/unit/test_handlers.py)
class TestChangeBatchQuantity:

    def test_changes_available_quantity(self):
        uow = FakeUnitOfWork()
        messagebus.handle(
            events.BatchCreated("batch1", "ADORABLE-SETTEE", 100, None), uow
        )
        [batch] = uow.products.get(sku="ADORABLE-SETTEE").batches
        assert batch.available_quantity == 100  #1

        messagebus.handle(events.BatchQuantityChanged("batch1", 50), uow)

        assert batch.available_quantity == 50  #1


    def test_reallocates_if_necessary(self):
        uow = FakeUnitOfWork()
        event_history = [
            events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
            events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
            events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
            events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
        ]
        for e in event_history:
            messagebus.handle(e, uow)
        [batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
        assert batch1.available_quantity == 10
        assert batch2.available_quantity == 50

        messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)

        # order1 or order2 will be deallocated, so we'll have 25 - 20
        assert batch1.available_quantity == 5  #2
        # and 20 will be reallocated to the next batch
        assert batch2.available_quantity == 30  #2
1Простой случай будет тривиально легко реализовать; мы просто модифицируем количество.
2Но если мы попытаемся изменить количество на меньшее, чем было выделено, нам нужно будет исключить по крайней мере один заказ, и перераспределить его на новую ожидаемую партию.

Реализация

Наш новый обработчик очень прост:

Example 11. Обработчик делегирует уровень модели (src/allocation/service_layer/handlers.py)
def change_batch_quantity(
        event: events.BatchQuantityChanged, uow: unit_of_work.AbstractUnitOfWork
):
    with uow:
        product = uow.products.get_by_batchref(batchref=event.ref)
        product.change_batch_quantity(ref=event.ref, qty=event.qty)
        uow.commit()

Мы понимаем, что нам понадобится новый тип запроса в нашем репозитории:

Example 12. Новый тип запроса в нашем репозитории (src/allocation/adapters/repository.py)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class AbstractRepository(abc.ABC):
    ...

    def get(self, sku) -> model.Product:
        ...

    def get_by_batchref(self, batchref) -> model.Product:
        product = self._get_by_batchref(batchref)
        if product:
            self.seen.add(product)
        return product

    @abc.abstractmethod
    def _add(self, product: model.Product):
        raise NotImplementedError

    @abc.abstractmethod
    def _get(self, sku) -> model.Product:
        raise NotImplementedError

    @abc.abstractmethod
    def _get_by_batchref(self, batchref) -> model.Product:
        raise NotImplementedError
    ...

class SqlAlchemyRepository(AbstractRepository):
    ...

    def _get(self, sku):
        return self.session.query(model.Product).filter_by(sku=sku).first()

    def _get_by_batchref(self, batchref):
        return self.session.query(model.Product).join(model.Batch).filter(
            orm.batches.c.reference == batchref,
        ).first()

И в нашем FakeRepository:

Example 13. Обновление фейкового репо тоже (tests/unit/test_handlers.py)
class FakeRepository(repository.AbstractRepository):
    ...

    def _get(self, sku):
        return next((p for p in self._products if p.sku == sku), None)

    def _get_by_batchref(self, batchref):
        return next((
            p for p in self._products for b in p.batches
            if b.reference == batchref
        ), None)
NoteМы добавляем запрос в наш репозиторий, чтобы упростить реализацию этого варианта использования. Пока наш запрос возвращает единственную совокупность, мы не нарушаем никаких правил. Если вы обнаружите, что пишете сложные запросы к своим репозиториям, возможно, вам захочется рассмотреть другой дизайн. Такие методы, как get_most_popular_products или find_products_by_order_id, в частности, определенно вызовут щекотку в области нашего шестого чувства. В [chapter_11_external_events] и epilogue есть несколько советов по управлению сложными запросами.

Новый метод модели предметной области

Мы добавляем в модель новый метод, который выполняет изменение количества и освобождение(ий) встроенным и публикует новое событие. Мы также модифицируем существующую функцию выделения для публикации события:

Example 14. Наша модель развивается в соответствии с новыми требованиями (src/allocation/domain/model.py)
class Product:
    ...

    def change_batch_quantity(self, ref: str, qty: int):
        batch = next(b for b in self.batches if b.reference == ref)
        batch._purchased_quantity = qty
        while batch.available_quantity < 0:
            line = batch.deallocate_one()
            self.events.append(
                events.AllocationRequired(line.orderid, line.sku, line.qty)
            )
...

class Batch:
    ...

    def deallocate_one(self) -> OrderLine:
        return self._allocations.pop()

Подключаем наш новый обработчик:

Example 15. Шина сообщений растет (src/allocation/service_layer/messagebus.py)
HANDLERS = {
    events.BatchCreated: [handlers.add_batch],
    events.BatchQuantityChanged: [handlers.change_batch_quantity],
    events.AllocationRequired: [handlers.allocate],
    events.OutOfStock: [handlers.send_out_of_stock_notification],

}  # type: Dict[Type[events.Event], List[Callable]]

И наше новое требование полностью выполнено.

Опционально: Модульное тестирование Event Handlers изолированно с Fake Message Bus

Наш основной тест для рабочего процесса перераспределения-это edge-to-edge (см. Пример кода в [test-driving-ch9]). Он использует реальную шину сообщений и тестирует весь поток, где обработчик событий BatchQuantityChanged запускает освобождение и выдает новые события AllocationRequired, которые, в свою очередь, обрабатываются их собственными обработчиками. Один тест охватывает цепочку из нескольких событий и обработчиков.

В зависимости от сложности цепочки событий вы можете решить, что хотите протестировать некоторые обработчики отдельно друг от друга. Вы можете сделать это с помощью "поддельной" шины сообщений.

В нашем случае мы фактически вмешиваемся, изменяя метод publish_events() в FakeUnitOfWork и отделяя его от реальной шины сообщений, вместо этого заставляя его записывать события, которые он видит:

Example 16. Шина фальшивых сообщений реализована в UoW (tests/unit/test_handlers.py)
class FakeUnitOfWorkWithFakeMessageBus(FakeUnitOfWork):

    def __init__(self):
        super().__init__()
        self.events_published = []  # type: List[events.Event]

    def publish_events(self):
        for product in self.products.seen:
            while product.events:
                self.events_published.append(product.events.pop(0))

Теперь, когда мы вызываем messagebus.handle() используя FakeUnitOfWorkWithFakeMessageBus, он запускает только обработчик этого события. Таким образом, мы можем написать более изолированный модульный тест: вместо проверки всех побочных эффектов мы просто проверяем, что BatchQuantityChanged приводит к AllocationRequired, если количество падает ниже уже выделенного общего количества:

Example 17. Тестирование перераспределения в изоляции (tests/unit/test_handlers.py)
def test_reallocates_if_necessary_isolated():
    uow = FakeUnitOfWorkWithFakeMessageBus()

    # test setup as before
    event_history = [
        events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
        events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
        events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
        events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
    ]
    for e in event_history:
        messagebus.handle(e, uow)
    [batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
    assert batch1.available_quantity == 10
    assert batch2.available_quantity == 50

    messagebus.handle(events.BatchQuantityChanged("batch1", 25), uow)

    # assert on new events emitted rather than downstream side-effects
    [reallocation_event] = uow.events_published
    assert isinstance(reallocation_event, events.AllocationRequired)
    assert reallocation_event.orderid in {'order1', 'order2'}
    assert reallocation_event.sku == 'INDIFFERENT-TABLE'

Хотите вы этого или нет, зависит от сложности вашей цепочки событий. Мы говорим: начните с сквозного тестирования и прибегайте к нему только в случае необходимости.

Упражнение для читателя

Отличный способ заставить себя действительно понять какой-то код-это его рефакторинг. При обсуждении тестирования обработчиков в изоляции мы использовали нечто под названием FakeUnitOfWorkWithFakeMessageBus, что является излишне сложным и нарушает SRP.

Если мы изменим шину сообщений на класс,[3] тогда создание FakeMessageBus будет более простым:

Example 18. Абстрактная шина сообщений и ее реальные и поддельные версии

Так что сигайте в код на GitHub и посмотрите, сможете ли вы заставить работать версию на основе классов, а затем напишите версию test_reallocates_if_needed_isolated () из более ранней версии.

Мы используем шину сообщений на основе классов в [chapter_13_dependency_injection], если вам нужно больше вдохновения.

Подведём итоги

Давайте оглянемся на то, чего мы достигли, и подумаем, "А нафига?".

Чего мы достигли?

События (Events) - это простые классы данных, которые определяют структуры данных для входных данных. и внутренние сообщения в нашей системе. Это довольно мощно с точки зрения DDD, поскольку события часто очень хорошо переводятся на деловой язык (look up event storming if you haven’t already).

Обработчики (Handlers) - это то, как мы реагируем на события. Они могут обратиться к нашей модели или обратиться к внешним службам. Мы можем определить несколько обработчиков для одного события, если захотим. Обработчики также могут вызывать другие события. Это позволяет нам быть очень детальными в отношении того, что делает обработчик, и действительно придерживаться SRP.

Почему мы достигли цели?

Наша основная цель в отношении этих структурных моделей заключается в том, чтобы сложность нашего приложения росла медленнее, чем его размер. Когда мы идем ва-банк на шине сообщений, то как всегда, платим цену с точки зрения архитектурной сложности (См. [chapter_09_all_messagebus_tradeoffs]), но мы приобретаем себе паттерн, который может обрабатывать сколь угодно сложные требования, не нуждаясь в дальнейших концептуальных или архитектурных изменениях в том, как мы делаем вещи.

Здесь мы добавили довольно сложный вариант использования (изменить количество, освободить место, начать новую транзакцию, перераспределить место, опубликовать внешнее уведомление), но в архитектурном плане сложность не требует затрат. Мы добавили новые события, новые обработчики и новый внешний адаптер (для электронной почты), все из которых являются существующими категориями объектов в нашей архитектуре, которые понятны нам и это легко объяснимы новичкам. Каждая из наших подвижных частей выполняет одну задачу, они четко связаны друг с другом, и нет никаких неожиданных побочных эффектов.

Table 1. Все приложение - это шина сообщений: компромиссы
ПлюсыМинусы
  • Обработчики и сервисы - это одно и то же, так что все проще.

  • У нас есть великолепные структуры данных, чтобы ввести их в систему.

  • Шина сообщений по-прежнему остается несколько непредсказуемым способом делать что-то с веб-точки зрения. Вы не знаете заранее, когда все закончится.

  • Будет происходить дублирование полей и структуры между объектами модели и событиями, что будет иметь затраты на техническое обслуживание. Добавление поля к одному обычно означает добавление поля по крайней мере к одному из других.

Теперь вам может быть интересно, откуда берутся эти события BatchQuantityChanged? Ответ станет понятен через пару глав. Но сначала давайте поговорим о событиях в сравнении с командами events versus commands.


1. Моделирование на основе событий настолько популярно, что для облегчения сбора требований на основе событий и разработки модели предметной области была разработана практика под названием event storming.
2. Если вы немного читали об архитектуре, управляемой событиями, вы можете подумать: "Некоторые из этих событий больше похожи на команды!" Терпение граждане! Мы пытаемся ввести одну концепцию за раз. В следующая глава, мы введем различие между командами и событиями.
3. «Простая» реализация в этой главе по существу использует сам модуль messagebus.py для реализации шаблона Singleton.

Комментарии

Популярные сообщения из этого блога

4.Наш первый Use Case или пример использования: Flask API и Service Layer

Введение

2.Repository Pattern