Event-Driven Architecture: Использование событий для интеграции микросервисов

 В предыдущей главе мы как то промолчали о том, как мы получим события «измененного количества партий», или, по сути, как мы можем уведомить внешний мир о перераспределении.

У нас есть микросервис с веб-API, но как насчет других способов общения с другими системами? Как мы узнаем, если, скажем, отгрузка задерживается или количество изменяется? Как мы сообщим складской системе, что заказ распределен и должен быть отправлен клиенту?

В этой главе мы хотели бы показать, как метафора events может быть расширена, чтобы охватить способ обработки входящих и исходящих сообщений из системы. Внутренне ядро нашего приложения теперь процессор сообщений. Давайте проследим за тем, чтобы он также стал обработчиком внешних сообщений. Как показано в Наше приложение является процессором сообщений, наше приложение будет получать события из внешних источников через внешнюю шину сообщений (в качестве примера мы будем использовать очереди Redis pub/sub) и публиковать свои выходные данные в виде событий там же.




Figure 1. Наше приложение является процессором сообщений

Код для этой главы находится в ветви chapter_11_external_events on GitHub:

git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_11_external_events
# или, чтобы кодить вместе, проверьте предыдущую главу:
git checkout chapter_10_commands

Distributed Ball of Mud, и мыслим в существительных

Прежде чем мы двинем дальше, давайте поговорим об альтернативах. Мы регулярно общаемся с инженерами, которые пытаются создать архитектуру микросервисов. Часто они мигрируют из существующего приложения, и их первый инстинкт состоит в том, чтобы разделить свою систему на существительные.

Какие существительные мы уже ввели в нашу систему? Ну, у нас есть партии(batches) товара(stock), заказов(orders), продуктов(products) и клиентов(customers). Таким образом, наивная попытка разрушить систему могла бы выглядеть следующим образом Context diagram with noun-based services (обратите внимание, что мы назвали нашу систему в честь существительного Партии(Batches), а не Распределение(Allocation)).


Figure 2. Context diagram with noun-based services
[plantuml, apwp_1102, config=plantuml.cfg]
@startuml Batches Context Diagram
!include images/C4_Context.puml

System(batches, "Batches", "Knows about available stock""Знает об имеющихся запасах")
Person(customer, "Customer", "Wants to buy furniture""Хочет купить мебель")
System(orders, "Orders", "Knows about customer orders""Знает о заказах клиентов")
System(warehouse, "Warehouse", "Knows about shipping instructions""Знает об инструкциях по доставке")

Rel_R(customer, orders, "Places order with""Размещает заказ с")
Rel_D(orders, batches, "Reserves stock with""Резервирует запасы с")
Rel_D(batches, warehouse, "Sends instructions to""Посылает инструкции")

@enduml

Каждая "Штуковина" в нашей системе имеет связанную службу, которая предоставляет HTTP API.

Давайте рассмотрим пример happy-path в Поток Команд 1.: наши пользователи посещают веб - сайт и могут выбирать из продуктов, которые есть на складе. Когда они добавят товар в свою корзину, мы зарезервируем для них некоторый запас. Когда заказ завершен, мы подтверждаем бронирование, что заставляет нас отправлять инструкции по отправке на склад. Допустим, к примеру, если это третий заказ клиента, то надо обновить запись клиента, чтобы отметить его как VIP-персону.





Figure 3. Поток Команд 1.
[plantuml, apwp_1103, config=plantuml.cfg]
@startuml
scale 4

actor Customer
entity Orders
entity Batches
entity Warehouse
database CRM


== Reservation(Бронирование) ==

  Customer -> Orders: Add product to basket (Добавляет товар в корзину)
  Orders -> Batches: Reserve stock (Резервирует запас)

== Purchase(Закупка) ==

  Customer -> Orders: Place order(Размещает заказ)
  activate Orders
  Orders -> Batches: Confirm reservation(Подтверждает бронирование)
  Batches -> Warehouse: Dispatch goods(Отправляет товары)
  Orders -> CRM: Update customer record(Обновляет запись клиента)
  deactivate Orders


@enduml

Мы можем рассматривать каждый из этих шагов как команду в нашей системе: ReserveStockConfirmReservationDispatchGoodsMakeCustomerVIP, и так далее.

Этот стиль архитектуры, в котором мы создаем микросервисы для каждой таблицы базы данных и рассматриваем наши HTTP-API как интерфейсы CRUD для анемичных моделей, является наиболее распространенным первоначальным способом для людей подойти к сервис-ориентированному дизайну.

Это прекрасно работает для очень простых систем, но может быстро превратиться в distributed ball of mud(расползающийся ком грязи).

Чтобы понять почему, давайте рассмотрим другой случай. Иногда, когда товар поступает на склад, мы обнаруживаем, что товары были повреждены водой во время транспортировки. Мы не можем продавать поврежденные водой диваны, поэтому нам приходится выбрасывать их и запрашивать больше запасов у наших партнеров. Нам также необходимо обновить нашу модель запасов, и это может означать, что нам нужно перераспределить заказ клиента.

Куда ведет эта логика?

Ну, система складского учёта знает, что запас был поврежден, поэтому, возможно, она должна владеть этим процессом, как показано на рисунке. Поток команд 2.



Figure 4. Поток команд 2
[plantuml, apwp_1104, config=plantuml.cfg]
@startuml
scale 4

actor w as "Warehouse worker"
entity Warehouse
entity Batches
entity Orders
database CRM


  w -> Warehouse: Report stock damage
  activate Warehouse
  Warehouse -> Batches: Decrease available stock
  Batches -> Batches: Reallocate orders
  Batches -> Orders: Update order status
  Orders -> CRM: Update order history
  deactivate Warehouse

@enduml

Это тоже работает, но теперь наш график зависимостей в беспорядке. Для распределения запасов служба заказов(Orders service) управляет системой Партий(Batches system), которая управляет Складом(Warehouse); но для решения проблем на складе наша складская система(Warehouse system) управляет партиями(Batches), которые управляют Заказами(Orders).

Умножьте это на все другие рабочие процессы, которые нам нужно обеспечить, и вы увидите, как быстро запутываются службы.

Обработка ошибок в распределенных системах

"Вещи ломаются" - это универсальный закон разработки программного обеспечения. Что происходит в нашей системе, когда один из наших запросов терпит неудачу? Предположим, что сетевая ошибка происходит сразу после того, как мы принимаем заказ пользователя на три MISBEGOTTEN-RUG, как показано на рисунке Поток команд с ошибкой.

У нас есть два варианта: мы можем разместить заказ в любом случае и оставить его нераспределенным(unallocated), или мы можем отказаться принять заказ, потому что распределение не может быть гарантировано. Резко всплывший сбой в работе нашей службы обработки партий оказывает критическое влияние на надежность нашей службы заказов.

Когда две хреновины должны быть изменены вместе, мы говорим, что они связанны coupled. Мы можем рассматривать этот каскад сбоев как своего рода временную связанность temporal coupling: все части системы должны работать одновременно, чтобы обеспечить работоспособность каждой в отдельности. По мере того как система становится больше, вероятность того, что какая-то часть деградирует, экспоненциально возрастает.



Figure 5. Поток команд с ошибкой
[plantuml, apwp_1105, config=plantuml.cfg]
@startuml
scale 4

actor Customer
entity Orders
entity Batches

Customer -> Orders: Place order(Разместить заказ)
Orders -[#red]x Batches: Confirm reservation(Подтвердить бронирование)
hnote right: network error(ошибка сети)
Orders --> Customer: ???

@enduml
Connascence(Связанность)

Мы используем здесь термин связь(coupling), но есть и другой способ описать отношения между нашими системами. Connascence(Связанность) это термин, используемый некоторыми авторами для описания различных типов связи.

Связаноность — не плохо, но некоторые виды связанности влияют сильнее, чем другие. Мы хотим иметь локальную сильную связь, как когда два класса тесно связаны, но слабую связь на расстоянии.

В нашем первом примере расползающегося шара грязи мы видим Связанность по Выполнению(Connascence of Execution): несколько компонентов должны знать о правильном порядке работы, чтобы операция была успешной.

Когда мы думаем об условиях ошибок здесь, мы говорим о Связанность по Времени(Connascence of Timing): для того, чтобы операция сработала, должно произойти несколько событий, одно за другим.

Когда мы заменяем нашу систему в RPC-стиле событиями, мы заменяем оба этих типа связи более слабым типом. Это Связанность по Имени(Connascence of Name): для нескольких компонентов необходимо согласовать только название события и название поля, которое оно содержит.

Мы никогда не сможем полностью избежать связей, только если запретим нашему программному обеспечению общаться с каким - либо другим программным обеспечением. Чего мы хотим, так это избежать ненужной(inappropriate) связи. Connascence обеспечивает ментальную модель для понимания силы и типа связи, присущих различным архитектурным стилям. Читайте все об этом по адресу connascence.io.

Альтернатива: Временное Разъединение(Декаплинг) С Использованием Асинхронного Обмена Сообщениями

Как нам получить соответствующую связь? Мы уже видели часть ответа, которая заключается в том, что мы должны думать в терминах глаголов, а не существительных. Наша модель предметной области посвящена моделированию бизнес-процесса. Это не статическая модель данных о чём то таком; это модель глагола.

Поэтому вместо того, чтобы думать о системе для заказов и системе для партий, мы думаем о системе для ordering и системе для allocating и так далее.

Когда мы разделим вещи таким образом, станет немного легче понять, какая система за что должна отвечать. Когда мы думаем о ordering, на самом деле мы хотим убедиться, что, когда мы размещаем заказ, заказ размещен. Все остальное может случиться позже, как только удачно случится это.


Если это звучит знакомо, то так и должно быть! Разделение ответственности - это тот же процесс, через который мы прошли при разработке наших агрегатов и команд.

Как и агрегаты, микросервисы должны быть консистентными границами. Между двумя службами мы можем принять возможную согласованность, и это означает, что нам не нужно полагаться на синхронные вызовы. Каждая служба принимает команды из внешнего мира и создает события для записи результата. Другие службы могут прослушивать(listen) эти события, чтобы инициировать следующие шаги в рабочем процессе.

Чтобы избежать неприятностей в виде Distributed Ball of Mud, вместо временно связанных вызовов HTTP API мы будем использовать асинхронный обмен сообщениями для интеграции наших систем. Нам надо, чтобы наши сообщения BatchQuantityChanged поступали как внешние сообщения из вышестоящих систем, и мы хотим, чтобы наша система публиковала события Allocated для прослушивания нижестоящими системами.

Почему так лучше? Во-первых, поскольку все вещи могут выходить из строя независимо друг от друга, то легче справиться с ухудшением поведения: мы все еще можем принимать заказы, если у системы распределения плохой день.

Во-вторых, мы уменьшаем силу связи между нашими системами. Если нам нужно изменить порядок операций или ввести новые этапы в процесс, мы можем сделать это на месте.

Использование канала Redis Pub/Sub для интеграции

Давайте посмотрим конкретно, как все это будет работать. Нам понадобится какой-то способ получения событий из одной системы в другую, подобно нашей шине сообщений, но для сервисов. Этот элемент инфраструктуры часто называют message broker. Роль брокера сообщений заключается в том, чтобы принимать сообщения от издателей и доставлять их подписчикам.

На сайте MADE.com мы используем Event Store; Kafka или RabbitMQ являются достойными альтернативами. Легкое решение на основе Redis pub/sub channels также может прекрасно работать, и поскольку Redis гораздо более привычен для большинства программистов, мы решили использовать его в этой книге.


Мы упускаем из виду сложность, связанную с выбором правильной платформы для обмена сообщениями. Необходимо продумать такие вопросы, как упорядочение сообщений, обработка отказов и идемпотентность. Несколько советов см. [footguns].

Наш новый поток будет выглядеть следующим образом Диаграмма последовательности для потока перераспределения: Redis предоставляет событие BatchQuantityChanged, которое запускает весь процесс, а наше событие Allocated снова публикуется в Redis в конце.



Figure 6. Диаграмма последовательности для потока перераспределения
[plantuml, apwp_1106, config=plantuml.cfg]
@startuml
scale 4

Redis -> MessageBus : BatchQuantityChanged event

group BatchQuantityChanged Handler + Unit of Work 1
    MessageBus -> Domain_Model : change batch quantity
    Domain_Model -> MessageBus : emit Allocate command(s)
end


group Allocate Handler + Unit of Work 2 (or more)
    MessageBus -> Domain_Model : allocate
    Domain_Model -> MessageBus : emit Allocated event(s)
end

MessageBus -> Redis : publish to line_allocated channel
@enduml

Тест-Драйв всего этого с использованием Сквозного теста

Вот как мы можем начать со сквозного тестирования. Мы можем использовать наш существующий API для создания пакетов, а затем протестируем входящие и исходящие сообщения:

Example 1. Сквозной тест для нашей модели pub/sub (tests/e2e/test_external_events.py)
def test_change_batch_quantity_leading_to_reallocation():
    # начать с двух партий и заказа, выделенного для одной из них  
orderid, sku = random_orderid(), random_sku() earlier_batch, later_batch = random_batchref('old'), random_batchref('newer') api_client.post_to_add_batch(earlier_batch, sku, qty=10, eta='2011-01-01') api_client.post_to_add_batch(later_batch, sku, qty=10, eta='2011-01-02') response = api_client.post_to_allocate(orderid, sku, 10) assert response.json()['batchref'] == earlier_batch subscription = redis_client.subscribe_to('line_allocated') # изменить количество выделенной партии так, чтобы оно было меньше нашего заказа redis_client.publish_message('change_batch_quantity', { 'batchref': earlier_batch, 'qty': 5 }) # подождать, пока не появится сообщение о том, что заказ был перераспределен messages = [] for attempt in Retrying(stop=stop_after_delay(3), reraise=True): with attempt: message = subscription.get_message(timeout=1) if message: messages.append(message) print(messages) data = json.loads(messages[-1]['data']) assert data['orderid'] == orderid assert data['batchref'] == later_batch
То, что происходит в этом месте, понятно из комментариев: мы хотим отправить в систему событие, которое вызывает перераспределение строки заказа, и мы видим, что это перераспределение также появляется как событие в Redis.
api_client - это маленький помощник, который мы отрефакторили для совместного использования двумя типами тестов; он оборачивает наши вызовы к requests.post.
redis_client - это еще один маленький помощник в тестировании, детали которого не имеют особого значения; его задача заключается в том, чтобы иметь возможность отправлять и получать сообщения из различных каналов Redis. Мы будем использовать канал под названием change_batch_quantity для отправки запроса на изменение количества для партии, и мы будем слушать другой канал под названием line_allocated для поиска ожидаемого перераспределения.
Из-за асинхронной природы тестируемой системы нам нужно снова использовать библиотеку tenacity, чтобы добавить цикл повтора - во-первых, потому что может пройти некоторое время, пока наше новое сообщение line_allocated придет, а также потому, что это будет не единственное сообщение на этом канале.

Redis - еще один тонкий адаптер вокруг нашей шины сообщений

Наш слушатель(listener) Redis pub/sub (мы называем его потребитель событий) очень похож на Flask: он транслируется из внешнего мира в наши события:

Example 2. Простой слушатель(listener) сообщений Redis (src/allocation/entrypoints/redis_eventconsumer.py)
r = redis.Redis(**config.get_redis_host_and_port())


def main():
    orm.start_mappers()
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe('change_batch_quantity')  

    for m in pubsub.listen():
        handle_change_batch_quantity(m)


def handle_change_batch_quantity(m):
    logging.debug('handling %s', m)
    data = json.loads(m['data'])  
    cmd = commands.ChangeBatchQuantity(ref=data['batchref'], qty=data['qty'])  
    messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())
main() подписывает нас на канал change_batch_quantity при загрузке.
Наша основная задача как точки входа в систему - десериализовать JSON, преобразовать его в Command и передать его сервисному уровню - так же, как это делает адаптер Flask.

Мы также создаем новый адаптер для выполнения противоположной задачи - преобразования событий домена в публичные события:

Example 3. Простой издатель сообщений Redis (src/allocation/adapters/redis_eventpublisher.py)
r = redis.Redis(**config.get_redis_host_and_port())


def publish(channel, event: events.Event):  
    logging.debug('publishing: channel=%s, event=%s', channel, event)
    r.publish(channel, json.dumps(asdict(event)))
Здесь мы используем жестко заданный канал, но вы также можете хранить связку между classes/names событий и соответствующим каналом, позволяя одному или нескольким типам сообщений отправляться на разные каналы.

Наше новое выездное мероприятие

Вот как будет выглядеть событие Allocated:

Example 4. Новое событие (src/allocation/domain/events.py)
@dataclass
class Allocated(Event):
    orderid: str
    sku: str
    qty: int
    batchref: str

В нем содержится все, что нам нужно знать о распределении: сведения о строке заказа и о том, для какой партии он был выделен.

Мы добавляем его в метод allocate() нашей модели (предварительно добавив тест, естественно):

Example 5. Product.allocate() выдает новое событие для записи того, что произошло (src/allocation/domain/model.py)
class Product:
    ...
    def allocate(self, line: OrderLine) -> str:
        ...

            batch.allocate(line)
            self.version_number += 1
            self.events.append(events.Allocated(
                orderid=line.orderid, sku=line.sku, qty=line.qty,
                batchref=batch.reference,
            ))
            return batch.reference

Обработчик для ChangeBatchQuantity уже существует, поэтому все, что нам нужно добавить, это обработчик, который публикует исходящее событие:

Example 6. Шина сообщений разрастается (src/allocation/service_layer/messagebus.py)
HANDLERS = {
    events.Allocated: [handlers.publish_allocated_event],
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]

Для публикации события используется наша вспомогательная функция из обертки Redis:

Example 7. Публикация в Redis (src/allocation/service_layer/handlers.py)
def publish_allocated_event(
        event: events.Allocated, uow: unit_of_work.AbstractUnitOfWork,
):
    redis_eventpublisher.publish('line_allocated', event)

Внутренние и внешние события

Хорошо, если различие между внутренними и внешними событиями будет четким. Некоторые события могут приходить извне, а некоторые события могут обновляться и публиковаться извне, но не все будут таковыми. Это особенно важно, если вы попадаете в event sourcing (хотя это очень подходящая тема для другой книги).

Исходящие события — это одно из мест, где важно применять валидацию. См. [appendix_validation] для ознакомления с некоторой философией валидации и примеры.
Упражнение для читателя

Ниболее удачный простой вариант для этой главы: сделать так, чтобы основной сценарий использования allocate() мог быть вызван событием на канале Redis, а также (или вместо) через API.

Скорее всего, вы захотите добавить новый тест E2E и передать некоторые изменения в redis_eventconsumer.py.

Подведение итогов

События могут приходить _из_вне, но они также могут быть опубликованы извне - наш обработчик publish преобразует событие в сообщение на канале Redis. Мы используем события для общения с внешним миром. Такая временная развязка обеспечивает нам большую гибкость в интеграции приложений, но, как всегда, за это приходится платить.

Уведомление о событиях хорошо тем, что оно подразумевает низкий уровень связи и довольно просто настраивается. Однако это может стать проблематичным, если действительно существует логический поток, который проходит через различные уведомления о событиях... Такой поток может быть трудно увидеть, поскольку он не выражен явно в тексте программы.... Это может затруднить отладку и модификацию.

Martin Fowler, "What do you mean by 'Event-Driven'"

Интеграция микросервисов на основе событий: компромиссы показывает некоторые компромиссы, о которых стоит подумать.

Table 1. Интеграция микросервисов на основе событий: компромиссы
ПлюсыМинусы
  • Избегает distributed big ball of mud.

  • Сервисы разделены: проще менять отдельные сервисы и добавлять новые.

  • Общие потоки информации сложнее увидеть.

  • Эвентуальная консистентность - это новая концепция, с которой нужно иметь дело.

  • Message reliability and choices around at-least-once versus at-most-once delivery need thinking through. Надежность сообщений и выбор между доставкой at-least-once versus"минимум один раз" и at-most-once"максимум один раз" требуют продумывания.

В более общем случае, если вы переходите от модели синхронного обмена сообщениями к асинхронному, вы также открываете целый ряд проблем, связанных с надежностью и эвентуальной консистентностью сообщений. Читать далее [footguns].

Комментарии

Популярные сообщения из этого блога

4.Наш первый Use Case или пример использования: Flask API и Service Layer

Введение

2.Repository Pattern