Event-Driven Architecture: Использование событий для интеграции микросервисов
В предыдущей главе мы как то промолчали о том, как мы получим события «измененного количества партий», или, по сути, как мы можем уведомить внешний мир о перераспределении.
У нас есть микросервис с веб-API, но как насчет других способов общения с другими системами? Как мы узнаем, если, скажем, отгрузка задерживается или количество изменяется? Как мы сообщим складской системе, что заказ распределен и должен быть отправлен клиенту?
В этой главе мы хотели бы показать, как метафора events может быть расширена, чтобы охватить способ обработки входящих и исходящих сообщений из системы. Внутренне ядро нашего приложения теперь процессор сообщений. Давайте проследим за тем, чтобы он также стал обработчиком внешних сообщений. Как показано в Наше приложение является процессором сообщений, наше приложение будет получать события из внешних источников через внешнюю шину сообщений (в качестве примера мы будем использовать очереди Redis pub/sub) и публиковать свои выходные данные в виде событий там же.
Код для этой главы находится в ветви chapter_11_external_events on GitHub: git clone https://github.com/cosmicpython/code.git cd code git checkout chapter_11_external_events # или, чтобы кодить вместе, проверьте предыдущую главу: git checkout chapter_10_commands |
Distributed Ball of Mud, и мыслим в существительных
Прежде чем мы двинем дальше, давайте поговорим об альтернативах. Мы регулярно общаемся с инженерами, которые пытаются создать архитектуру микросервисов. Часто они мигрируют из существующего приложения, и их первый инстинкт состоит в том, чтобы разделить свою систему на существительные.
Какие существительные мы уже ввели в нашу систему? Ну, у нас есть партии(batches) товара(stock), заказов(orders), продуктов(products) и клиентов(customers). Таким образом, наивная попытка разрушить систему могла бы выглядеть следующим образом Context diagram with noun-based services (обратите внимание, что мы назвали нашу систему в честь существительного Партии(Batches), а не Распределение(Allocation)).
[plantuml, apwp_1102, config=plantuml.cfg] @startuml Batches Context Diagram !include images/C4_Context.puml System(batches, "Batches", "Knows about available stock""Знает об имеющихся запасах") Person(customer, "Customer", "Wants to buy furniture""Хочет купить мебель") System(orders, "Orders", "Knows about customer orders""Знает о заказах клиентов") System(warehouse, "Warehouse", "Knows about shipping instructions""Знает об инструкциях по доставке") Rel_R(customer, orders, "Places order with""Размещает заказ с") Rel_D(orders, batches, "Reserves stock with""Резервирует запасы с") Rel_D(batches, warehouse, "Sends instructions to""Посылает инструкции") @enduml
Каждая "Штуковина" в нашей системе имеет связанную службу, которая предоставляет HTTP API.
Давайте рассмотрим пример happy-path в Поток Команд 1.: наши пользователи посещают веб - сайт и могут выбирать из продуктов, которые есть на складе. Когда они добавят товар в свою корзину, мы зарезервируем для них некоторый запас. Когда заказ завершен, мы подтверждаем бронирование, что заставляет нас отправлять инструкции по отправке на склад. Допустим, к примеру, если это третий заказ клиента, то надо обновить запись клиента, чтобы отметить его как VIP-персону.
[plantuml, apwp_1103, config=plantuml.cfg] @startuml scale 4 actor Customer entity Orders entity Batches entity Warehouse database CRM == Reservation(Бронирование) == Customer -> Orders: Add product to basket (Добавляет товар в корзину) Orders -> Batches: Reserve stock (Резервирует запас) == Purchase(Закупка) == Customer -> Orders: Place order(Размещает заказ) activate Orders Orders -> Batches: Confirm reservation(Подтверждает бронирование) Batches -> Warehouse: Dispatch goods(Отправляет товары) Orders -> CRM: Update customer record(Обновляет запись клиента) deactivate Orders @enduml
Мы можем рассматривать каждый из этих шагов как команду в нашей системе: ReserveStock, ConfirmReservation, DispatchGoods, MakeCustomerVIP, и так далее.
Этот стиль архитектуры, в котором мы создаем микросервисы для каждой таблицы базы данных и рассматриваем наши HTTP-API как интерфейсы CRUD для анемичных моделей, является наиболее распространенным первоначальным способом для людей подойти к сервис-ориентированному дизайну.
Это прекрасно работает для очень простых систем, но может быстро превратиться в distributed ball of mud(расползающийся ком грязи).
Чтобы понять почему, давайте рассмотрим другой случай. Иногда, когда товар поступает на склад, мы обнаруживаем, что товары были повреждены водой во время транспортировки. Мы не можем продавать поврежденные водой диваны, поэтому нам приходится выбрасывать их и запрашивать больше запасов у наших партнеров. Нам также необходимо обновить нашу модель запасов, и это может означать, что нам нужно перераспределить заказ клиента.
Куда ведет эта логика?
Ну, система складского учёта знает, что запас был поврежден, поэтому, возможно, она должна владеть этим процессом, как показано на рисунке. Поток команд 2.
[plantuml, apwp_1104, config=plantuml.cfg] @startuml scale 4 actor w as "Warehouse worker" entity Warehouse entity Batches entity Orders database CRM w -> Warehouse: Report stock damage activate Warehouse Warehouse -> Batches: Decrease available stock Batches -> Batches: Reallocate orders Batches -> Orders: Update order status Orders -> CRM: Update order history deactivate Warehouse @enduml
Это тоже работает, но теперь наш график зависимостей в беспорядке. Для распределения запасов служба заказов(Orders service) управляет системой Партий(Batches system), которая управляет Складом(Warehouse); но для решения проблем на складе наша складская система(Warehouse system) управляет партиями(Batches), которые управляют Заказами(Orders).
Умножьте это на все другие рабочие процессы, которые нам нужно обеспечить, и вы увидите, как быстро запутываются службы.
Обработка ошибок в распределенных системах
"Вещи ломаются" - это универсальный закон разработки программного обеспечения. Что происходит в нашей системе, когда один из наших запросов терпит неудачу? Предположим, что сетевая ошибка происходит сразу после того, как мы принимаем заказ пользователя на три MISBEGOTTEN-RUG, как показано на рисунке Поток команд с ошибкой.
У нас есть два варианта: мы можем разместить заказ в любом случае и оставить его нераспределенным(unallocated), или мы можем отказаться принять заказ, потому что распределение не может быть гарантировано. Резко всплывший сбой в работе нашей службы обработки партий оказывает критическое влияние на надежность нашей службы заказов.
Когда две хреновины должны быть изменены вместе, мы говорим, что они связанны coupled. Мы можем рассматривать этот каскад сбоев как своего рода временную связанность temporal coupling: все части системы должны работать одновременно, чтобы обеспечить работоспособность каждой в отдельности. По мере того как система становится больше, вероятность того, что какая-то часть деградирует, экспоненциально возрастает.
[plantuml, apwp_1105, config=plantuml.cfg] @startuml scale 4 actor Customer entity Orders entity Batches Customer -> Orders: Place order(Разместить заказ) Orders -[#red]x Batches: Confirm reservation(Подтвердить бронирование) hnote right: network error(ошибка сети) Orders --> Customer: ??? @enduml
Альтернатива: Временное Разъединение(Декаплинг) С Использованием Асинхронного Обмена Сообщениями
Как нам получить соответствующую связь? Мы уже видели часть ответа, которая заключается в том, что мы должны думать в терминах глаголов, а не существительных. Наша модель предметной области посвящена моделированию бизнес-процесса. Это не статическая модель данных о чём то таком; это модель глагола.
Поэтому вместо того, чтобы думать о системе для заказов и системе для партий, мы думаем о системе для ordering и системе для allocating и так далее.
Когда мы разделим вещи таким образом, станет немного легче понять, какая система за что должна отвечать. Когда мы думаем о ordering, на самом деле мы хотим убедиться, что, когда мы размещаем заказ, заказ размещен. Все остальное может случиться позже, как только удачно случится это.
| Если это звучит знакомо, то так и должно быть! Разделение ответственности - это тот же процесс, через который мы прошли при разработке наших агрегатов и команд. |
Как и агрегаты, микросервисы должны быть консистентными границами. Между двумя службами мы можем принять возможную согласованность, и это означает, что нам не нужно полагаться на синхронные вызовы. Каждая служба принимает команды из внешнего мира и создает события для записи результата. Другие службы могут прослушивать(listen) эти события, чтобы инициировать следующие шаги в рабочем процессе.
Чтобы избежать неприятностей в виде Distributed Ball of Mud, вместо временно связанных вызовов HTTP API мы будем использовать асинхронный обмен сообщениями для интеграции наших систем. Нам надо, чтобы наши сообщения BatchQuantityChanged поступали как внешние сообщения из вышестоящих систем, и мы хотим, чтобы наша система публиковала события Allocated для прослушивания нижестоящими системами.
Почему так лучше? Во-первых, поскольку все вещи могут выходить из строя независимо друг от друга, то легче справиться с ухудшением поведения: мы все еще можем принимать заказы, если у системы распределения плохой день.
Во-вторых, мы уменьшаем силу связи между нашими системами. Если нам нужно изменить порядок операций или ввести новые этапы в процесс, мы можем сделать это на месте.
Использование канала Redis Pub/Sub для интеграции
Давайте посмотрим конкретно, как все это будет работать. Нам понадобится какой-то способ получения событий из одной системы в другую, подобно нашей шине сообщений, но для сервисов. Этот элемент инфраструктуры часто называют message broker. Роль брокера сообщений заключается в том, чтобы принимать сообщения от издателей и доставлять их подписчикам.
На сайте MADE.com мы используем Event Store; Kafka или RabbitMQ являются достойными альтернативами. Легкое решение на основе Redis pub/sub channels также может прекрасно работать, и поскольку Redis гораздо более привычен для большинства программистов, мы решили использовать его в этой книге.
| Мы упускаем из виду сложность, связанную с выбором правильной платформы для обмена сообщениями. Необходимо продумать такие вопросы, как упорядочение сообщений, обработка отказов и идемпотентность. Несколько советов см. [footguns]. |
Наш новый поток будет выглядеть следующим образом Диаграмма последовательности для потока перераспределения: Redis предоставляет событие BatchQuantityChanged, которое запускает весь процесс, а наше событие Allocated снова публикуется в Redis в конце.
[plantuml, apwp_1106, config=plantuml.cfg]
@startuml
scale 4
Redis -> MessageBus : BatchQuantityChanged event
group BatchQuantityChanged Handler + Unit of Work 1
MessageBus -> Domain_Model : change batch quantity
Domain_Model -> MessageBus : emit Allocate command(s)
end
group Allocate Handler + Unit of Work 2 (or more)
MessageBus -> Domain_Model : allocate
Domain_Model -> MessageBus : emit Allocated event(s)
end
MessageBus -> Redis : publish to line_allocated channel
@endumlТест-Драйв всего этого с использованием Сквозного теста
Вот как мы можем начать со сквозного тестирования. Мы можем использовать наш существующий API для создания пакетов, а затем протестируем входящие и исходящие сообщения:
def test_change_batch_quantity_leading_to_reallocation():
# начать с двух партий и заказа, выделенного для одной из них
orderid, sku = random_orderid(), random_sku()
earlier_batch, later_batch = random_batchref('old'), random_batchref('newer')
api_client.post_to_add_batch(earlier_batch, sku, qty=10, eta='2011-01-01')
api_client.post_to_add_batch(later_batch, sku, qty=10, eta='2011-01-02')
response = api_client.post_to_allocate(orderid, sku, 10)
assert response.json()['batchref'] == earlier_batch
subscription = redis_client.subscribe_to('line_allocated')
# изменить количество выделенной партии так, чтобы оно было меньше нашего заказа
redis_client.publish_message('change_batch_quantity', {
'batchref': earlier_batch, 'qty': 5
})
# подождать, пока не появится сообщение о том, что заказ был перераспределен
messages = []
for attempt in Retrying(stop=stop_after_delay(3), reraise=True):
with attempt:
message = subscription.get_message(timeout=1)
if message:
messages.append(message)
print(messages)
data = json.loads(messages[-1]['data'])
assert data['orderid'] == orderid
assert data['batchref'] == later_batch| То, что происходит в этом месте, понятно из комментариев: мы хотим отправить в систему событие, которое вызывает перераспределение строки заказа, и мы видим, что это перераспределение также появляется как событие в Redis. | |
api_client - это маленький помощник, который мы отрефакторили для совместного использования двумя типами тестов; он оборачивает наши вызовы к requests.post. | |
redis_client - это еще один маленький помощник в тестировании, детали которого не имеют особого значения; его задача заключается в том, чтобы иметь возможность отправлять и получать сообщения из различных каналов Redis. Мы будем использовать канал под названием change_batch_quantity для отправки запроса на изменение количества для партии, и мы будем слушать другой канал под названием line_allocated для поиска ожидаемого перераспределения. | |
Из-за асинхронной природы тестируемой системы нам нужно снова использовать библиотеку tenacity, чтобы добавить цикл повтора - во-первых, потому что может пройти некоторое время, пока наше новое сообщение line_allocated придет, а также потому, что это будет не единственное сообщение на этом канале. |
Redis - еще один тонкий адаптер вокруг нашей шины сообщений
Наш слушатель(listener) Redis pub/sub (мы называем его потребитель событий) очень похож на Flask: он транслируется из внешнего мира в наши события:
r = redis.Redis(**config.get_redis_host_and_port())
def main():
orm.start_mappers()
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe('change_batch_quantity')
for m in pubsub.listen():
handle_change_batch_quantity(m)
def handle_change_batch_quantity(m):
logging.debug('handling %s', m)
data = json.loads(m['data'])
cmd = commands.ChangeBatchQuantity(ref=data['batchref'], qty=data['qty'])
messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())main() подписывает нас на канал change_batch_quantity при загрузке. | |
Наша основная задача как точки входа в систему - десериализовать JSON, преобразовать его в Command и передать его сервисному уровню - так же, как это делает адаптер Flask. |
Мы также создаем новый адаптер для выполнения противоположной задачи - преобразования событий домена в публичные события:
r = redis.Redis(**config.get_redis_host_and_port())
def publish(channel, event: events.Event):
logging.debug('publishing: channel=%s, event=%s', channel, event)
r.publish(channel, json.dumps(asdict(event)))| Здесь мы используем жестко заданный канал, но вы также можете хранить связку между classes/names событий и соответствующим каналом, позволяя одному или нескольким типам сообщений отправляться на разные каналы. |
Наше новое выездное мероприятие
Вот как будет выглядеть событие Allocated:
@dataclass
class Allocated(Event):
orderid: str
sku: str
qty: int
batchref: strВ нем содержится все, что нам нужно знать о распределении: сведения о строке заказа и о том, для какой партии он был выделен.
Мы добавляем его в метод allocate() нашей модели (предварительно добавив тест, естественно):
class Product:
...
def allocate(self, line: OrderLine) -> str:
...
batch.allocate(line)
self.version_number += 1
self.events.append(events.Allocated(
orderid=line.orderid, sku=line.sku, qty=line.qty,
batchref=batch.reference,
))
return batch.referenceОбработчик для ChangeBatchQuantity уже существует, поэтому все, что нам нужно добавить, это обработчик, который публикует исходящее событие:
HANDLERS = {
events.Allocated: [handlers.publish_allocated_event],
events.OutOfStock: [handlers.send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]Для публикации события используется наша вспомогательная функция из обертки Redis:
def publish_allocated_event(
event: events.Allocated, uow: unit_of_work.AbstractUnitOfWork,
):
redis_eventpublisher.publish('line_allocated', event)Внутренние и внешние события
Хорошо, если различие между внутренними и внешними событиями будет четким. Некоторые события могут приходить извне, а некоторые события могут обновляться и публиковаться извне, но не все будут таковыми. Это особенно важно, если вы попадаете в event sourcing (хотя это очень подходящая тема для другой книги).
| Исходящие события — это одно из мест, где важно применять валидацию. См. [appendix_validation] для ознакомления с некоторой философией валидации и примеры. |
Подведение итогов
События могут приходить _из_вне, но они также могут быть опубликованы извне - наш обработчик publish преобразует событие в сообщение на канале Redis. Мы используем события для общения с внешним миром. Такая временная развязка обеспечивает нам большую гибкость в интеграции приложений, но, как всегда, за это приходится платить.
Уведомление о событиях хорошо тем, что оно подразумевает низкий уровень связи и довольно просто настраивается. Однако это может стать проблематичным, если действительно существует логический поток, который проходит через различные уведомления о событиях... Такой поток может быть трудно увидеть, поскольку он не выражен явно в тексте программы.... Это может затруднить отладку и модификацию.
Martin Fowler, "What do you mean by 'Event-Driven'"
Интеграция микросервисов на основе событий: компромиссы показывает некоторые компромиссы, о которых стоит подумать.
| Плюсы | Минусы |
|---|---|
|
|
В более общем случае, если вы переходите от модели синхронного обмена сообщениями к асинхронному, вы также открываете целый ряд проблем, связанных с надежностью и эвентуальной консистентностью сообщений. Читать далее [footguns].









Комментарии
Отправить комментарий